在 Observable 对象内部可以多次调用 next 方法向外发送数据

  1. const observable = new Observable(function (observer) {
  2. let index = 0
  3. setInterval(function () {
  4. observer.next(index++)
  5. }, 1000)
  6. })
  7. const observer = {
  8. next: function (value) {
  9. console.log(value)
  10. }
  11. }
  12. observable.subscribe(observer)

当所有数据发送完成以后,可以调用 complete 方法终止数据发送

  1. const observable = new Observable(function (observer) {
  2. let index = 0
  3. let timer = setInterval(function () {
  4. observer.next(index++)
  5. if (index === 3) {
  6. observer.complete()
  7. clearInterval(timer)
  8. }
  9. }, 1000)
  10. })
  11. const observer = {
  12. next: function (value) {
  13. console.log(value)
  14. },
  15. complete: function () {
  16. console.log("数据发送完成")
  17. }
  18. }
  19. observable.subscribe(observer)

当 Observable 内部逻辑发生错误时,可以调用 error 方法将失败信息发送给订阅者,Observable 终止

  1. import { Observable } from "rxjs"
  2. const observable = new Observable(function (observer) {
  3. let index = 0
  4. let timer = setInterval(function () {
  5. observer.next(index++)
  6. if (index === 3) {
  7. observer.error("发生错误")
  8. clearInterval(timer)
  9. }
  10. }, 1000)
  11. })
  12. const observer = {
  13. next: function (value) {
  14. console.log(value)
  15. },
  16. error: function (error) {
  17. console.log(error)
  18. }
  19. }
  20. observable.subscribe(observer)

可观察对象是惰性的,只有被订阅后才会执行,只有订阅它,它才执行

  1. const observable = new Observable(function () {
  2. console.log("Hello RxJS")
  3. })
  4. // 不订阅它不会执行
  5. // observable.subscribe()

可观察对象可以有 n 多订阅者,每次被订阅时都会得到执行

  1. const observable = new Observable(function () {
  2. console.log("Hello RxJS")
  3. })
  4. observable.subscribe()
  5. observable.subscribe()
  6. observable.subscribe()
  7. observable.subscribe()
  8. observable.subscribe()

64.png

取消订阅

  1. import { interval } from "rxjs"
  2. const obs = interval(1000)
  3. const subscription = obs.subscribe(console.log)
  4. setTimeout(function () {
  5. subscription.unsubscribe()
  6. }, 2000)