1、just方式创建响应式流

  1. public static void main(String[] args) {
  2. // 创建响应式流
  3. Observable<String> just = Observable.just("1", "2", "3", "4", "5");
  4. just.subscribe(
  5. item -> System.out.println("下一个元素:" + item),
  6. ex -> System.err.println("异常信息:" + ex.getMessage()),
  7. () -> System.out.println("流结束")
  8. );
  9. }

2、通过数组创建响应式流

  1. public static void main(String[] args) {
  2. Observable<Integer> integerObservable = Observable.fromArray(new Integer[]{1,2,3,4,5});
  3. integerObservable.subscribe(
  4. System.out::println,
  5. System.err::println,
  6. () -> System.out.println("流结束")
  7. );
  8. }

3、通过集合创建响应式流

  1. public static void main(String[] args) {
  2. var list = new ArrayList<>();
  3. for (int i = 0; i < 10; i++) {
  4. list.add(i);
  5. }
  6. //Observable<Object> objectObservable = Observable.fromIterable(Collections.emptyList());
  7. Observable<Object> objectObservable = Observable.fromIterable(list);
  8. objectObservable.subscribe(
  9. System.out::println,
  10. System.err::println
  11. );
  12. }

4、通过Callable创建响应式流

  1. public static void main(String[] args) {
  2. Observable.fromCallable(() -> "hello")
  3. .subscribe(System.out::println);
  4. }

5、通过线程池提交任务创建响应式流

  1. public static void main(String[] args) {
  2. Future<String> future = Executors.newCachedThreadPool().submit(() -> "hello");
  3. // 从callable创建响应式流,订阅,消费
  4. Observable.fromFuture(future).subscribe(System.out::println);
  5. }