This is an original article by the author. If you repost it, please credit the source: https://www.cnblogs.com/zwvista/p/9324298.html
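Operators
Rx operators work on Observables: they create, transform, and combine them.
Creating Observables
This article covers the operators that create or generate Observables.
An Observable here is essentially an observable data stream.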
RxJava操作符(一)Creating Observables
Creation Operators
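### Create / Generate
ReactiveX - Create operator
Create builds a data stream by calling the observer's three methods: OnNext, OnError, and OnCompleted.
Generate builds a data stream by emulating a for loop.
Generate takes the following parameters:
- the initial state
- a condition that is checked on each state; generation stops when it returns false
- a function that produces the next state from the current state
- a function that converts a state into an output value
- (optional) a function that specifies when each value is emitted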
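
The for-loop analogy can be made concrete with a small sketch. The helper `GenerateLoop` below is hypothetical and not part of Rx.NET: it is a synchronous, pull-based stand-in that takes the same four required parameters and shows how each one maps onto a part of a for loop. The optional timing function is left out.

```csharp
using System;
using System.Collections.Generic;

public static class GenerateSketch
{
    // Hypothetical helper (not an Rx.NET API): each parameter fills one slot
    // of an ordinary for loop, in the same order Generate takes them.
    public static IEnumerable<TResult> GenerateLoop<TState, TResult>(
        TState initialState,                    // for (var state = initialState;
        Func<TState, bool> condition,           //      condition(state);
        Func<TState, TState> iterate,           //      state = iterate(state))
        Func<TState, TResult> resultSelector)   //     yield resultSelector(state);
    {
        for (var state = initialState; condition(state); state = iterate(state))
            yield return resultSelector(state);
    }

    public static void Main()
    {
        // Same arguments as the Generate example: squares of 0 through 9.
        foreach (var value in GenerateLoop(0, i => i < 10, i => i + 1, i => i * i))
            Console.WriteLine(value);
    }
}
```

Where this sketch returns values through an enumerator, Generate pushes the same values to an observer and then signals completion.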
 


- RxNET
```csharp
// Timer here is System.Timers.Timer; it is IDisposable, so returning it
// lets Rx stop the timer when the subscription is disposed.
var ob = Observable.Create<string>(observer =>
{
    var timer = new Timer();
    timer.Interval = 1000;
    timer.Elapsed += (s, e) => observer.OnNext("tick");
    timer.Elapsed += (s, e) => Console.WriteLine(e.SignalTime);
    timer.Start();
    return timer;
});
using (ob.Subscribe(Console.WriteLine))
    Console.ReadLine();
/*
tick
2018/07/18 10:32:09
tick
2018/07/18 10:32:10
...
*/
```
```csharp
var source = Observable.Generate(0, i => i < 10, i => ++i, i => i * i);
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(0)
OnNext(1)
OnNext(4)
OnNext(9)
OnNext(16)
OnNext(25)
OnNext(36)
OnNext(49)
OnNext(64)
OnNext(81)
Completed()
*/
```
```csharp
var source = Observable.Generate(
    0,
    i => i < 10,
    i => ++i,
    i => i * i,
    i => TimeSpan.FromMilliseconds(i * 100));
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
Console.ReadLine();
/*
OnNext(0)
OnNext(1)
OnNext(4)
OnNext(9)
OnNext(16)
OnNext(25)
OnNext(36)
OnNext(49)
OnNext(64)
OnNext(81)
Completed()
*/
```
### Defer
ReactiveX - Defer operator
Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
Defer does not create the data stream until an observer subscribes, and it creates a fresh data stream for each observer.
- RxNET
```csharp
var source = Observable.Defer<int>(() =>
{
    Console.WriteLine("# Defer method called.");
    var s = new ReplaySubject<int>();
    s.OnNext(1);
    s.OnNext(2);
    s.OnNext(3);
    s.OnCompleted();
    return s.AsObservable();
});
var subscription1 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
# Defer method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
# Defer method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
*/
```
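
To show why the factory runs once per subscriber, here is a minimal sketch of how a Defer-like operator could be built on the `IObservable<T>` and `IObserver<T>` interfaces from the base class library alone. `DeferSketch`, `SingleValue`, and `PrintObserver` are hypothetical names introduced for illustration; this is not how Rx.NET itself implements Defer.

```csharp
using System;

// Hypothetical Defer-like wrapper: it stores only the factory.
public sealed class DeferSketch<T> : IObservable<T>
{
    private readonly Func<IObservable<T>> _factory;
    public DeferSketch(Func<IObservable<T>> factory) => _factory = factory;

    // The factory is invoked here, so nothing exists before a subscription
    // and every subscription gets the stream from its own factory call.
    public IDisposable Subscribe(IObserver<T> observer) => _factory().Subscribe(observer);
}

// Hypothetical inner stream: emits one value, then completes.
public sealed class SingleValue<T> : IObservable<T>
{
    private readonly T _value;
    public SingleValue(T value) => _value = value;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observer.OnNext(_value);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that prints each notification.
public sealed class PrintObserver<T> : IObserver<T>
{
    public void OnNext(T value) => Console.WriteLine("OnNext({0})", value);
    public void OnError(Exception error) => Console.WriteLine("OnError({0})", error.Message);
    public void OnCompleted() => Console.WriteLine("Completed()");
}

public static class Program
{
    public static void Main()
    {
        var calls = 0;
        var source = new DeferSketch<int>(() =>
        {
            calls++;                                // counts factory invocations
            return new SingleValue<int>(calls);
        });

        Console.WriteLine("factory calls before subscribing: {0}", calls);
        source.Subscribe(new PrintObserver<int>()); // first factory call
        source.Subscribe(new PrintObserver<int>()); // second, independent call
        Console.WriteLine("factory calls after two subscriptions: {0}", calls);
    }
}
```

The counter stays at zero until the first Subscribe call and reaches two after the second, which mirrors the two "# Defer method called." lines in the Rx.NET output.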
### From / ToObservable / AsObservable
[ReactiveX - From operator](http://reactivex.io/documentation/operators/from.html)
[Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」](https://blog.okazuki.jp/entry/20111109/1320849106)
From / ToObservable / AsObservable convert other objects and data types into data streams.

- RxNET

Converting an **IEnumerable<T>** to a data stream:
```csharp
var values = new List<string> { "Rx", "is", "easy" };
values.ToObservable().Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("Completed"));
/*
Rx
is
easy
Completed
*/
```
Converting a **Task<T>** to a data stream:
```csharp
var t = Task.Factory.StartNew(() => "Test");
var source = t.ToObservable();
source.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
Test
completed
*/
```
```csharp
// AsObservable wraps the subject, so callers cannot cast the result
// back to ISubject<string> and push values into it.
public class LetterRepo
{
    private readonly ReplaySubject<string> _letters;
    public LetterRepo()
    {
        _letters = new ReplaySubject<string>();
        _letters.OnNext("A");
        _letters.OnNext("B");
        _letters.OnNext("C");
    }
    public IObservable<string> GetLetters()
    {
        return _letters.AsObservable();
    }
}
var repo = new LetterRepo();
var good = repo.GetLetters();
var evil = repo.GetLetters();
good.Subscribe(Console.WriteLine);
//Be naughty
var asSubject = evil as ISubject<string>;
if (asSubject != null)
{
    //So naughty, 1 is not a letter!
    asSubject.OnNext("1");
}
else
{
    Console.WriteLine("could not sabotage");
}
/*
A
B
C
could not sabotage
*/
```
```csharp
// FromEvent converts a .NET event into a data stream.
var eventSource = new EventSource();
var source = Observable.FromEvent<EventHandler, EventArgs>(
    h => (s, e) => h(e),
    h =>
    {
        Console.WriteLine("add handler");
        eventSource.Raised += h;
    },
    h =>
    {
        Console.WriteLine("remove handler");
        eventSource.Raised -= h;
    });
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
eventSource.OnRaised();
eventSource.OnRaised();
Console.WriteLine("dispose method call.");
subscription1.Dispose();
subscription2.Dispose();
/*
add handler
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
dispose method call.
remove handler
*/
class EventSource
{
    public event EventHandler Raised;
    public void OnRaised()
    {
        var h = this.Raised;
        if (h != null)
        {
            h(this, EventArgs.Empty);
        }
    }
}
```
```csharp
// FromAsync converts a task-returning function into a data stream;
// the function runs again for each subscription.
Func<int, int, int> asyncProcess = (x, y) =>
{
    Console.WriteLine("process start.");
    Thread.Sleep(2000);
    Console.WriteLine("process end.");
    return x + y;
};
var source = Observable.FromAsync(() => Task.FromResult(asyncProcess(10, 2)));
Console.WriteLine("subscribe1");
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
Console.WriteLine("sleep 5sec");
Thread.Sleep(5000);
Console.WriteLine("dispose method call.");
subscription1.Dispose();
Console.WriteLine("subscribe2");
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
Console.WriteLine("dispose method call.");
subscription2.Dispose();
/*
subscribe1
process start.
process end.
1##OnNext(12)
1##Completed()
sleep 5sec
dispose method call.
subscribe2
process start.
process end.
2##OnNext(12)
2##Completed()
dispose method call.
*/
```
### Interval
ReactiveX - Interval operator
Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
Interval creates a data stream that emits a value each time the specified interval elapses; it ends only when the subscription is disposed.
- RxNET
```csharp
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
using (interval.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed")))
    Console.ReadLine();
/*
0
1
2
...
*/
```
### Just / Return / Of
[ReactiveX - Just operator](http://reactivex.io/documentation/operators/just.html)
[Reactive Extensions再入門 その3「IObservableのファクトリメソッド」](https://blog.okazuki.jp/entry/20111104/1320409976)
Just / Of create a data stream that emits one or more values and then completes.
Return creates a data stream that emits a single value and then completes.
- RxNET
```csharp
var singleValue = Observable.Return<string>("Value");
singleValue.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
Value
completed
*/
```
### Range
ReactiveX - Range operator
Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
Range creates a data stream that emits a sequence of consecutive integers within a given range and then completes.
- RxNET
```csharp
var source = Observable.Range(1, 10);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(1)
OnNext(2)
...
OnNext(9)
OnNext(10)
Completed()
*/
```
### Repeat
[ReactiveX - Repeat operator](http://reactivex.io/documentation/operators/repeat.html)
[Reactive Extensions再入門 その3「IObservableのファクトリメソッド」](https://blog.okazuki.jp/entry/20111104/1320409976)
Repeat creates a data stream that emits one or more values repeatedly and then completes.
- RxNET
```csharp
var source = Observable.Repeat(2, 5);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
Completed()
*/
```
```csharp
var source = Observable.Range(1, 3);
source = source.Repeat(3);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
*/
```
### Start / FromCallable / ToAsync
ReactiveX - Start operator
Intro to Rx - Start
Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
Start / FromCallable create a data stream that runs the given function, emits its return value, and then completes.
By default, Start runs the given function asynchronously on a background thread rather than on the caller's thread.
ToAsync turns an ordinary function into a function that creates a data stream.
Calling the function returned by ToAsync creates a data stream that runs the original function, emits its return value, and then completes.
In that sense, ToAsync is the parameterized version of Start.

- RxNET
```csharp
static void StartAction()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
    });
    start.Subscribe(
        unit => Console.WriteLine("Unit published"),
        () => Console.WriteLine("Action completed"));
}
static void StartFunc()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
        return "Published value";
    });
    start.Subscribe(
        Console.WriteLine,
        () => Console.WriteLine("Action completed"));
}
StartAction();
Console.ReadLine();
StartFunc();
Console.ReadLine();
/*
Working away..........Unit published
Action completed

Working away..........Published value
Action completed
*/
```
```csharp
var source = new Func<int, int>(i =>
{
    Console.WriteLine("background task start.");
    Thread.Sleep(2000);
    Console.WriteLine("background task end.");
    return i;
}).ToAsync();
Console.WriteLine("source(1) call.");
var subscription1 = source(1).Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
Console.WriteLine("sleep 3sec.");
Thread.Sleep(3000);
Console.WriteLine("dispose method call.");
subscription1.Dispose();
Console.WriteLine("source(2) call.");
var subscription2 = source(2).Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
Console.WriteLine("sleep 3sec.");
Thread.Sleep(3000);
Console.WriteLine("dispose method call.");
subscription2.Dispose();
/*
source(1) call.
background task start.
sleep 3sec.
background task end.
1##OnNext(1)
1##Completed()
dispose method call.
source(2) call.
sleep 3sec.
background task start.
background task end.
2##OnNext(2)
2##Completed()
dispose method call.
*/
```
### Timer
ReactiveX - Timer operator
Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
Timer creates a data stream that emits a single value after the specified delay and then completes.
- RxNET
```csharp
var timer = Observable.Timer(TimeSpan.FromSeconds(1));
timer.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
0
completed
*/
```
