Goal: the three basic transformation operators map, flatMap, and filter
Environment: JDK 8. Tool: IntelliJ IDEA
Create a Maven project
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code demo
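import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// The class name is arbitrary; the original snippet shows only the method body.
public class TransformDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Replace the placeholder with the absolute path of the input file.
        DataStream<String> dataStream = env.readTextFile("absolute path");

        // map: process each value; the result may have a different type than the input
        DataStream<Integer> integerDataStream = dataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });

        // flatMap: a single input value can produce multiple output values
        DataStream<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(" ");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });

        // filter: keep only the elements for which the function returns true
        DataStream<String> stringDataStream = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("test");
            }
        });

        stringDataStream.print("filter");
        flatMap.print("flatMap");
        integerDataStream.print("map");

        // Not shown in the original snippet, but required: the job only runs once execute() is called.
        env.execute();
    }
}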
Run output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
map:3> 10
flatMap:3> hello
flatMap:3> test
filter:3> hello test
map:4> 10
flatMap:4> hello
flatMap:4> java
map:2> 10
flatMap:2> hello
flatMap:2> word
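To check the semantics of the three operators without starting a Flink job, the same logic can be sketched with Java's standard Stream API. This is not Flink code, only an analogy: java.util.stream happens to offer map, flatMap, and filter with comparable meaning. The class name OperatorSemantics and its helper methods are made up for this illustration, and the three input lines are an assumption inferred from the printed results above, since the actual input file is not shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class: plain Java, no Flink dependency.
public class OperatorSemantics {

    // map: exactly one output per input; the output type may differ (String -> Integer).
    static List<Integer> mapLengths(List<String> lines) {
        return lines.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // flatMap: each input may yield several outputs (here, the words of a line).
    static List<String> flatMapWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // filter: keep only the inputs for which the predicate returns true.
    static List<String> filterContains(List<String> lines, String keyword) {
        return lines.stream()
                .filter(line -> line.contains(keyword))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed input, inferred from the job output; the real file is not shown.
        List<String> lines = Arrays.asList("hello test", "hello java", "hello word");

        System.out.println("map: " + mapLengths(lines));
        System.out.println("flatMap: " + flatMapWords(lines));
        System.out.println("filter: " + filterContains(lines, "test"));
    }
}
```

With the assumed input, map yields one length per line, flatMap yields two words per line, and filter keeps only the line containing "test", which matches the counts per operator in the job output. Unlike this sequential sketch, the Flink job runs in parallel, so its output lines are interleaved across subtasks.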
