Observable 结构
// 伪代码
class 可观察类 {constructor(数据源: (subscriber: 订阅者) => void) {}管道(operators: 操作符[]): 可观察类订阅(observer: 观察者): 订阅对象}
class Observable {constructor(source: (subscriber: Subscriber) => void) {}pipe(operators: Operator[]): Observablesubscribe(observer: Observer): Subscription}
非官方解释
Observable: 可观察类,逻辑实现的主类
Subscriber: 订阅者,数据来源,也可以说是数据发送者
Operator: 操作符,遵循相同的接口,对数据进行操作的一系列工具函数
Observer:观察者,接收数据的函数
Subscription:可以让观察者不再接收数据的工具类
同 Promise 类似,上述一切功能的实现,其本质是回调函数。只不过 rx 将这些回调函数起了名字,并且对这些函数进行了特定的组织与调用。
Rxjs 的约定
- 一个可观察对象,有可能会发送值,也可以什么值也不发送
- 当一个可观察对象发生错误后,不再发送数据
- 当一个可观察对象完成后,不再发送数据
官方翻译
单播?
每一次订阅即产生一个全新的逻辑链路,可以看成是 Observale
多播?
多个订阅共享一个逻辑链路,可以看成是 Subject
最简单实现
interface Subscriber {next(value: any): void}interface Observer {next(value: any): void}class Observable {constructor(private source: (subscriber: Subscriber) => void) {}subscribe(observer: Observer) {this.source({next(value: any) {observer.next(value)}})}}const obs = new Observable(subscriber => {debuggersubscriber.next(1)subscriber.next(2)})obs.subscribe({next(value: any) {console.log(value)}})obs.subscribe({next(value: any) {console.log(value)}})
手动实现分享数据源
const subject = new Subject()interval().subscribe(value => {subject.next(value)})subject.subscribe(value => {console.log(value)})subject.subscribe(value => {console.log(value)})
