This article is the author's original work. If you repost it, please credit the source: https://www.cnblogs.com/zwvista/p/9324298.html
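## Operators
Rx operators act on Observables: they create, transform, and combine them.
## Creating Observables
This article covers the operators that create or generate Observables.
An Observable here is essentially an observable data stream.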
RxJava操作符(一)Creating Observables
Creation Operators
### Create / Generate
ReactiveX - Create operator
Create builds a data stream by calling the Observer's three methods directly: OnNext, OnError, and OnComplete (named OnCompleted in Rx.NET).
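To make the three-method contract concrete, here is a minimal sketch, assuming the System.Reactive package and C# 9 top-level statements. The `letters` stream and the `log` list are hypothetical names used only for illustration. The subscriber function pushes two values and then signals completion, and because it does all its work synchronously it returns `Disposable.Empty`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stream: emits "a" and "b", then completes.
var letters = Observable.Create<string>(observer =>
{
    observer.OnNext("a");
    observer.OnNext("b");
    observer.OnCompleted();
    // Nothing to clean up, so return an empty disposable.
    return Disposable.Empty;
});

// Record every notification in the order it arrives.
var log = new List<string>();
letters.Subscribe(
    x => log.Add("OnNext(" + x + ")"),
    ex => log.Add("OnError(" + ex.Message + ")"),
    () => log.Add("Completed()"));

Console.WriteLine(string.Join(", ", log));
// prints: OnNext(a), OnNext(b), Completed()
```

After OnCompleted (or OnError) the stream has terminated, so a well-behaved Create body sends no further notifications.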
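Generate produces a data stream by emulating a for loop.
Generate takes the following parameters:
- the initial state
- the termination condition (the stream continues while the condition holds)
- a function that produces the next state from the current state
- a function that converts a state into an output value
- (optional) a function that specifies when each value is emitted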
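The correspondence with a for loop can be sketched as follows, assuming the System.Reactive package and C# 9 top-level statements. The list names `fromLoop` and `fromGenerate` are hypothetical. Each argument to Generate takes the place of one part of the loop header or body.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// The plain for loop that Generate emulates.
var fromLoop = new List<int>();
for (var n = 0; n < 5; n++)
    fromLoop.Add(n * n);

// The same four pieces, passed to Generate as arguments.
var fromGenerate = new List<int>();
Observable.Generate(
        0,              // initial state        (var n = 0)
        s => s < 5,     // continue while true  (n < 5)
        s => s + 1,     // next state           (n++)
        s => s * s)     // output value         (n * n)
    .Subscribe(x => fromGenerate.Add(x));

Console.WriteLine(string.Join(" ", fromLoop));
Console.WriteLine(string.Join(" ", fromGenerate));
// both lines print: 0 1 4 9 16
```

Without the optional time selector, this overload produces its values during the Subscribe call, which is why the list is already filled on the next line.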
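- RxNET
```csharp
// Requires: using System; using System.Reactive.Linq; using System.Timers;
var ob = Observable.Create<string>(observer =>
{
    var timer = new Timer();
    timer.Interval = 1000;
    // Push "tick" to the observer every second.
    timer.Elapsed += (s, e) => observer.OnNext("tick");
    timer.Elapsed += (s, e) => Console.WriteLine(e.SignalTime);
    timer.Start();
    // Timer is IDisposable, so disposing the subscription disposes the timer.
    return timer;
});
using (ob.Subscribe(Console.WriteLine))
    Console.ReadLine();
/*
tick
2018/07/18 10:32:09
tick
2018/07/18 10:32:10
...
*/
```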
```csharp
// Arguments: initial state, condition, iterate, result selector.
var source = Observable.Generate(0, i => i < 10, i => ++i, i => i * i);
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(0)
OnNext(1)
OnNext(4)
OnNext(9)
OnNext(16)
OnNext(25)
OnNext(36)
OnNext(49)
OnNext(64)
OnNext(81)
Completed()
*/
```
```csharp
var source = Observable.Generate(
    0,
    i => i < 10,
    i => ++i,
    i => i * i,
    // Time selector: how long to wait before emitting the value for state i.
    i => TimeSpan.FromMilliseconds(i * 100));
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
Console.ReadLine();
/*
OnNext(0)
OnNext(1)
OnNext(4)
OnNext(9)
OnNext(16)
OnNext(25)
OnNext(36)
OnNext(49)
OnNext(64)
OnNext(81)
Completed()
*/
```
### Defer
ReactiveX - Defer operator
Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
Defer creates a data stream lazily: the stream is not created until an observer subscribes, and a new stream is created for each observer.
- RxNET
```csharp
var source = Observable.Defer<int>(() =>
{
    // This factory runs once per subscription.
    Console.WriteLine("# Defer method called.");
    var s = new ReplaySubject<int>();
    s.OnNext(1);
    s.OnNext(2);
    s.OnNext(3);
    s.OnCompleted();
    return s.AsObservable();
});
var subscription1 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
# Defer method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
# Defer method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
*/
```
<a name="aERUN"></a>
### From / ToObservable / AsObservable
[ReactiveX - From operator](http://reactivex.io/documentation/operators/from.html)<br />[Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」](https://blog.okazuki.jp/entry/20111109/1320849106)<br />From / ToObservable / AsObservable convert other objects and data types into data streams.<br /><br />
- RxNET
Convert an **IEnumerable<T>** into a data stream.
```csharp
var values = new List<string> { "Rx", "is", "easy" };
values.ToObservable().Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("Completed"));
/*
Rx
is
easy
Completed
*/
```
Convert a **Task<T>** into a data stream.
```csharp
var t = Task.Factory.StartNew(() => "Test");
var source = t.ToObservable();
source.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
Test
completed
*/
```
Use AsObservable to hide the underlying Subject, so that consumers cannot cast the stream back to an ISubject<T> and push values into it.
```csharp
public class LetterRepo
{
    private readonly ReplaySubject<string> _letters;
    public LetterRepo()
    {
        _letters = new ReplaySubject<string>();
        _letters.OnNext("A");
        _letters.OnNext("B");
        _letters.OnNext("C");
    }
    public IObservable<string> GetLetters()
    {
        // Expose the subject only as an IObservable<string>.
        return _letters.AsObservable();
    }
}
var repo = new LetterRepo();
var good = repo.GetLetters();
var evil = repo.GetLetters();
good.Subscribe(
    Console.WriteLine);
//Be naughty
var asSubject = evil as ISubject<string>;
if (asSubject != null)
{
    //So naughty, 1 is not a letter!
    asSubject.OnNext("1");
}
else
{
    Console.WriteLine("could not sabotage");
}
/*
A
B
C
could not sabotage
*/
```
Convert a .NET event into a data stream with FromEvent.
```csharp
var eventSource = new EventSource();
var source = Observable.FromEvent<EventHandler, EventArgs>(
    // Adapt the Rx callback (Action<EventArgs>) to an EventHandler.
    h => (s, e) => h(e),
    h =>
    {
        Console.WriteLine("add handler");
        eventSource.Raised += h;
    },
    h =>
    {
        Console.WriteLine("remove handler");
        eventSource.Raised -= h;
    });
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
eventSource.OnRaised();
eventSource.OnRaised();
Console.WriteLine("dispose method call.");
subscription1.Dispose();
subscription2.Dispose();
/*
add handler
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
dispose method call.
remove handler
*/
class EventSource
{
    public event EventHandler Raised;
    public void OnRaised()
    {
        var h = this.Raised;
        if (h != null)
        {
            h(this, EventArgs.Empty);
        }
    }
}
```
Convert a function that returns a Task into a data stream with FromAsync. As the output shows, the function runs again for every subscription.
```csharp
Func<int, int, int> asyncProcess = (x, y) =>
{
    Console.WriteLine("process start.");
    Thread.Sleep(2000);
    Console.WriteLine("process end.");
    return x + y;
};
var source = Observable.FromAsync(() => Task.FromResult(asyncProcess(10, 2)));
Console.WriteLine("subscribe1");
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
Console.WriteLine("sleep 5sec");
Thread.Sleep(5000);
Console.WriteLine("dispose method call.");
subscription1.Dispose();
Console.WriteLine("subscribe2");
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
Console.WriteLine("dispose method call.");
subscription2.Dispose();
/*
subscribe1
process start.
process end.
1##OnNext(12)
1##Completed()
sleep 5sec
dispose method call.
subscribe2
process start.
process end.
2##OnNext(12)
2##Completed()
dispose method call.
*/
```
### Interval
ReactiveX - Interval operator
Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
Interval creates a data stream that emits a value each time the specified period elapses; it ends only when the subscription is disposed.
- RxNET
```csharp
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
using (interval.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed")))
    Console.ReadLine();
/*
0
1
2
...
*/
```
<a name="b640F"></a>
### Just / Return / Of
[ReactiveX - Just operator](http://reactivex.io/documentation/operators/just.html)<br />[Reactive Extensions再入門 その3「IObservableのファクトリメソッド」](https://blog.okazuki.jp/entry/20111104/1320409976)<br />Just / Of create a data stream that emits one or more values and then completes.<br />Return creates a data stream that emits a single value and then completes.<br />
- RxNET
```csharp
var singleValue = Observable.Return<string>("Value");
singleValue.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
Value
completed
*/
```
### Range
ReactiveX - Range operator
Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
Range creates a data stream that emits a sequence of consecutive integers within a given range and then completes.
- RxNET
```csharp
// Arguments: start value and count (not end value).
var source = Observable.Range(1, 10);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(1)
OnNext(2)
...
OnNext(9)
OnNext(10)
Completed()
*/
```
<a name="Tne8e"></a>
### Repeat
[ReactiveX - Repeat operator](http://reactivex.io/documentation/operators/repeat.html)<br />[Reactive Extensions再入門 その3「IObservableのファクトリメソッド」](https://blog.okazuki.jp/entry/20111104/1320409976)<br />Repeat creates a data stream that emits one or more values repeatedly, a given number of times, and then completes.<br />
- RxNET
```csharp
// Repeat a single value: emit 2 five times.
var source = Observable.Repeat(2, 5);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
Completed()
*/
```
```csharp
// Repeat a whole sequence: replay 1, 2, 3 three times.
var source = Observable.Range(1, 3);
source = source.Repeat(3);
source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
/*
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
*/
```
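### Start / FromCallable / ToAsync
ReactiveX - Start operator
Intro to Rx - Start
Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
Start / FromCallable create a data stream that runs the specified function, emits its return value, and then completes.
In Rx.NET, Start runs the specified function asynchronously, by default on a background (thread-pool) thread. RxJava's fromCallable, by contrast, runs the function on the subscribing thread unless a scheduler is specified.
ToAsync converts an ordinary function into a function that creates a data stream.
Calling the function returned by ToAsync creates a data stream that runs the original function, emits its return value, and then completes.
In effect, ToAsync is a version of Start that takes arguments.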
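The relationship between the two can be sketched as follows, assuming the System.Reactive package and C# 9 top-level statements. The `square` function is a hypothetical example. With Start the argument has to be captured in a closure, while the function returned by ToAsync accepts it directly. `Wait()` blocks until the stream completes and returns its last value, which keeps the sketch deterministic.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical function to run in the background.
Func<int, int> square = x => x * x;

// Start: the argument is baked into the lambda.
var viaStart = Observable.Start(() => square(7)).Wait();

// ToAsync: returns a Func<int, IObservable<int>> that takes the argument.
var squareAsync = Observable.ToAsync(square);
var viaToAsync = squareAsync(7).Wait();

Console.WriteLine(viaStart);   // prints: 49
Console.WriteLine(viaToAsync); // prints: 49
```

Blocking with `Wait()` is only for illustration here; in reactive code you would normally subscribe instead.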
- RxNET
```csharp
static void StartAction()
{
    // Start with an Action: emits a single Unit when the work finishes.
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
    });
    start.Subscribe(
        unit => Console.WriteLine("Unit published"),
        () => Console.WriteLine("Action completed"));
}
static void StartFunc()
{
    // Start with a Func: emits the function's return value.
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
        return "Published value";
    });
    start.Subscribe(
        Console.WriteLine,
        () => Console.WriteLine("Action completed"));
}
StartAction();
Console.ReadLine();
StartFunc();
Console.ReadLine();
/*
Working away..........Unit published
Action completed

Working away..........Published value
Action completed
*/
```
```csharp
var source = new Func<int,int>(i =>
{
    Console.WriteLine("background task start.");
    Thread.Sleep(2000);
    Console.WriteLine("background task end.");
    return i;
}).ToAsync();
Console.WriteLine("source(1) call.");
var subscription1 = source(1).Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
Console.WriteLine("sleep 3sec.");
Thread.Sleep(3000);
Console.WriteLine("dispose method call.");
subscription1.Dispose();
Console.WriteLine("source(2) call.");
var subscription2 = source(2).Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
Console.WriteLine("sleep 3sec.");
Thread.Sleep(3000);
Console.WriteLine("dispose method call.");
subscription2.Dispose();
/*
source(1) call.
background task start.
sleep 3sec.
background task end.
1##OnNext(1)
1##Completed()
dispose method call.
source(2) call.
sleep 3sec.
background task start.
background task end.
2##OnNext(2)
2##Completed()
dispose method call.
*/
```
### Timer
ReactiveX - Timer operator
Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
Timer creates a data stream that waits for the specified delay, emits a single value, and then completes.
- RxNET
```csharp
var timer = Observable.Timer(TimeSpan.FromSeconds(1));
timer.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));
/*
0
completed
*/
```