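Goal: demonstrate the three basic transformation operators map, flatMap, and filter
Environment: JDK 8. Tool: IntelliJ IDEA
Create a Maven project and add the following dependencies to pom.xml: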
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Code demo
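import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// The class name TransformDemo is only a placeholder for this demo.
public class TransformDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Replace the placeholder with the absolute path of a text file.
        DataStream<String> dataStream = env.readTextFile("<absolute path>");
        // map: one output per input; the output type may differ from the input type
        DataStream<Integer> integerDataStream = dataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
        // flatMap: a single input may produce zero, one, or many outputs
        DataStream<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(" ");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });
        // filter: keep only the elements for which the function returns true
        DataStream<String> stringDataStream = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("test");
            }
        });
        stringDataStream.print("filter");
        flatMap.print("flatMap");
        integerDataStream.print("map");
        // Nothing runs until execute() is called; it submits the job.
        env.execute();
    }
}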
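Output (the SLF4J lines only warn that no logging binding is on the classpath; in each result line the prefix is the label passed to print() and the number after the colon is the parallel subtask that printed the record, so the line order can differ between runs)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".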
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
map:3> 10
flatMap:3> hello
flatMap:3> test
filter:3> hello test
map:4> 10
flatMap:4> hello
flatMap:4> java
map:2> 10
flatMap:2> hello
flatMap:2> word
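To make the output above easier to read, here is a minimal sketch that reproduces the same three transformations with plain java.util.stream instead of Flink. It is not Flink code. The class name OperatorSemantics and its method names are made up for illustration, and the three input lines are inferred from the printed results above, not taken from the original input file. It only shows what each operator does to one record: map emits exactly one value per input, flatMap emits any number of values per input, and filter keeps or drops the input unchanged.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration only; this does NOT use the Flink API.
class OperatorSemantics {

    // map: exactly one output per input (here: the length of the line).
    static List<Integer> mapLengths(List<String> lines) {
        return lines.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // flatMap: zero or more outputs per input (here: the space-separated words).
    static List<String> flatMapWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // filter: keep an input only when the predicate returns true.
    static List<String> filterContainsTest(List<String> lines) {
        return lines.stream()
                .filter(line -> line.contains("test"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Input lines inferred from the Flink output shown above.
        List<String> lines = Arrays.asList("hello test", "hello java", "hello word");
        System.out.println("map     -> " + mapLengths(lines));
        System.out.println("flatMap -> " + flatMapWords(lines));
        System.out.println("filter  -> " + filterContainsTest(lines));
    }
}
```

Unlike the Flink job, this sketch runs in a single thread, so the results always appear in input order and carry no subtask number.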