前置知识:观察者模式&java.util.Observable

RxJava2 = Observer + 异步处理

添加Maven依赖

  <dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.10</version>
  </dependency>

Consumer示例

  1. import io.reactivex.Observable;
  2. import io.reactivex.functions.Consumer;
  3. import io.reactivex.schedulers.Schedulers;
  4. public class Main {
  5. public static void main(String[] args) {
  6. Consumer<String> consumer = s -> {
  7. Thread.sleep(100L);
  8. System.out.println(Thread.currentThread().getName() + " : " + s);
  9. };
  10. Observable<String> observable = Observable.create(emitter -> {
  11. emitter.onNext("foo");
  12. emitter.onNext("foo2");
  13. emitter.onNext("foo3");
  14. });
  15. //异步调用
  16. observable.observeOn(Schedulers.newThread()).subscribe(consumer);
  17. //同步調用
  18. observable.subscribe(consumer);
  19. }
  20. }

Observer示例

  1. import io.reactivex.Observable;
  2. import io.reactivex.Observer;
  3. import io.reactivex.disposables.Disposable;
  4. public class Main2 {
  5. public static void main(String[] args) {
  6. Observer<String> observer = new Observer<String>() {
  7. @Override
  8. public void onSubscribe(Disposable disposable) {
  9. System.out.println("onSubscribe:" + disposable);
  10. }
  11. @Override
  12. public void onNext(String s) {
  13. System.out.println("onNext:" + s);
  14. // if ("foo3".equals(s)) {
  15. // throw new IllegalArgumentException("foo3");
  16. // }
  17. }
  18. @Override
  19. public void onError(Throwable throwable) {
  20. System.out.println("onError:" + throwable.getMessage());
  21. }
  22. @Override
  23. public void onComplete() {
  24. System.out.println("onComplete");
  25. }
  26. };
  27. Observable<String> observable = Observable.create(emitter -> {
  28. try {
  29. emitter.onNext("foo");
  30. emitter.onNext("foo2");
  31. emitter.onNext("foo3");
  32. emitter.onNext("foo4");
  33. } catch (Exception e) {
  34. emitter.onError(e);
  35. }
  36. emitter.onComplete();
  37. });
  38. //同步調用
  39. observable.subscribe(observer);
  40. }
  41. }

链式调用

  1. import io.reactivex.Observable;
  2. import io.reactivex.ObservableOnSubscribe;
  3. import io.reactivex.schedulers.Schedulers;
  4. public class Main3 {
  5. public static void main(String[] args) {
  6. Observable.create((ObservableOnSubscribe<String>) emitter -> {
  7. emitter.onNext("foo");
  8. emitter.onNext("foo2");
  9. emitter.onNext("foo3");
  10. })
  11. .observeOn(Schedulers.newThread())
  12. .subscribe(s -> System.out.println(s));
  13. for (;;){}
  14. }
  15. }

拓展阅读

RxJava2 只看这一篇文章就够了 —— 玉刚说

https://juejin.im/post/5b17560e6fb9a01e2862246f