Observable 结构
// 伪代码
class 可观察类 {
constructor(数据源: (subscriber: 订阅者) => void) {}
管道(operators: 操作符[]): 可观察类
订阅(observer: 观察者): 订阅对象
}
class Observable {
constructor(source: (subscriber: Subscriber) => void) {}
pipe(operators: Operator[]): Observable
subscribe(observer: Observer): Subscription
}
非官方解释
Observable: 可观察类,逻辑实现的主类
Subscriber: 订阅者,数据来源,也可以说是数据发送者
Operator: 操作符,遵循相同的接口,对数据进行操作的一系列工具函数
Observer:观察者,接收数据的函数
Subscription:可以让观察者不再接收数据的工具类
同 Promise 类似,上述一切功能的实现,其本质是回调函数。只不过 rx 将这些回调函数起了名字,并且对这些函数进行了特定的组织与调用。
Rxjs 的约定
- 一个可观察对象,有可能会发送值,也可以什么值也不发送
- 当一个可观察对象发生错误后,不再发送数据
- 当一个可观察对象完成后,不再发送数据
官方翻译
单播?
每一次订阅即产生一个全新的逻辑链路,可以看成是 Observale
多播?
多个订阅共享一个逻辑链路,可以看成是 Subject
最简单实现
interface Subscriber {
next(value: any): void
}
interface Observer {
next(value: any): void
}
class Observable {
constructor(private source: (subscriber: Subscriber) => void) {
}
subscribe(observer: Observer) {
this.source({
next(value: any) {
observer.next(value)
}
})
}
}
const obs = new Observable(subscriber => {
debugger
subscriber.next(1)
subscriber.next(2)
})
obs.subscribe({
next(value: any) {
console.log(value)
}
})
obs.subscribe({
next(value: any) {
console.log(value)
}
})
手动实现分享数据源
const subject = new Subject()
interval().subscribe(value => {
subject.next(value)
})
subject.subscribe(value => {
console.log(value)
})
subject.subscribe(value => {
console.log(value)
})