目的:类似与Kafka官方提供了连接器,但是如果其它没有提供连接器就可以通过这种方式去解决
环境:jdk8 工具:idea2019.3

创建maven项目

引入jar包

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.10.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-streaming-java_2.12</artifactId>
  9. <version>1.10.1</version>
  10. </dependency>

代码演示

DataStream用于实时数据统计

  1. //创建流处理执行环境
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. //传入自定义的数据源
  4. DataStream<String> dataStream = env.addSource(new MySource());
  5. dataStream.print();
  6. env.execute();

自定义数据源

  1. public static class MySource implements SourceFunction<String> { //传入的泛型为接收的类型
  2. private boolean running = true; //循环结束标志位
  3. public void run(SourceContext<String> sourceContext) throws Exception {
  4. while (running) {
  5. sourceContext.collect("test");
  6. }
  7. Thread.sleep(1000);
  8. }
  9. public void cancel() {
  10. running = false;
  11. }
  12. }

自定义数据源.rar

结果

>前代表线程

  1. 4> test
  2. 4> test
  3. 4> test
  4. 4> test