LQ20211217114038.jpg

    1. import { Observable, asyncScheduler } from 'rxjs';
    2. import { observeOn } from 'rxjs/operators';
    3. // Each subscription runs this producer: it pushes 1, 2, 3 and then completes.
    4. const observable = new Observable((subscriber) => {
    5. [1, 2, 3].forEach((value) => subscriber.next(value));
    6. subscriber.complete();
    7. }).pipe(
    8. // observeOn re-emits every notification through asyncScheduler instead of synchronously.
    9. observeOn(asyncScheduler)
    10. );
    11. 1. 单播多播 监听
    12. 2. 例子
    13. queUe = new Observable<any>(); // Field starts as an Observable with no producer; it is reassigned on the next line of this excerpt.
    14. this.queUe = this.proAdd(body); // Store the stream returned by proAdd(body).
    15. this.queUe.subscribe(data => { // Only a next-handler is registered; no error or complete callback is passed.
    16. // console.log(data);
    17. this.service.setSub(true); // Called once per emitted value.
    18. const msg = data.message; // Reads the `message` property of the emitted value.
    19. this.AddTit(msg, 0, sn); // NOTE(review): `sn` is not defined in this excerpt; presumably it comes from the enclosing scope. Confirm.
    20. });
    21. proAdd(body): Observable<any> {
    22. // Calls service.postadd(body) and returns that stream with error handling attached.
    23. // pipe() returns a NEW Observable and leaves `res` untouched, so the piped
    24. // stream itself must be returned or catchError never runs for subscribers.
    25. const res: Observable<any> = this.service.postadd(body) as Observable<any>;
    26. return res.pipe(
    27. catchError(err => {
    28. // On failure: call setSub(false) and pass the error payload's message to AddTit.
    29. // this.controlRail('0', '停轨', 'error');
    30. this.service.setSub(false);
    31. const msg = err.error.error.message;
    32. this.AddTit(msg, 1);
    33. // EMPTY completes without emitting, so subscribers see neither a value nor an error.
    34. return EMPTY;
    35. }),
    36. );
    37. }