:::tips

  • 使用 Rx.Observable.create() 创建响应式流。该流通过显式的通知每一个 Observer (或者说是 “Subscriber”) 数据事件onNext() 或者错误事件onError()。
  • 与 Java StreamAPI 相比,Observable 是可重用的。 :::

    RxJava

    1. public static void rxjava(){
    2. Observable<Object> observable = Observable.create(
    3. subscriber -> {
    4. for (int i = 0; i < 10; i++) {
    5. // 传输响应式流中的元素
    6. subscriber.onNext("rx1 -- " + i);
    7. }
    8. // 通知订阅者,响应式流结束
    9. subscriber.onCompleted();
    10. });
    11. observable.subscribe(
    12. item -> System.out.println("下一个元素是:" + item),
    13. ex -> System.err.println("异常信息:" + ex.getMessage()),
    14. () -> System.out.println("响应式流结束")
    15. );
    16. }

RxJava2

  1. public static void rxjava2(){
  2. Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
  5. for (int i = 0; i < 10; i++) {
  6. observableEmitter.onNext("rxjava2 - " + i);
  7. }
  8. observableEmitter.onComplete();
  9. }
  10. });
  11. observable.subscribe(
  12. System.out::println,
  13. System.err::println,
  14. () -> System.out.println("流结束")
  15. );
  16. }

RxJava3

  1. public static void rxjava3(){
  2. Observable.create((ObservableOnSubscribe<String>) emitter -> {
  3. for (int i = 0; i < 10; i++) {
  4. emitter.onNext("rxjava3 - " + i);
  5. }
  6. emitter.onComplete();
  7. }).subscribe(
  8. System.out::println,
  9. System.err::println,
  10. () -> System.out.println("流结束")
  11. );
  12. }