Observable 结构
// 伪代码

  1. class 可观察类 {
  2. constructor(数据源: subscriber: 订阅者) => void) {}
  3. 管道(operators: 操作符[]): 可观察类
  4. 订阅(observer: 观察者): 订阅对象
  5. }
  1. class Observable {
  2. constructor(source: (subscriber: Subscriber) => void) {}
  3. pipe(operators: Operator[]): Observable
  4. subscribe(observer: Observer): Subscription
  5. }

非官方解释


Observable: 可观察类,逻辑实现的主类
Subscriber: 订阅者,数据来源,也可以说是数据发送者
Operator: 操作符,遵循相同的接口,对数据进行操作的一系列工具函数
Observer:观察者,接收数据的函数
Subscription:可以让观察者不再接收数据的工具类

同 Promise 类似,上述一切功能的实现,其本质是回调函数。只不过 rx 将这些回调函数起了名字,并且对这些函数进行了特定的组织与调用。
Rxjs 的约定

  • 一个可观察对象,有可能会发送值,也可以什么值也不发送
  • 当一个可观察对象发生错误后,不再发送数据
  • 当一个可观察对象完成后,不再发送数据

官方翻译
单播?
每一次订阅即产生一个全新的逻辑链路,可以看成是 Observale
多播?
多个订阅共享一个逻辑链路,可以看成是 Subject

最简单实现

  1. interface Subscriber {
  2. next(value: any): void
  3. }
  4. interface Observer {
  5. next(value: any): void
  6. }
  7. class Observable {
  8. constructor(private source: (subscriber: Subscriber) => void) {
  9. }
  10. subscribe(observer: Observer) {
  11. this.source({
  12. next(value: any) {
  13. observer.next(value)
  14. }
  15. })
  16. }
  17. }
  18. const obs = new Observable(subscriber => {
  19. debugger
  20. subscriber.next(1)
  21. subscriber.next(2)
  22. })
  23. obs.subscribe({
  24. next(value: any) {
  25. console.log(value)
  26. }
  27. })
  28. obs.subscribe({
  29. next(value: any) {
  30. console.log(value)
  31. }
  32. })

手动实现分享数据源

  1. const subject = new Subject()
  2. interval().subscribe(value => {
  3. subject.next(value)
  4. })
  5. subject.subscribe(value => {
  6. console.log(value)
  7. })
  8. subject.subscribe(value => {
  9. console.log(value)
  10. })