Ideas matter more than form.

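RxJS is a reactive programming library for JavaScript; it can be thought of as Lodash for data streams. Reactive programming has officially arrived on the JavaScript scene: ECMAScript Observables has been proposed as a stage 1 draft, and RxJS 5+ is being developed as a reference implementation of that standard.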

RxJS = Observables + Operators + Schedulers.

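  • Reactive: in the reactive model, what we define is not a one-off assignment but a repeatable one.
  • Callbacks, Events, Promises, and Generators for async operations: the key to asynchronous processing comes down to handling two things, event dispatch and flow control. Event dispatch uses Observables, while flow control follows the Streams approach.
  • Stream based: Lodash for data streams.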

What is RxJS in essence? What problems does it solve? What scenarios is it suited to?

The ReactiveX Observable model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. It frees you from tangled webs of callbacks, and thereby makes your code more readable and less prone to bugs. Composable / Flexible / Less Opinionated.

Code that deals with more than one event or asynchronous computation gets complicated quickly as it needs to build a state-machine to deal with ordering issues. Next to this, the code needs to deal with successful and failure termination of each separate computation. This leads to code that doesn’t follow normal control-flow, is hard to understand and hard to maintain.

RxJS makes these computations first-class citizens. This provides a model that allows for readable and composable APIs to deal with these asynchronous computations.

Essences

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

               single items          multiple items
synchronous    T getData()           Iterable<T> getData()
asynchronous   Future<T> getData()   Observable<T> getData()

Reactive Programming

ReactiveX provides a collection of operators with which you can filter, select, transform, combine, and compose Observables. This allows for efficient execution and composition.

// iterable
getDataFromLocalMemory()
  .filter(s => s != null)
  .map(s => `${s} transformed`)
  .forEach(s => console.log(`next => ${s}`))

// observable
getDataFromNetwork()
  .filter(s => s != null)
  .map(s => `${s} transformed`)
  .subscribe(s => console.log(`next => ${s}`))

// beauty of Reactive Programming
Observable
  .fromEvent(emitter, 'success')
  .map(() => `${REQUEST_URL}?since=${Math.floor(Math.random() * 500)}`)
  .flatMap(url => Observable.fromPromise(fetch(url).then(f => f.json())))
  .map(listUsers => listUsers[0])
  .subscribe((user) => console.log(user.id));

emitter.emit('success');

In order to receive notifications from an observable collection, you use the subscribe method of Observable to hand it an Observer object. In return for this observer, the subscribe method returns a Disposable object that acts as a handle for the subscription.
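
The subscribe/dispose handshake described here can be sketched in plain JavaScript. `TinyObservable`, `listeners`, and `emit` are hypothetical names made up for this illustration; this is not the RxJS source, only a minimal model of the contract, assuming the producer returns a teardown function.

```javascript
// Hypothetical minimal Observable, for illustration only (not the RxJS source).
class TinyObservable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }

  // Hands the observer to the producer and returns a handle for the subscription.
  subscribe(observer) {
    let active = true;
    // Wrap the observer so that nothing is delivered after dispose().
    const safeObserver = {
      next: (value) => {
        if (active && observer.next) observer.next(value);
      },
      error: (err) => {
        if (active && observer.error) observer.error(err);
        active = false;
      },
      complete: () => {
        if (active && observer.complete) observer.complete();
        active = false;
      },
    };
    const teardown = this._subscribeFn(safeObserver);
    return {
      dispose() {
        active = false;
        if (typeof teardown === 'function') teardown();
      },
    };
  }
}

// A hand-rolled event source standing in for a DOM element or an emitter.
const listeners = new Set();
const emit = (value) => listeners.forEach((handler) => handler(value));

const source = new TinyObservable((observer) => {
  const handler = (value) => observer.next(value);
  listeners.add(handler);
  return () => listeners.delete(handler); // teardown run on dispose()
});

const received = [];
const subscription = source.subscribe({ next: (value) => received.push(value) });
emit('a');
emit('b');
subscription.dispose();
emit('c'); // not delivered: the handler was removed by the teardown
console.log(received); // [ 'a', 'b' ]
```

The point of the returned handle is that the consumer, not the producer, decides when the subscription ends.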

Creating Observables

Operators that originate new Observables.

  • Create — create an Observable from scratch by calling observer methods programmatically
  • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
  • Empty/Never/Throw — create Observables that have very precise and limited behavior
  • From — convert some other object or data structure into an Observable
  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
  • Just — convert an object or a set of objects into an Observable that emits that or those objects
  • Range — create an Observable that emits a range of sequential integers
  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
  • Start — create an Observable that emits the return value of a function
  • Timer — create an Observable that emits a single item after a given delay

Cold vs. Hot

  • Cold observables start running upon subscription, i.e., the observable sequence only starts pushing values to the observers when Subscribe is called. Values are also not shared among subscribers.
  • When an observer subscribes to a hot observable sequence, it will get the current value in the stream. The hot observable sequence is shared among all subscribers, and each subscriber is pushed the next value in the sequence.

The publish operator provides a mechanism to share subscriptions by broadcasting a single subscription to multiple subscribers.
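
A rough way to see the difference between cold and shared (hot) sequences is to count how often the producer runs. The sketch below uses plain JavaScript; `cold`, `publishSketch`, and `connect` are hypothetical stand-ins written for this note and only imitate the idea behind publish, not its actual implementation.

```javascript
// Count how many times the producer logic runs.
let producerRuns = 0;

// A cold source (hypothetical): every subscribe call re-runs the producer.
const cold = {
  subscribe(next) {
    producerRuns += 1;
    [1, 2, 3].forEach((value) => next(value));
  },
};

// A hypothetical stand-in for publish: one upstream subscription, fanned out
// to every listener registered before connect() is called.
function publishSketch(source) {
  const listeners = [];
  return {
    subscribe(next) {
      listeners.push(next);
    },
    connect() {
      source.subscribe((value) => listeners.forEach((listener) => listener(value)));
    },
  };
}

// Cold: two subscribers, two independent producer runs.
const first = [];
const second = [];
cold.subscribe((value) => first.push(value));
cold.subscribe((value) => second.push(value));
const runsAfterCold = producerRuns; // 2

// Shared: two subscribers, a single producer run.
const shared = publishSketch(cold);
const third = [];
const fourth = [];
shared.subscribe((value) => third.push(value));
shared.subscribe((value) => fourth.push(value));
shared.connect();
const runsAfterShared = producerRuns; // 3

// A subscriber that arrives after the values were pushed has missed them.
const late = [];
shared.subscribe((value) => late.push(value));

console.log(runsAfterCold, runsAfterShared, third, fourth, late);
```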


Timer

const timeA$ = Observable.interval(1000)
const timeB$ = timeA$.filter(num => {
    return (num % 2 != 0)
      && (num % 3 != 0)
      && (num % 5 != 0)
      && (num % 7 != 0)
  })

const timeC$ = timeB$.debounceTime(3000)
const timeD$ = timeC$.delay(2000)

A: 0  1  2  3  4  5  6  7  8  9  10 11 12 13 14 15 16 17 18 19 20 21 22
B:    1                             11    13          17    19
C:             1                                   13                19
D:                   1                                   13

Event

RxJS provides factory methods for you to bridge with existing asynchronous sources in the DOM or Node.js so that you can employ the rich composing, filtering and resource management features provided by RxJS on any kind of data streams.

const event$ = Observable.fromEvent(emitter, 'ready');
// event$.subscribe(successLog);
// emitter.emit('ready', 'nice job!');

const eventPattern$ = Observable.fromEventPattern(
  (handler) => { emitter.on('ready', handler); },
  (handler) => { emitter.removeListener('ready', handler); }
);

Callbacks & Promises

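// bindCallback expects the callback as the last argument, so wrap setTimeout
const timeout = Observable.bindCallback((ms, callback) => setTimeout(callback, ms));
// bindNodeCallback needs the callback-style API (writeFile), not writeFileSync
const writeFile = Observable.bindNodeCallback(require('fs').writeFile);

const timeout$ = timeout(2000);
const file$ = writeFile('path/to/file', 'file contents'); // 'file contents' is placeholder data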

const promise$ = Observable.fromPromise(Promise.resolve(400));
promise$.subscribe(successLog);

With generators you can write code in a straightforward manner and yield not only Observable sequences but also Promises, callbacks, arrays, etc. This allows you to write your code in a very imperative style without all the callbacks.

var Rx = require('rx');
var request = require('request');
var get = Rx.Observable.fromNodeCallback(request);

Rx.Observable.spawn(function* () {
  var data;
  try {
    data = yield get('http://bing.com').timeout(5000 /*ms*/);
  } catch (e) {
    console.log('Error %s', e);
  } 

  console.log(data);
}).subscribe();

// what about this?
var source = Rx.Observable.of(1,2,3)
  .flatMap(
    (x, i) => function* () { yield x; yield i; }(),
    (x, y, i1, i2) => x + y + i1 + i2
  );

var subscription = source.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e),
  () => console.log('onCompleted'));

// => onNext: 2
// => onNext: 2
// => onNext: 5
// => onNext: 5
// => onNext: 8
// => onNext: 8
// => onCompleted

Transforming Observables

Operators that transform items that are emitted by an Observable.

  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
  • Map — transform the items emitted by an Observable by applying a function to each item
  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
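
To make a few of these transformations concrete, here is a small sketch in plain JavaScript. It assumes a deliberately simplified model in which an "observable" is just a function that pushes values into a `next` callback; `fromArray`, `map`, `scan`, `bufferCount`, and `collect` are illustrative helpers written for this note, not the RxJS operators themselves.

```javascript
// Assumption for this sketch: an "observable" is a function that accepts a
// `next` callback and pushes values into it. Completion and errors are omitted.
const fromArray = (items) => (next) => items.forEach((item) => next(item));

// Map: apply a function to each item.
const map = (project) => (source) => (next) =>
  source((value) => next(project(value)));

// Scan: like reduce, but emit every intermediate accumulation.
const scan = (accumulate, seed) => (source) => (next) => {
  let state = seed;
  source((value) => {
    state = accumulate(state, value);
    next(state);
  });
};

// Buffer (by count): gather items into bundles of `size` and emit the bundles.
// Without a completion signal, a trailing partial bundle is never flushed.
const bufferCount = (size) => (source) => (next) => {
  let bundle = [];
  source((value) => {
    bundle.push(value);
    if (bundle.length === size) {
      next(bundle);
      bundle = [];
    }
  });
};

// Collect everything a sketch observable emits into an array.
const collect = (source) => {
  const out = [];
  source((value) => out.push(value));
  return out;
};

const numbers = fromArray([1, 2, 3, 4]);
const mapped = collect(map((n) => n * 10)(numbers));            // [10, 20, 30, 40]
const running = collect(scan((sum, n) => sum + n, 0)(numbers)); // [1, 3, 6, 10]
const bundles = collect(bufferCount(2)(numbers));               // [[1, 2], [3, 4]]
console.log(mapped, running, bundles);
```

Each operator takes a source and returns a new source, which is what makes them chainable.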

Filtering Observables

Operators that selectively emit items from a source Observable.

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct — suppress duplicate items emitted by an Observable
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • First — emit only the first item, or the first item that meets a condition, from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable

Combine Observables

Operators that work with multiple source Observables to create a single Observable

  • And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
  • Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
  • Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
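
The difference between CombineLatest and Zip is easiest to see with the same interleaved input fed to both. The following plain JavaScript sketch models two sources, A and B, as `pushA` and `pushB` calls; `combineLatestSketch` and `zipSketch` are hypothetical helpers for illustration and ignore completion and errors.

```javascript
// CombineLatest: once both sides have a value, emit on every new item,
// pairing it with the most recent item from the other side.
function combineLatestSketch(onResult, combine) {
  let latestA;
  let latestB;
  let hasA = false;
  let hasB = false;
  const emit = () => {
    if (hasA && hasB) onResult(combine(latestA, latestB));
  };
  return {
    pushA(value) { latestA = value; hasA = true; emit(); },
    pushB(value) { latestB = value; hasB = true; emit(); },
  };
}

// Zip: pair items strictly by position (first with first, second with second).
function zipSketch(onResult, combine) {
  const queueA = [];
  const queueB = [];
  const emit = () => {
    while (queueA.length > 0 && queueB.length > 0) {
      onResult(combine(queueA.shift(), queueB.shift()));
    }
  };
  return {
    pushA(value) { queueA.push(value); emit(); },
    pushB(value) { queueB.push(value); emit(); },
  };
}

const join = (a, b) => `${a}${b}`;

// Feed the same interleaving to both: A1, A2, Bx, A3, By.
const feed = (target) => {
  target.pushA(1);
  target.pushA(2);
  target.pushB('x');
  target.pushA(3);
  target.pushB('y');
};

const latestResults = [];
feed(combineLatestSketch((value) => latestResults.push(value), join));

const zipResults = [];
feed(zipSketch((value) => zipResults.push(value), join));

console.log(latestResults); // [ '2x', '3x', '3y' ]
console.log(zipResults);    // [ '1x', '2y' ]
```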

Error Handling Operators

Operators that help to recover from error notifications from an Observable

  • Catch — recover from an onError notification by continuing the sequence without error
  • Retry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error
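
As an illustration of the two recovery strategies, the sketch below models a source as a function that receives an observer with `next`, `error`, and `complete` callbacks. `retrySketch`, `catchSketch`, `flaky`, and the other names are hypothetical and synchronous for simplicity; the real operators also handle disposal and scheduling, which are left out here.

```javascript
// Retry: on error, subscribe to the source again, up to maxRetries times.
function retrySketch(source, maxRetries) {
  return (observer) => {
    let retries = 0;
    const subscribeOnce = () => {
      source({
        next: (value) => observer.next(value),
        complete: () => observer.complete(),
        error: (err) => {
          if (retries < maxRetries) {
            retries += 1;
            subscribeOnce();
          } else {
            observer.error(err);
          }
        },
      });
    };
    subscribeOnce();
  };
}

// Catch: on error, continue with a fallback sequence instead of failing.
function catchSketch(source, fallbackFor) {
  return (observer) => {
    source({
      next: (value) => observer.next(value),
      complete: () => observer.complete(),
      error: (err) => fallbackFor(err)(observer),
    });
  };
}

// Records every notification as a string so the outcome is easy to inspect.
const recorder = (log) => ({
  next: (value) => log.push(`next:${value}`),
  error: (err) => log.push(`error:${err.message}`),
  complete: () => log.push('complete'),
});

// A source that fails on its first two subscriptions and succeeds on the third.
let subscriptions = 0;
const flaky = (observer) => {
  subscriptions += 1;
  if (subscriptions < 3) {
    observer.error(new Error(`attempt ${subscriptions} failed`));
    return;
  }
  observer.next('data');
  observer.complete();
};

const retryLog = [];
retrySketch(flaky, 2)(recorder(retryLog));
console.log(retryLog); // [ 'next:data', 'complete' ]

const alwaysFails = (observer) => observer.error(new Error('boom'));
const cached = (observer) => {
  observer.next('cached');
  observer.complete();
};

const catchLog = [];
catchSketch(alwaysFails, () => cached)(recorder(catchLog));
console.log(catchLog); // [ 'next:cached', 'complete' ]
```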

Observable Utility Operators

A toolbox of useful Operators for working with Observables

  • Delay — shift the emissions from an Observable forward in time by a particular amount
  • Do — register an action to take upon a variety of Observable lifecycle events
  • Materialize/Dematerialize — represent both the items emitted and the notifications sent as emitted items, or reverse this process
  • ObserveOn — specify the scheduler on which an observer will observe this Observable
  • Serialize — force an Observable to make serialized calls and to be well-behaved
  • Subscribe — operate upon the emissions and notifications from an Observable
  • SubscribeOn — specify the scheduler an Observable should use when it is subscribed to
  • TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
  • Timeout — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
  • Timestamp — attach a timestamp to each item emitted by an Observable
  • Using — create a disposable resource that has the same lifespan as the Observable

Conditional and Boolean Operators

Operators that evaluate one or more Observables or items emitted by Observables

  • All — determine whether all items emitted by an Observable meet some criteria
  • Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
  • Contains — determine whether an Observable emits a particular item or not
  • DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
  • SequenceEqual — determine whether two Observables emit the same sequence of items
  • SkipUntil — discard items emitted by an Observable until a second Observable emits an item
  • SkipWhile — discard items emitted by an Observable until a specified condition becomes false
  • TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
  • TakeWhile — discard items emitted by an Observable after a specified condition becomes false

Mathematical and Aggregate Operators

Operators that operate on the entire sequence of items emitted by an Observable

  • Average — calculates the average of numbers emitted by an Observable and emits this average
  • Concat — emit the emissions from two or more Observables without interleaving them
  • Count — count the number of items emitted by the source Observable and emit only this value
  • Max — determine, and emit, the maximum-valued item emitted by an Observable
  • Min — determine, and emit, the minimum-valued item emitted by an Observable
  • Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
  • Sum — calculate the sum of numbers emitted by an Observable and emit this sum

Observable sequences are first-class objects (IObservable objects in Rx.NET), and the Rx libraries supply generic LINQ-style operators to manipulate them. Most operators take an observable sequence, perform some logic on it, and output another observable sequence.



Multicasting Operators

Examples

Back-pressure

Lossy Back-pressure: debounce, throttle, sample, pause …

var debounced = Rx.Observable.fromEvent(input, 'keyup')
  .map(e => e.target.value)
  .debounce(500 /* ms */);
var throttled = Rx.Observable.fromEvent(window, 'resize')
  .throttleFirst(250 /* ms */);
var sampled = getStockData()
  .sample(5000 /* ms */);


var pausable = getSomeObservableSource().pausable();
pausable.subscribeOnNext(data => console.log('Data: %o', data));
pausable.pause();
// Resume in five seconds
setTimeout(() => pausable.resume(), 5000);

Lossless back-pressure: Buffers & Windows, Pausable buffer, Controlled observables …

var keys = Rx.Observable.fromEvent(document, 'keyup')
  .map(e => e.keyCode)
  .bufferWithCount(10, 1)
  .filter(isKonamiCode)
  .subscribeOnNext(() => console.log('KONAMI!'));

var source = getStockData()
  .bufferWithTime(5000, 1000) // time in milliseconds
  .subscribeOnNext(data => data.forEach(d => console.log('Stock: %o', d)));

var source = getStockData()
  .bufferWithTimeOrCount(5000 /* ms */, 100 /* items */)
  .subscribeOnNext(data => data.forEach(d => console.log('Stock: %o', d)));

var source = getStockData()
  .pausableBuffered();
source.pause();

var source = getStockData()
  .controlled();
source.subscribeOnNext(stock => console.log('Stock data: %o', stock));
source.request(2);
// Keep getting more after 5 seconds
setInterval(() => source.request(2), 5000);
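
The idea behind a controlled observable (nothing is dropped, but items are only delivered when the consumer asks for them) can be sketched in plain JavaScript as follows. `controlledSketch` and its `push`/`request` methods are hypothetical and only model the request-based flow; they are not the RxJS `controlled()` implementation.

```javascript
// Lossless back-pressure sketch: values are queued until the consumer
// explicitly requests them, so nothing is dropped.
function controlledSketch(onNext) {
  const pending = [];   // values produced but not yet delivered
  let requested = 0;    // how many more values the consumer has asked for

  const drain = () => {
    while (requested > 0 && pending.length > 0) {
      requested -= 1;
      onNext(pending.shift());
    }
  };

  return {
    // Called by the producer.
    push(value) {
      pending.push(value);
      drain();
    },
    // Called by the consumer.
    request(count) {
      requested += count;
      drain();
    },
  };
}

const delivered = [];
const stock = controlledSketch((value) => delivered.push(value));

stock.push(1);
stock.push(2);
stock.push(3);
const beforeRequest = delivered.slice();    // []: nothing was requested yet

stock.request(2);
const afterFirstRequest = delivered.slice(); // [1, 2]

stock.push(4);                               // queued behind 3
stock.request(2);
console.log(delivered);                      // [ 1, 2, 3, 4 ]
```

Compared with the lossy strategies, the cost is memory: the queue grows for as long as the producer outpaces the requests.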

Implement Operators

You can extend RxJS by adding new operators for operations that are not provided by the base library, or by creating your own implementation of standard query operators to improve readability and performance. Writing a customized version of a standard operator is useful when you want to operate with in-memory objects and when the intended customization does not require a comprehensive view of the query.

// implement _.where to Observable as Operator
// A regular function (not an arrow function) is required so that `this` is the source Observable
Rx.Observable.prototype.filterByProperties = function (properties) {
    var source = this,
        comparer = Rx.internals.isEqual;

    return Rx.Observable.create(observer => {
        // Our disposable is the subscription from the parent
        return source.subscribe(
            data => {
                var shouldRun = true;

                try {
                    // Iterate the properties for deep equality
                    for (var prop in properties) {
                        if (!comparer(properties[prop], data[prop])) {
                            shouldRun = false;
                            break;
                        }
                    }
                } catch (e) {
                    // Forward the error and skip this item
                    observer.onError(e);
                    return;
                }

                if (shouldRun) {
                    observer.onNext(data);
                }
            },
            observer.onError.bind(observer),
            observer.onCompleted.bind(observer)
        );
    });
};

Operators by Categories

Single

A Single is something like an Observable, but instead of emitting a series of values — anywhere from none at all to an infinite number — it always either emits one value or an error notification.

Subject

The Subject class inherits both Observable and Observer, in the sense that it is both an observer and an observable.

You can use subjects to implement a custom observable with caching, buffering and time shifting. In addition, you can use subjects to broadcast data to multiple subscribers.

const $subject = new Subject();
const subscription = $subject.subscribe(successLog, warnLog, completeLog);
// $subject.next('23');
// $subject.complete('OK!');

const $sub = new Subject();
$sub.subscribe(successLog, warnLog, completeLog);

// $sub.next('hi');
// $sub.next('o hi yo!');

$sub.subscribe(successLog, warnLog, completeLog);

// $sub.next('miao!')
// $sub.complete('haha!');

const $replaySubject = new ReplaySubject();

const $replay1 = $replaySubject.subscribe(successLog);
// $replaySubject.next('hi!');
const $replay2 = $replaySubject.subscribe(successLog);
// $replaySubject.next('hello!');

PublishSubject (simply Subject in RxJS) is the ordinary subject: subscribers receive only the values published after they subscribe.

ReplaySubject stores all the values that it has published. Therefore, when you subscribe to it, you automatically receive an entire history of values that it has published, even though your subscription might have come in after certain values have been pushed out.

BehaviorSubject is similar to ReplaySubject, except that it only stores the last value it published. BehaviorSubject also requires a default value upon initialization. This value is sent to observers when no other value has been received by the subject yet. This means that all subscribers will receive a value instantly on subscribe, unless the Subject has already completed.

AsyncSubject is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the sequence is completed. You can use the AsyncSubject type for situations when the source observable is hot and might complete before any observer can subscribe to it. In this case, AsyncSubject can still provide the last value and publish it to any future subscribers.
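
The four behaviors can be compared side by side with a small sketch in plain JavaScript. The classes below (`SubjectSketch` and friends) and the `run` helper are hypothetical, simplified models that only track `next` and `complete`; they are meant to show what an early and a late subscriber each see, not how RxJS implements subjects.

```javascript
// Plain subject: forwards each value to whoever is subscribed right now.
class SubjectSketch {
  constructor() { this.observers = []; }
  subscribe(next) { this.observers.push(next); }
  next(value) { this.observers.forEach((observer) => observer(value)); }
}

// Replay: remembers everything and replays it to each new subscriber.
class ReplaySubjectSketch extends SubjectSketch {
  constructor() { super(); this.history = []; }
  subscribe(next) {
    this.history.forEach((value) => next(value));
    super.subscribe(next);
  }
  next(value) { this.history.push(value); super.next(value); }
}

// Behavior: remembers only the latest value, starting from a default.
class BehaviorSubjectSketch extends SubjectSketch {
  constructor(initial) { super(); this.current = initial; }
  subscribe(next) { next(this.current); super.subscribe(next); }
  next(value) { this.current = value; super.next(value); }
}

// Async: holds the latest value back until the sequence completes.
class AsyncSubjectSketch extends SubjectSketch {
  constructor() { super(); this.hasValue = false; this.done = false; }
  subscribe(next) {
    if (this.done) {
      if (this.hasValue) next(this.last);
    } else {
      super.subscribe(next);
    }
  }
  next(value) {
    if (!this.done) { this.last = value; this.hasValue = true; }
  }
  complete() {
    if (this.done) return;
    this.done = true;
    if (this.hasValue) super.next(this.last);
  }
}

// One subscriber joins before any value, another after 'a' and 'b'.
function run(subject) {
  const early = [];
  const late = [];
  subject.subscribe((value) => early.push(value));
  subject.next('a');
  subject.next('b');
  subject.subscribe((value) => late.push(value));
  subject.next('c');
  if (typeof subject.complete === 'function') subject.complete();
  return { early, late };
}

const plain = run(new SubjectSketch());                 // early: a b c, late: c
const replay = run(new ReplaySubjectSketch());          // early: a b c, late: a b c
const behavior = run(new BehaviorSubjectSketch('init')); // early: init a b c, late: b c
const asyncLast = run(new AsyncSubjectSketch());        // early: c, late: c
console.log(plain, replay, behavior, asyncLast);
```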

Scheduler

Some ReactiveX Observable operators have variants that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.

Whenever an Rx method needs to generate a notification, it schedules the work on a scheduler. By supplying a scheduler to the Rx method instead of using the default, you can subtly control how those notifications are sent out.

A scheduler controls when a subscription starts and when notifications are published. It consists of three components. It is first a data structure: when you schedule tasks to be completed, they are put into the scheduler and queued based on priority or other criteria. It also offers an execution context, which denotes where the task is executed (e.g., immediately on the current thread, or via another callback mechanism such as setTimeout or process.nextTick). Lastly, it has a clock, which provides a notion of time for itself (by accessing the now method of a scheduler). Tasks being scheduled on a particular scheduler will adhere to the time denoted by that clock only.
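
The three components (a queue, an execution context, and a clock) can be sketched with a tiny virtual-time scheduler in plain JavaScript. `VirtualSchedulerSketch` is a hypothetical class written for this note, assuming tasks run synchronously inside `flush()`; it is not one of the RxJS schedulers, although test schedulers are built on a similar idea of virtual time.

```javascript
class VirtualSchedulerSketch {
  constructor() {
    this.clock = 0;     // the scheduler's own notion of time
    this.queue = [];    // the data structure holding pending tasks
    this.sequence = 0;  // tie-breaker so equal due times run in FIFO order
  }

  // The clock: tasks only ever see this scheduler's time.
  now() {
    return this.clock;
  }

  // Queue a task to run `delay` virtual milliseconds from now.
  schedule(task, delay = 0) {
    this.queue.push({ dueAt: this.clock + delay, order: this.sequence++, task });
  }

  // The execution context: here, tasks run synchronously in due-time order.
  flush() {
    while (this.queue.length > 0) {
      this.queue.sort((a, b) => a.dueAt - b.dueAt || a.order - b.order);
      const { dueAt, task } = this.queue.shift();
      this.clock = dueAt; // advance virtual time instead of waiting
      task();
    }
  }
}

const scheduler = new VirtualSchedulerSketch();
const trace = [];

scheduler.schedule(() => trace.push(`late@${scheduler.now()}`), 3000);
scheduler.schedule(() => trace.push(`early@${scheduler.now()}`), 1000);
scheduler.schedule(() => {
  trace.push(`now@${scheduler.now()}`);
  // Tasks may schedule further tasks relative to the current virtual time.
  scheduler.schedule(() => trace.push(`nested@${scheduler.now()}`), 500);
});

scheduler.flush();
console.log(trace); // [ 'now@0', 'nested@500', 'early@1000', 'late@3000' ]
```

Because time only advances inside `flush()`, three seconds of scheduled work finish instantly, which is what makes time-based operators testable.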

Name            Type       Description
queue           Scheduler  Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
asap            Scheduler  Schedules on the micro task queue, which uses the fastest transport mechanism available, either Node.js’ process.nextTick() or Web Worker MessageChannel or setTimeout or others. Use this for asynchronous conversions.
async           Scheduler  Schedules work with setInterval. Use this for time-based operations.
animationFrame  Scheduler  Schedules work with requestAnimationFrame. Use this for synchronizing with the platform’s painting.

Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x,
    Rx.Scheduler.default)
    .subscribe(...);

seq.groupBy(...)
  .map(x => x.observeOn(Rx.Scheduler.default))
  .map(x => expensive(x))  // perform operations that are expensive on resources

Let’s compare these single-threaded schedulers with the multi-threaded ones available in Java (RxJava).

Scheduler                  Purpose
Schedulers.computation()   meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io() instead); the number of threads, by default, is equal to the number of processors
Schedulers.from(executor)  uses the specified Executor as a Scheduler
Schedulers.immediate()     schedules work to begin immediately in the current thread
Schedulers.io()            meant for I/O-bound work such as asynchronous performance of blocking I/O; this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation(); Schedulers.io() by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching
Schedulers.newThread()     creates a new thread for each unit of work
Schedulers.trampoline()    queues work to begin on the current thread after any already-queued work

Rx in Practice

Debug

  • use do
var seq1 = Rx.Observable.interval(1000)
   // do operator
   .do(console.log.bind(console))
   .bufferWithCount(5)
   .do(x => console.log('buffer is full'))
   .subscribe(x => console.log('Sum of the buffer is ' + x.reduce((acc, x) => acc + x, 0)));
  • use TestScheduler

Implementation and Thoughts

Think about how we can apply these techniques and concepts in our own work.

Learn how to implement the core concepts.

