This article is the author's original work. When reposting, please credit the source: https://www.cnblogs.com/zwvista/p/9324298.html


Operators
Creating Observables
Create / Generate
Defer
Empty / Never / Throw
From / ToObservable / AsObservable
Interval
Just / Return / Of
Range
Repeat
Start / FromCallable / ToAsync
Timer

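Operators

Rx operators work on Observables: they create, transform, and combine them.

Creating Observables

This article covers the operators that create (generate) Observables.
Here an Observable is essentially an observable data stream.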
RxJava操作符(一)Creating Observables
Creation Operators


Create / Generate

ReactiveX - Create operator
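Create builds a data stream by calling the observer's three methods: OnNext, OnError, and OnCompleted.
Generate builds a data stream by emulating a for loop.
Generate takes the following parameters:

  1. the initial state
  2. a condition that is checked before each value; the stream ends when it returns false
  3. a function that produces the next state from the current state
  4. a function that converts a state into the emitted result
  5. (optional) a function that returns how long to wait before the value for a state is emitted

ReactiveX Study Notes (2): Creating Data Streams - Figure 1
ReactiveX Study Notes (2): Creating Data Streams - Figure 2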
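To make the for-loop analogy concrete, the sketch below maps the first four Generate parameters onto the parts of an ordinary `for` statement. `GenerateLike` is a hypothetical helper written for this illustration, not part of Rx, and it is pull-based (`IEnumerable`) rather than push-based, so it only mirrors the order in which the functions are called, not Rx's scheduling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not an Rx API): a pull-based analogue of Generate.
static IEnumerable<TResult> GenerateLike<TState, TResult>(
    TState initialState,                  // parameter 1: initial state
    Func<TState, bool> condition,         // parameter 2: keep going while true
    Func<TState, TState> iterate,         // parameter 3: next state
    Func<TState, TResult> resultSelector) // parameter 4: state -> output
{
    for (var state = initialState; condition(state); state = iterate(state))
    {
        yield return resultSelector(state);
    }
}

var squares = GenerateLike(0, i => i < 10, i => i + 1, i => i * i).ToList();
Console.WriteLine(string.Join(", ", squares));
// 0, 1, 4, 9, 16, 25, 36, 49, 64, 81
```

The optional fifth parameter has no counterpart in a plain loop: it only decides when each value is delivered.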

  • RxNET

```csharp
// Emit "tick" once per second, driven by a System.Timers.Timer.
var ob = Observable.Create<string>(observer =>
{
    var timer = new System.Timers.Timer();
    timer.Interval = 1000;
    timer.Elapsed += (s, e) => observer.OnNext("tick");
    timer.Elapsed += (s, e) => Console.WriteLine(e.SignalTime);
    timer.Start();
    // The timer is IDisposable, so disposing the subscription stops it.
    return timer;
});
using (ob.Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}
/*
tick
2018/07/18 10:32:09
tick
2018/07/18 10:32:10
...
*/
```
```csharp
// Squares of 0..9, emitted without delay.
var source = Observable.Generate(0, i => i < 10, i => ++i, i => i * i);
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(0)
OnNext(1)
OnNext(4)
OnNext(9)
OnNext(16)
OnNext(25)
OnNext(36)
OnNext(49)
OnNext(64)
OnNext(81)
Completed()
*/
```
```csharp
// Same sequence, but the fifth argument delays each value by i * 100 ms.
var source = Observable.Generate(
    0,
    i => i < 10,
    i => ++i,
    i => i * i,
    i => TimeSpan.FromMilliseconds(i * 100));
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
Console.ReadLine();
/*
OnNext(0)
OnNext(1)
OnNext(4)
OnNext(9)
OnNext(16)
OnNext(25)
OnNext(36)
OnNext(49)
OnNext(64)
OnNext(81)
Completed()
*/
```

Defer

ReactiveX - Defer operator
Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
Defer creates a data stream lazily: the stream is built only when an observer subscribes, and every observer gets a freshly built stream.
ReactiveX Study Notes (2): Creating Data Streams - Figure 3

  • RxNET

```csharp
var source = Observable.Defer(() =>
{
    // Runs once per subscription.
    Console.WriteLine("# Defer method called.");
    var s = new ReplaySubject<int>();
    s.OnNext(1);
    s.OnNext(2);
    s.OnNext(3);
    s.OnCompleted();
    return s.AsObservable();
});
var subscription1 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
# Defer method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
# Defer method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
*/
```

### Empty / Never / Throw
![](https://cdn.nlark.com/yuque/0/2021/png/12898916/1625282092537-2e9441d8-57a5-496b-ac9a-673c6cfe6080.png#clientId=ub4212919-4d24-4&from=paste&id=u2557971c&margin=%5Bobject%20Object%5D&originHeight=370&originWidth=1280&originalType=url&ratio=1&status=done&style=none&taskId=u7c071781-ba69-404a-9801-dd54cd37a25)<br />
![](https://cdn.nlark.com/yuque/0/2021/png/12898916/1625282094114-d2f0b00e-2b00-45a9-8e29-54f8fd6b2b71.png#clientId=ub4212919-4d24-4&from=paste&id=uc9a4880d&margin=%5Bobject%20Object%5D&originHeight=380&originWidth=1280&originalType=url&ratio=1&status=done&style=none&taskId=u84da7b83-ede3-4440-b284-7a38aad3e8b)
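
The outline lists Empty / Never / Throw, but no sample for them survives here. As a minimal sketch, assuming the System.Reactive NuGet package is referenced, the three Rx.NET factory methods could be compared as follows. The `Watch` helper and the `log` list are names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Records every notification so the three operators can be compared.
var log = new List<string>();

// Hypothetical helper: subscribes and logs whatever the source sends.
void Watch<T>(string name, IObservable<T> source)
{
    source.Subscribe(
        x => log.Add($"{name}: OnNext({x})"),
        ex => log.Add($"{name}: OnError({ex.Message})"),
        () => log.Add($"{name}: Completed()"));
}

Watch("Empty", Observable.Empty<int>()); // no values, completes right away
Watch("Never", Observable.Never<int>()); // no values, never terminates
Watch("Throw", Observable.Throw<int>(new InvalidOperationException("boom"))); // fails right away

log.ForEach(Console.WriteLine);
/*
Empty: Completed()
Throw: OnError(boom)
*/
```

Never contributes no line to the log because it sends no notification of any kind.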
### From / ToObservable / AsObservable
[ReactiveX - From operator](http://reactivex.io/documentation/operators/from.html)<br />
[Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」](https://blog.okazuki.jp/entry/20111109/1320849106)<br />
From / ToObservable / AsObservable convert other objects and data types into data streams.<br />
![](https://cdn.nlark.com/yuque/0/2021/png/12898916/1625282094143-5c042374-7db5-44d2-b024-83e89a6c5eb1.png#clientId=ub4212919-4d24-4&from=paste&id=u760bcb61&margin=%5Bobject%20Object%5D&originHeight=630&originWidth=1280&originalType=url&ratio=1&status=done&style=none&taskId=u353d51bd-57d9-4420-9aeb-45a677bda4d)<br />
![](https://cdn.nlark.com/yuque/0/2021/png/12898916/1625282094388-9dbd87a8-67d4-4e00-83da-29da4e090750.png#clientId=ub4212919-4d24-4&from=paste&id=ub1b37def&margin=%5Bobject%20Object%5D&originHeight=620&originWidth=1280&originalType=url&ratio=1&status=done&style=none&taskId=u66b30dda-535a-4356-a1a1-57a4a2ec1c4)

- RxNET

Convert an **IEnumerable<T>** into a data stream.

```csharp
var values = new List<string> { "Rx", "is", "easy" };
values.ToObservable().Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("Completed"));
/*
Rx
is
easy
Completed
*/
```

Convert a Task into a data stream.

```csharp
// ToObservable for tasks lives in System.Reactive.Threading.Tasks.
var t = Task.Factory.StartNew(() => "Test");
var source = t.ToObservable();
source.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
Test
completed
*/
```
AsObservable hides the underlying subject, so callers cannot cast the stream back to a subject and push values into it.

```csharp
public class LetterRepo
{
    private readonly ReplaySubject<string> _letters;
    public LetterRepo()
    {
        _letters = new ReplaySubject<string>();
        _letters.OnNext("A");
        _letters.OnNext("B");
        _letters.OnNext("C");
    }
    public IObservable<string> GetLetters()
    {
        // Expose the subject as a plain IObservable<string>.
        return _letters.AsObservable();
    }
}
var repo = new LetterRepo();
var good = repo.GetLetters();
var evil = repo.GetLetters();
good.Subscribe(
    Console.WriteLine);
//Be naughty
var asSubject = evil as ISubject<string>;
if (asSubject != null)
{
    //So naughty, 1 is not a letter!
    asSubject.OnNext("1");
}
else
{
    Console.WriteLine("could not sabotage");
}
/*
A
B
C
could not sabotage
*/
```

ReactiveX Study Notes (2): Creating Data Streams - Figure 4

FromEvent converts a .NET event into a data stream.

```csharp
var eventSource = new EventSource();
var source = Observable.FromEvent<EventHandler, EventArgs>(
    // Adapt Rx's Action<EventArgs> to the EventHandler delegate type.
    h => (s, e) => h(e),
    h =>
    {
        Console.WriteLine("add handler");
        eventSource.Raised += h;
    },
    h =>
    {
        Console.WriteLine("remove handler");
        eventSource.Raised -= h;
    });
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
eventSource.OnRaised();
eventSource.OnRaised();
Console.WriteLine("dispose method call.");
subscription1.Dispose();
subscription2.Dispose();
/*
add handler
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
dispose method call.
remove handler
*/
class EventSource
{
    public event EventHandler Raised;
    public void OnRaised()
    {
        var h = this.Raised;
        if (h != null)
        {
            h(this, EventArgs.Empty);
        }
    }
}
```
FromAsync converts a Task-returning function into a data stream; the function runs again for each subscription.

```csharp
Func<int, int, int> asyncProcess = (x, y) =>
{
    Console.WriteLine("process start.");
    Thread.Sleep(2000);
    Console.WriteLine("process end.");
    return x + y;
};
// Task.FromResult evaluates asyncProcess synchronously inside Subscribe.
var source = Observable.FromAsync(() => Task.FromResult(asyncProcess(10, 2)));
Console.WriteLine("subscribe1");
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
Console.WriteLine("sleep 5sec");
Thread.Sleep(5000);
Console.WriteLine("dispose method call.");
subscription1.Dispose();
Console.WriteLine("subscribe2");
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
Console.WriteLine("dispose method call.");
subscription2.Dispose();
/*
subscribe1
process start.
process end.
1##OnNext(12)
1##Completed()
sleep 5sec
dispose method call.
subscribe2
process start.
process end.
2##OnNext(12)
2##Completed()
dispose method call.
*/
```

Interval

ReactiveX - Interval operator
Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
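Interval creates a data stream that emits a value every time the specified interval elapses; it ends only when the subscription is disposed.
ReactiveX Study Notes (2): Creating Data Streams - Figure 5

  • RxNET

```csharp
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
using (interval.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed")))
{
    Console.ReadLine();
}
/*
0
1
2
...
*/
```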

### Just / Return / Of
[ReactiveX - Just operator](http://reactivex.io/documentation/operators/just.html)<br />
[Reactive Extensions再入門 その3「IObservableのファクトリメソッド」](https://blog.okazuki.jp/entry/20111104/1320409976)<br />
Just / Of create a data stream that emits one or more values and then completes.<br />
Return creates a data stream that emits a single value and then completes.<br />
![](https://cdn.nlark.com/yuque/0/2021/png/12898916/1625282096441-14903e64-2dc5-4109-9eb2-43ce3d3d7325.png#clientId=ub4212919-4d24-4&from=paste&id=uef7abbe5&margin=%5Bobject%20Object%5D&originHeight=620&originWidth=1280&originalType=url&ratio=1&status=done&style=none&taskId=u627657ac-847c-42d7-9762-882f0b265f7)

- RxNET

```csharp
var singleValue = Observable.Return<string>("Value");
singleValue.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
Value
completed
*/
```

Range

ReactiveX - Range operator
Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
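Range creates a data stream that emits a run of consecutive integers within a given range and then completes.
ReactiveX Study Notes (2): Creating Data Streams - Figure 6

  • RxNET

```csharp
// Arguments are (start, count): ten integers beginning at 1.
var source = Observable.Range(1, 10);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(1)
OnNext(2)
...
OnNext(9)
OnNext(10)
Completed()
*/
```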

### Repeat
[ReactiveX - Repeat operator](http://reactivex.io/documentation/operators/repeat.html)<br />
[Reactive Extensions再入門 その3「IObservableのファクトリメソッド」](https://blog.okazuki.jp/entry/20111104/1320409976)<br />
Repeat creates a data stream that emits one or more values repeatedly a number of times and then completes.<br />
![](https://cdn.nlark.com/yuque/0/2021/png/12898916/1625282096707-1be631e2-95dd-488d-96ff-05eb1351bdd1.png#clientId=ub4212919-4d24-4&from=paste&id=ufd26a4fd&margin=%5Bobject%20Object%5D&originHeight=610&originWidth=1280&originalType=url&ratio=1&status=done&style=none&taskId=ue364deca-26ef-403d-a995-7e0c1063128)

- RxNET

```csharp
// Arguments are (value, repeatCount): emit the value 2 five times.
var source = Observable.Repeat(2, 5);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
Completed()
*/
```
```csharp
// Repeat an existing stream: the sequence 1, 2, 3 is replayed three times.
var source = Observable.Range(1, 3);
source = source.Repeat(3);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
*/
```

Start / FromCallable / ToAsync

ReactiveX - Start operator
Intro to Rx - Start
Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
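Start / FromCallable create a data stream that runs the given function, emits its return value, and then completes.
By default, Start runs the function asynchronously; in Rx.NET it goes through the default scheduler, typically on a thread-pool thread. RxJava's fromCallable, by contrast, runs on the subscribing thread unless a scheduler is specified.
ToAsync turns an ordinary function into a function that creates a data stream.
Calling the function that ToAsync returns creates a data stream that runs the original function, emits its return value, and then completes.
In a sense, ToAsync is the parameterized version of Start.
ReactiveX Study Notes (2): Creating Data Streams - Figure 7
ReactiveX Study Notes (2): Creating Data Streams - Figure 8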

  • RxNET

```csharp
static void StartAction()
{
    // Start with an Action: emits a single Unit when the work finishes.
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
    });
    start.Subscribe(
        unit => Console.WriteLine("Unit published"),
        () => Console.WriteLine("Action completed"));
}
static void StartFunc()
{
    // Start with a Func: emits the function's return value.
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
        return "Published value";
    });
    start.Subscribe(
        Console.WriteLine,
        () => Console.WriteLine("Action completed"));
}
StartAction();
Console.ReadLine();
StartFunc();
Console.ReadLine();
/*
Working away..........Unit published
Action completed

Working away..........Published value
Action completed
*/
```

```csharp
// ToAsync wraps the function; calling source(x) starts it in the background.
var source = new Func<int,int>(i =>
{
    Console.WriteLine("background task start.");
    Thread.Sleep(2000);
    Console.WriteLine("background task end.");
    return i;
}).ToAsync();
Console.WriteLine("source(1) call.");
var subscription1 = source(1).Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
Console.WriteLine("sleep 3sec.");
Thread.Sleep(3000);
Console.WriteLine("dispose method call.");
subscription1.Dispose();
Console.WriteLine("source(2) call.");
var subscription2 = source(2).Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
Console.WriteLine("sleep 3sec.");
Thread.Sleep(3000);
Console.WriteLine("dispose method call.");
subscription2.Dispose();
/*
source(1) call.
background task start.
sleep 3sec.
background task end.
1##OnNext(1)
1##Completed()
dispose method call.
source(2) call.
sleep 3sec.
background task start.
background task end.
2##OnNext(2)
2##Completed()
dispose method call.
*/
```

Timer

ReactiveX - Timer operator
Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
Timer creates a data stream that waits for the specified delay, emits a single value, and then completes.
ReactiveX Study Notes (2): Creating Data Streams - Figure 9

  • RxNET

```csharp
var timer = Observable.Timer(TimeSpan.FromSeconds(1));
timer.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
0
completed
*/
```