RxJava入门学习笔记

什么是RxJava?

RxJava的自我介绍是一个在 Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。这是一个非常精准的概括,但是用“异步”两字概括也许更为通俗易懂。此文将简要记录学习RxJava的笔记。

参考文献
给Android开发者的RxJava详解

基本概念

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava的使用

首先注意的是此处使用到RxJava不是 java.util包中的Observer和Observable,而是一个Github上的RxJava开源项目,JAR包可在此处下载。

创建观察者

观察者是Observer或者Subscriber,Subscriber是实现了前者接口的接口,扩充了 onStart()unsubscribe()方法。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Observer<String> observer = new Observer<String>() {

@Override
public void onCompleted() {
System.out.println("onCompleted");

}

@Override
public void onError(Throwable arg0) {
System.out.println("onError");

}

@Override
public void onNext(String arg0) {
System.out.println("item: " + arg0);

}

};

或者使用Subscriber:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Subscriber<String> subscriber = new Subscriber<String>() {

@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable arg0) {
System.out.println("onError");
}

@Override
public void onNext(String arg0) {
System.out.println("item: " + arg0);
}

};

创建被观察者

被观察者: 决定什么时候触发事件以及触发怎样的事件。使用 create()方法来获取Observable对象,并制定触发规则。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
private Observable<String> observable = Observable
.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> s) {
s.onNext("Hello");
s.onNext("World");
s.onNext("!");
s.onCompleted();
}

});

OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发。

这个例子很简单:事件的内容是字符串,而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(例如网络请求的结果在请求返回之前是未知的);所有事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的。总之,这个例子看起来毫无实用价值。但这是为了便于说明,实质上只要你想,各种各样的事件发送规则你都可以自己来写。至于具体怎么做,后面都会讲到,但现在不行。只有把基础原理先说明白了,上层的运用才能更容易说清楚。

订阅

订阅绑定观察者与被观察者:

1
observable.subscribe(observer);

运行结果

此时控制台会依次打印出:

1
item: Hello
item: World
item: !
onCompleted