基础

转换成 observables

  1. // 来自一个或多个值
  2. Rx.Observable.of('foo', 'bar');
  3. // 来自数组
  4. Rx.Observable.from([1,2,3]);
  5. // 来自事件
  6. Rx.Observable.fromEvent(document.querySelector('button'), 'click');
  7. // 来自 Promise
  8. Rx.Observable.fromPromise(fetch('/users'));
  9. // 来自回调函数(最后一个参数得是回调函数,比如下面的 cb)
  10. // fs.exists = (path, cb(exists))
  11. var exists = Rx.Observable.bindCallback(fs.exists);
  12. exists('file.txt').subscribe(exists => console.log('Does file exist?', exists));
  13. // 来自回调函数(最后一个参数得是回调函数,比如下面的 cb)
  14. // fs.rename = (pathA, pathB, cb(err, result))
  15. var rename = Rx.Observable.bindNodeCallback(fs.rename);
  16. rename('file.txt', 'else.txt').subscribe(() => console.log('Renamed!'));

创建 observables

在外部产生新事件。

  1. var myObservable = new Rx.Subject();
  2. myObservable.subscribe(value => console.log(value));
  3. myObservable.next('foo');

在内部产生新事件。

  1. var myObservable = Rx.Observable.create(observer => {
  2. observer.next('foo');
  3. setTimeout(() => observer.next('bar'), 1000);
  4. });
  5. myObservable.subscribe(value => console.log(value));

选择哪种方式需要根据场景。当你想要包装随时间推移产生值的功能时,普通的 Observable 就已经很好了。使用 Subject,你可以从任何地方触发新事件,并且将已存在的 observables 和它进行连接。

控制流动

  1. // 输入 "hello world"
  2. var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');
  3. // 过滤掉小于3个字符长度的目标值
  4. input.filter(event => event.target.value.length > 2)
  5. .map(event => event.target.value)
  6. .subscribe(value => console.log(value)); // "hel"
  7. // 延迟事件
  8. input.delay(200)
  9. .map(event => event.target.value)
  10. .subscribe(value => console.log(value)); // "h" -200ms-> "e" -200ms-> "l" ...
  11. // 每200ms只能通过一个事件
  12. input.throttleTime(200)
  13. .map(event => event.target.value)
  14. .subscribe(value => console.log(value)); // "h" -200ms-> "w"
  15. // 停止输入后200ms方能通过最新的那个事件
  16. input.debounceTime(200)
  17. .map(event => event.target.value)
  18. .subscribe(value => console.log(value)); // "o" -200ms-> "d"
  19. // 在3次事件后停止事件流
  20. input.take(3)
  21. .map(event => event.target.value)
  22. .subscribe(value => console.log(value)); // "hel"
  23. // 直到其他 observable 触发事件才停止事件流
  24. var stopStream = Rx.Observable.fromEvent(document.querySelector('button'), 'click');
  25. input.takeUntil(stopStream)
  26. .map(event => event.target.value)
  27. .subscribe(value => console.log(value)); // "hello" (点击才能看到)

产生值

  1. // 输入 "hello world"
  2. var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');
  3. // 传递一个新的值
  4. input.map(event => event.target.value)
  5. .subscribe(value => console.log(value)); // "h"
  6. // 通过提取属性传递一个新的值
  7. input.pluck('target', 'value')
  8. .subscribe(value => console.log(value)); // "h"
  9. // 传递之前的两个值
  10. input.pluck('target', 'value').pairwise()
  11. .subscribe(value => console.log(value)); // ["h", "he"]
  12. // 只会通过唯一的值
  13. input.pluck('data').distinct()
  14. .subscribe(value => console.log(value)); // "helo wrd"
  15. // 不会传递重复的值
  16. input.pluck('data').distinctUntilChanged()
  17. .subscribe(value => console.log(value)); // "helo world"