
import { EMPTY, Observable, asyncScheduler } from 'rxjs';
import { catchError, observeOn } from 'rxjs/operators';

// ---------------------------------------------------------------------------
// 1. Unicast vs. multicast; listening (subscribing)
// ---------------------------------------------------------------------------

/**
 * Sample observable that emits 1, 2, 3 and then completes.
 *
 * `observeOn(asyncScheduler)` re-emits every notification through the async
 * scheduler, so subscribers receive the values after the current synchronous
 * code has finished rather than inside the `subscribe()` call itself.
 */
const observable = new Observable<number>((proxyObserver) => {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
}).pipe(observeOn(asyncScheduler));

// ---------------------------------------------------------------------------
// 2. Example
//
// The fragments below are excerpts from a class. The enclosing class, `body`,
// `sn`, `this.service` and `this.AddTit` are defined elsewhere and are not
// part of these notes.
// ---------------------------------------------------------------------------

// Class field: holds the observable produced by the latest proAdd() call.
queUe = new Observable<any>();

// Usage inside a method of the same class: start the request and react to
// its successful response.
this.queUe = this.proAdd(body);
this.queUe.subscribe(data => {
  this.service.setSub(true);
  const msg = data.message;
  this.AddTit(msg, 0, sn);
});

/**
 * Wraps `this.service.postadd(body)` with error handling.
 *
 * On failure the handler calls `this.service.setSub(false)`, forwards the
 * error message to `this.AddTit(msg, 1)` and swallows the error by switching
 * to `EMPTY`, so the returned observable completes without emitting and the
 * subscriber's `next` callback is not invoked.
 *
 * @param body Request payload passed through to `this.service.postadd`.
 * @returns The request observable with the `catchError` handler attached.
 */
proAdd(body): Observable<any> {
  const request = this.service.postadd(body) as Observable<any>;

  // `pipe` does not modify `request`; it returns a NEW observable. That new
  // observable must be the one returned, otherwise the error handler below
  // is never part of the chain the caller subscribes to.
  return request.pipe(
    catchError(err => {
      this.service.setSub(false);
      // NOTE(review): assumes the error payload is shaped like
      // { error: { error: { message } } } — confirm against the backend.
      const msg = err.error.error.message;
      this.AddTit(msg, 1);
      return EMPTY;
    }),
  );
}