目的:类似与Kafka官方提供了连接器,但是如果其它没有提供连接器就可以通过这种方式去解决
环境:jdk8 工具:idea
创建maven项目
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency>
代码演示
DataStream用于实时数据统计
//创建流处理执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//传入自定义的数据源DataStream<String> dataStream = env.addSource(new MySource());dataStream.print();env.execute();
自定义数据源
public static class MySource implements SourceFunction<String> { //传入的泛型为接收的类型private boolean running = true; //循环结束标志位public void run(SourceContext<String> sourceContext) throws Exception {while (running) {sourceContext.collect("test");}Thread.sleep(1000);}public void cancel() {running = false;}}
自定义数据源.rar (5 KB)
结果
>前代表线程
4> test4> test4> test4> test
