ReactiveX的发展史

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io

ReactiveX是什么

ReactiveX是一个使用可观察数据流进行异步或基于时间编程的规范库,他结合了观察者模式,迭代器模式和函数式编程的优势。

  • 创建:轻松创建事件流或数据流。
  • 组合:使用类似查询的运算符组合和转换流
  • 监听:订阅任何可观察的流来完成一些附加的功能。

Observable很好的解决了处理异步数据序列的问题。

单个内容 多个内容
同步 T getData() Iterable<T> getData()
异步 Future<T> getData() Observable<T> getData()

但将Observable称为“函数式响应编程”是非常不严谨的。ReactiveX可能具有函数式和(或)响应式的特点,但是跟“函数式响应编程”是完全不一样的。 一个主要的区别是,函数式响应编程对随时间连续变化的值进行操作,而ReactiveX对随时间发出的离散值进行操作。(参考 Conal Elliott’s work for more-precise information on functional reactive programming.)

Observable的优势

通过对观察者模式的扩展支持了数据和(或)事件序列,并添提供了一些运算符,这些运算符提供了以声明方式组合序列的能力,而无需关注线程、同步、线程安全、并发数据结构和非阻塞I/O之类的问题。
Observable使用一系列便捷的操作符组合,让异步流的处理像处理数组集合一样简单。 减少回调嵌套,增强代码可读性,还不易出错。

可组合

Java中的Future可以用于单个的异步任务执行特别方便,但在多个异步任务嵌套时,由于异步任务在运行的等待时间会有所不同,用Future处理会变得比较复杂(因此容易出错)。另一种情况是过早的阻塞在Future.get()上,这样使异步执行的收益降低。而Observable就是为了处理异步数据流和序列而设计的。

灵活性

Observables不仅支持单个数据发射,还支持数据序列甚至是无限流的发射。Observabled的概念可以用于上述所有的场景。Observable在使用上与Iterable的非常相似,而且更加灵活性和优雅。
Observable是异步/推数据 “对偶” Iterable同步/拉数据

Iterable (pull) Observable (push)
取数据 T next() onNext(T)
发生异常 throws Exception onError(Exception)
结束 !hasNext() onCompleted()

多样化

ReactiveX并不局限于特定的并发或异步场景。 使用线程池,事件循环,非阻塞I/O,Actor模型(例如Akka框架)或任何适合您的需求,风格或专业知识都可以实现Observable。 无论是选择阻塞的还是非阻塞来实现,客户端代码都可以将Observables的所有交互视为异步的。
对于Observable的实现,你可能会有下面这些疑问。

public Observable<data> getData();

- 它是在调用者相同的线程上同步工作吗?
- 它是在调用者不同的线程上异步工作吗?
- 它是否将任务进行划分放到多个线程中执行,这些线程可能以任何顺序将数据返回给调用者?
- 它使用一个Actor(或多个Actor)代替线程池吗?
- 它是否使用带有事件循环的NIO(同步非阻塞的I/O模型)进行异步网络访问?
- 它是否使用事件循环将工作线程与回调线程分开?

从观察者的角度,并不需要过多的关注这些问题。
重要的是:有了ReactiveX,可以在完全不影响Observable程序库使用者的情况下,彻底的改变Observable的底层实现。

Callback引起的问题

回调解决了在Future.get()上过早的阻塞的问题,这种实现是非常有效的,因为回调时响应已经准备就绪了。
但是与Future一样,尽管回调单个异步任务执行中使用方便,但通过嵌套组合,代码会变得笨拙。

多种编程语言的实现

ReactiveX目前有多种语言实现,以保持这些语言的习惯用法,并且正在快速添加更多的语言。

响应式编程

ReactiveX提供了一系列过滤、选择、转换、组合和组合多个Observable操作,这些操作使得任务的执行和组合变得十分高效。使用Iterable,消费者从数据源获取数据,在数据获取完成之前会阻塞当前线程。相比之下,Observable在数据准备就绪时才会从数据源推向消费者。因为数据的传递既可以是同步的也可以是异步的,所以Observable表现得更加灵活。
示例代码:相似的高阶函数在Iterable和Observable上的应用。

Iterable Observable
```

getDataFromLocalMemory() .skip(10) .take(5) .map({ s -> return s + “ transformed” }) .forEach({ println “next => “ + it })

  1. |

getDataFromNetwork() .skip(10) .take(5) .map({ s -> return s + “ transformed” }) .subscribe({ println “onNext => “ + it }) ``` |

Observable添加了观察者模式两个缺失的语法:

  1. 没有可用数据时数据源通知Observer的能力。(onCompleted方法)
  2. 发生错误时数据源通Observer的能力。(onError方法)

在这些功能的加入后,ReactiveX使得Iterable和Observable两种模型表现一致,唯一的不同在于他们数据流动的方向。这种特性使得在Iterable上面的一系列操作同样可以用Oberservable实现。

参考资料

http://reactivex.io/intro.html