vlambda博客
学习文章列表

响应式编程--JAVA成长之路

响应式编程关键性概念就是事件,在某种程度上,这并不是什么新东西。事件总线(Event buses)或咱们常见的单击事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义操作(原文:side effects,副作用,本文皆翻译为自定义操作)。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和悬停事件数据流。流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。比如,假设你的微博评论就是一个跟单击事件一样的数据流,你能够监听这个流,并做出响应。

最重要的是,有一堆的函数能够创建(create)任何流,也能将任何流进行组合(combine)和过滤(filter)。这正是“函数式”的魔力所在。一个流能作为另一个流的输入(input),甚至多个流也可以作为其它流的输入。你能合并(merge)两个流。你还能通过过滤(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据映射(map)到一个新的流中。

一个流就是一个将要发生的以时间为序的事件序列。它能发射出三种不同的东西:一个数据值(data value)(某种类型的),一个错误(error)或者一个“完成(completed)”的信号。比如说,当前按钮所在的窗口或视图关闭时,“单击”事件流也就“完成”了。

我们只能异步地捕获这些发出的事件:定义一个针对数据值的函数,在发出一个值时,该函数就会异步地执行;针对发出错误时的函数;还有针对发出‘完成’时的函数。有时你可以省略这最后两个函数,只专注于针对数据值的函数。“监听”流的行为叫做订阅。我们定义的这些函数就是观察者。这个流就是被观察的主体(subject)(或“可观察的(observable)”)。这正是观察者设计模式。

RxJava是什么

Rx — Reactive Extensions 原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和LINQ-style查询操作。

RxJava — Reactive Extension for Java。是最开始根据微软的 RX为基础,由Netflix主导做出的提供在JVM上实现响应式编程的一种方式。

RxJava是一种在JVM上实现异步数据处理的库,是基于事件的扩展的观察模式

RxJava特点

jar包很小 < 1MB
轻量级框架
支持Java 8 lambda
支持Java 6+ & Android 2.3+
支持同步和异步
使用简洁
解耦、单一化、不嵌套

扩展的观察者

onCompleted()事件
onError()事件
组合而不是嵌套,避免陷入回调地域

RxAndroid是什么

是Rxjava针对Android平台的一个扩展,用于Android开发
提供响应式扩展组建快速方便开发android应用程序

Schedulers(调度器)

Schedulers在RxAndroid中解决Android主线程问题【针对Android】,解决多线程线程问题。

在不指定特定线程的情况下,RxAndroid遵循的是线程不变原则 ,在哪个线程调用subscribe()就在哪个线程生产事件,在哪个线程生成事件就在哪个线程消费事件。如果我们要让事件切换线程就要用到Schedulers.

Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

RxJava和观察者模式

观察者模式的四大要素:
Observable 被观察者
Observer 观察者
subscribe 订阅
事件

观察者模式本质上是通过订阅将被观察者产生的事件传递给观察者

RxJava扩展的观察者模式

RxJava中相对于观察者模式多了onCompleted()和onError()两个方法,这两个互斥的方法能唯一确定事件的结束。

RxJava实践

RxJava 的 Helloword

private void testRxJava(){
//创建被观察者
Observable mObservable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hellowrod!");
subscriber.onCompleted();
}
});

//创建观察者
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};

//订阅事件
mObservable.subscribe(subscriber);

}
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

  • 22

  • 23

  • 24

  • 25

  • 26

  • 27

  • 28

  • 29

  • 30

  • 31

  • 32

  • 33

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

RxJava中的不完整回调

Action1和Action0都是RxJava的一个接口

Action0它只有一个方法call(),这个方法无参数返回值,可以和无参数的onCompleted()结合使用。

Action1也有一个方法call(T param),这个方法有一个参数返回可以和有一个参数的onNext(T obj)和onError(Throwable error)结合使用。

Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

  • 22

  • 23

  • 24

  • 25

  • 26

  • 27

  • 28

线程控制

Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});