依赖

  1. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-java</artifactId>
  5. <version>1.15.0</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-clients</artifactId>
  11. <version>1.15.0</version>
  12. </dependency>
  13. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
  14. <dependency>
  15. <groupId>org.apache.flink</groupId>
  16. <artifactId>flink-streaming-java</artifactId>
  17. <version>1.15.0</version>
  18. </dependency>
  19. <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
  20. <dependency>
  21. <groupId>commons-io</groupId>
  22. <artifactId>commons-io</artifactId>
  23. <version>2.11.0</version>
  24. </dependency>
  25. <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
  26. <dependency>
  27. <groupId>mysql</groupId>
  28. <artifactId>mysql-connector-java</artifactId>
  29. <version>8.0.29</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>com.alibaba</groupId>
  33. <artifactId>fastjson</artifactId>
  34. <version>1.2.73</version>
  35. </dependency>

批处理

  1. hello word
  2. hello count
  3. abc 123
  4. 212 abc
  5. hello scala
  6. hello java
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.util.Collector;
  6. /**
  7. * 单词统计(批数据处理)
  8. */
  9. public class WordCount {
  10. public static void main(String[] args) throws Exception {
  11. // 输入路径和出入路径通过参数传入,约定第一个参数为输入路径,第二个参数为输出路径
  12. String inPath = "/Users/xiajiandong/Desktop/abc.txt";
  13. //文件先不要创建
  14. String outPath = "/Users/xjd/Desktop/study/java/simpleproject/src/main/resources/res.txt";
  15. deleteIfExists(new File(outPath));
  16. // 获取Flink批处理执行环境
  17. ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
  18. // 获取文件中内容
  19. DataSet<String> text = executionEnvironment.readTextFile(inPath);
  20. // 对数据进行处理, flatMap每行元素做一个操作
  21. DataSet<Tuple2<String, Integer>> dataSet = text.flatMap(new LineSplitter())
  22. .groupBy(0) //按照第一个位置的word分组
  23. .sum(1); //按照第二个位置上的数据求和
  24. //控制台打印
  25. dataSet.print();
  26. dataSet.writeAsCsv(outPath, "\n", "").setParallelism(1);
  27. // 触发执行程序
  28. executionEnvironment.execute("wordcount batch process");
  29. }
  30. //FlatMapFunction里面的 String就是每行的输入类型, Tuple2就是map 输出类型
  31. static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  32. @Override
  33. public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
  34. //把文本打散为一个个单词
  35. for (String word : line.split(" ")) { //hello world => (hello, 1), (world, 1)
  36. collector.collect(new Tuple2<>(word, 1));
  37. }
  38. }
  39. }
  40. /**
  41. * 删除文件或文件夹
  42. */
  43. public static void deleteIfExists(File file) throws IOException {
  44. if (file.exists()) {
  45. if (file.isFile()) {
  46. if (!file.delete()) {
  47. throw new IOException("Delete file failure,path:" + file.getAbsolutePath());
  48. }
  49. } else {
  50. File[] files = file.listFiles();
  51. if (files != null && files.length > 0) {
  52. for (File temp : files) {
  53. deleteIfExists(temp);
  54. }
  55. }
  56. if (!file.delete()) {
  57. throw new IOException("Delete file failure,path:" + file.getAbsolutePath());
  58. }
  59. }
  60. }
  61. }
  62. }

流处理

用 nc 模拟 socket 数据源: 需要先启动下面的 nc 监听, 再运行流处理程序

  1. # 在本地 7777 端口监听 (-l 监听, -k 保持连接), 之后在此窗口不断输入要发送的数据
  2. nc -lk 7777
  3. hello
  4. hi
  5. ge
  6. world
  7. hi
  8. hi
  9. word
  10. hello
  11. hello
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.util.Collector;
  7. public class WordCountStream {
  8. public static void main(String[] args) throws Exception {
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. //获取流式数据
  11. DataStreamSource<String> dataStreamSource = env.socketTextStream("127.0.0.1", 7777);
  12. SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  13. @Override
  14. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  15. for (String word : s.split(" ")) {
  16. collector.collect(Tuple2.of(word, 1));
  17. }
  18. }
  19. });
  20. //key用第一个字段
  21. SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
  22. //打印到控制台
  23. result.print();
  24. env.execute("wordcount stream process");
  25. }
  26. }

输出, 前面编号是并行执行的线程编号

  1. 5> (world,1)
  2. 3> (hello,1)
  3. 1> (scala,1)
  4. 3> (hello,2)
  5. 3> (hello,3)
  6. 2> (java,1)
  7. 3> (hello,4)
  8. 5> (world,2)

设置并行度

可以设置并行度

  1. // Set the job parallelism on the execution environment
  2. env.setParallelism(8);

参数外部提取

image.png

  1. // Use ParameterTool to extract configuration from the program arguments
  2. ParameterTool parameterTool = ParameterTool.fromArgs(args);
  3. String host = parameterTool.get("host");
  4. int port = parameterTool.getInt("port");
  5. // Build the streaming source from the supplied host/port
  6. // NOTE(review): presumably launched with "--host ... --port ..."; if the
  6. // keys are absent, get("host") is null and getInt("port") fails — confirm
  6. // against the Flink ParameterTool docs before relying on it
  6. DataStreamSource<String> dataStreamSource = env.socketTextStream(host, port);