目的:类似与Kafka官方提供了连接器,但是如果其它没有提供连接器就可以通过这种方式去解决
环境:jdk8 工具:idea
创建maven项目
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
代码演示
DataStream用于实时数据统计
//创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//传入自定义的数据源
DataStream<String> dataStream = env.addSource(new MySource());
dataStream.print();
env.execute();
自定义数据源
public static class MySource implements SourceFunction<String> { //传入的泛型为接收的类型
private boolean running = true; //循环结束标志位
public void run(SourceContext<String> sourceContext) throws Exception {
while (running) {
sourceContext.collect("test");
}
Thread.sleep(1000);
}
public void cancel() {
running = false;
}
}
自定义数据源.rar (5 KB)
结果
>前代表线程
4> test
4> test
4> test
4> test