配置 Maven 依赖（在 pom.xml 中添加 flink-clients jar 包）
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<!-- Flink client artifact, version 1.13.2, used by the two example programs below.
     The "_2.12" suffix on the artifactId is Flink's Scala-binary-version suffix; keep it
     identical across every Flink artifact in the project.
     NOTE(review): the examples use both the DataSet API (org.apache.flink.api.java.*) and the
     DataStream API (org.apache.flink.streaming.api.*); confirm both are on the classpath
     (transitively via this artifact or declared explicitly). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.2</version>
</dependency>
批处理：单词计数汇总（DataSet API）
package top.windtown;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
/**
 * Batch word count over a text file using the DataSet API.
 *
 * <p>Reads the file line by line, turns each line into tuples with {@code MyFlatFunction},
 * groups by tuple field 0, sums tuple field 1 and prints the result ordered by field 1,
 * highest first.
 *
 * <p>NOTE(review): {@code MyFlatFunction} is defined elsewhere and not shown here. It is
 * presumably a {@code FlatMapFunction<String, Tuple2<String, Integer>>} emitting
 * {@code (word, 1)} — confirm.
 *
 * @author bill
 * @date 2021/9/19
 */
public class WordCount {

    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "C:\\code\\flinkdemo\\src\\main\\resources\\txt\\panda.txt";

    /**
     * Runs the batch job and prints the counts.
     *
     * @param args optional; {@code args[0]} overrides the input file path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source = env.readTextFile(inputPath);
        DataSet<Tuple2<String, Integer>> sum = source.flatMap(new MyFlatFunction())
                .groupBy(0)
                .sum(1)
                // sortPartition only orders records inside each partition; with parallelism > 1
                // the printed output would be several separately-sorted runs. Forcing
                // parallelism 1 puts all records in one partition, giving a global sort.
                .sortPartition(1, Order.DESCENDING)
                .setParallelism(1);
        // For DataSet programs, print() itself triggers execution; no env.execute() is needed.
        sum.print();
    }
}
流式计算：基于 socket 文本流的实时单词计数（DataStream API）
package top.windtown;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Streaming word count over a socket text stream using the DataStream API.
 *
 * <p>Reads lines from a TCP socket, turns each line into tuples with {@code MyFlatFunction},
 * keys the stream by tuple field 0 and keeps a running sum of tuple field 1, printing every
 * updated total.
 *
 * <p>NOTE(review): {@code MyFlatFunction} is defined elsewhere and not shown here. It is
 * presumably a {@code FlatMapFunction<String, Tuple2<String, Integer>>} emitting
 * {@code (word, 1)} — confirm.
 *
 * @author bill
 * @date 2021/9/19
 */
public class StreamWorkCount {

    /** Socket host used when none is passed on the command line. */
    private static final String DEFAULT_HOST = "192.168.3.101";

    /** Socket port used when none is passed on the command line. */
    private static final int DEFAULT_PORT = 7777;

    /** Parallelism applied to every operator of the job. */
    private static final int PARALLELISM = 8;

    /**
     * Builds and runs the streaming job. The job runs until cancelled.
     *
     * @param args optional; {@code args[0]} overrides the host, {@code args[1]} the port
     * @throws Exception if the Flink job fails
     * @throws NumberFormatException if {@code args[1]} is not a valid integer
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        DataStreamSource<String> source = env.socketTextStream(host, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap(new MyFlatFunction())
                // Use the typed field f0 rather than the generic getField(0): getField's return type
                // is inferred as Object in this lambda, so Flink would see a generic Object key
                // instead of a String key.
                .keyBy(value -> value.f0)
                .sum(1);
        sum.print();
        env.execute("StreamWorkCount");
    }
}