配置 Maven 依赖（flink-clients jar 包）
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
批处理：数据汇总计算（DataSet API 版 WordCount）
package top.windtown;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Batch word count over a text file using the Flink DataSet API.
 *
 * <p>Every line of the input is passed through {@code MyFlatFunction} (defined elsewhere in this
 * package; it emits {@code Tuple2<String, Integer>} records — presumably one {@code (word, 1)} per
 * word, TODO confirm). The integer field is summed per distinct string, and the totals are printed
 * in descending order of count.
 *
 * @author bill
 * @date 2021/9/19
 */
public class WordCount {

    /** Input file read when no path is given on the command line (the original hard-coded path). */
    private static final String DEFAULT_INPUT =
            "C:\\code\\flinkdemo\\src\\main\\resources\\txt\\panda.txt";

    /**
     * Runs the batch job.
     *
     * @param args optional; {@code args[0]} is the input text file path. If absent,
     *     {@link #DEFAULT_INPUT} is used.
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> sum = source.flatMap(new MyFlatFunction())
                .groupBy(0)
                .sum(1)
                // sortPartition only orders records within each parallel partition; running it
                // with a single partition makes the printed output globally sorted by count.
                .sortPartition(1, Order.DESCENDING)
                .setParallelism(1);

        // DataSet#print() triggers job execution itself, so no env.execute() call follows.
        sum.print();
    }
}
流式计算（DataStream API 版 WordCount，读取 Socket 文本流）
package top.windtown;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @author bill * @date 2021/9/19 */public class StreamWorkCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(8);// String filePath = "C:\\code\\flinkdemo\\src\\main\\resources\\txt\\panda.txt";// DataStream<String> source = env.readTextFile(filePath); DataStreamSource<String> source = env.socketTextStream("192.168.3.101", 7777); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap(new MyFlatFunction()) .keyBy(it -> it.getField(0)) .sum(1); sum.print(); env.execute(); }}