1. Introduction to DataStream

DataStream programs in Flink are regular programs that apply transformations to data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may, for example, write the data to files or to standard output (e.g., the command line terminal). Flink programs run in a variety of contexts, standalone or embedded in other programs. Execution can happen in a local JVM or on clusters of many machines.

2. Flink Programs

Every Flink program is made up of the same basic parts; the structure is roughly as follows (a minimal skeleton follows the list):

  1. Obtain an execution environment;
  2. Load the initial data from the specified source (Source);
  3. Specify the transformations on the data (Transform);
  4. Write the computed results to the target storage (Sink);
  5. Trigger program execution.

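The skeleton below sketches these five steps in Java. It is a minimal illustration, not code from the original article; the socket address and job name are placeholder assumptions.

  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class SkeletonJob {
      public static void main(String[] args) throws Exception {
          // 1. Obtain an execution environment
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // 2. Load initial data from a source (the socket address is a placeholder)
          DataStream<String> lines = env.socketTextStream("localhost", 9000);
          // 3. Specify a transformation on the data
          DataStream<String> nonEmpty = lines.filter(line -> !line.isEmpty());
          // 4. Write the results to a sink (here: standard output)
          nonEmpty.print();
          // 5. Trigger program execution
          env.execute("SkeletonJob");
      }
  }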

2.1 getExecutionEnvironment

Creates an execution environment that represents the context in which the current program runs. If the program is invoked standalone, this method returns a local execution environment; if the program is invoked from a command-line client to be submitted to a cluster, this method returns the execution environment of that cluster. In other words, getExecutionEnvironment decides which kind of environment to return based on how the program is being run, and it is the most commonly used way to create an execution environment.

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2.2 createLocalEnvironment

Returns a local execution environment, which runs the program in the local JVM (convenient for development and debugging).

  LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
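createLocalEnvironment also has an overload that fixes the parallelism of the local environment, for example:

  // Local environment with an explicit parallelism of 4
  LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4);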

2.3 createRemoteEnvironment

Returns a cluster execution environment, submitting the Jar to a remote server. When calling it, you must specify the IP and port of the JobManager as well as the Jar package(s) to run on the cluster.

  StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
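A concrete call looks like the following; the host, port, and Jar path are illustrative placeholders to be replaced with your own cluster's values:

  // Submit to a remote JobManager; host, port, and Jar path are placeholders
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
          "jobmanager-host", 8081, "/path/to/your-job.jar");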

3. Maven Configuration

Tested and working.

Flink's runtime depends on Akka, which is implemented in Scala, so many of the artifacts below carry a Scala binary version suffix; the scala.binary.version property keeps those suffixes consistent.

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>blink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
      <java.version>1.8</java.version>
      <fastjson.version>1.2.75</fastjson.version>
      <druid.version>1.2.5</druid.version>
      <flink.version>1.13.1</flink.version>
      <scala.binary.version>2.12</scala.binary.version>
      <HikariCP.version>3.2.0</HikariCP.version>
      <Impala.version>2.6.4</Impala.version>
      <kafka.version>2.8.0</kafka.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <!-- All Scala-suffixed artifacts use scala.binary.version so that
           _2.11 and _2.12 builds of Flink are never mixed on the classpath -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>${HikariCP.version}</version>
      </dependency>
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
      </dependency>
      <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.2</version>
      </dependency>
    </dependencies>

    <build>
      <plugins>
        <!-- Bundle the job and its dependencies into a single fat jar -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.1.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
            <source>8</source>
            <target>8</target>
          </configuration>
        </plugin>
      </plugins>
    </build>
  </project>
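With the assembly plugin configured as above, running mvn clean package additionally produces a jar-with-dependencies artifact in the target directory; that is the Jar to submit to the cluster (for example via flink run).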

4. Example Program

  package com.nkong.blink.start;

  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.util.Collector;

  /**
   * Counts the occurrences of each window element every 5 seconds.
   * @author nkong
   * @time 2022/01/25 14:38
   */
  public class SocketInputJob {

      public static void main(String[] args) throws Exception {
          // Obtain the Flink execution environment.
          // Parallelism defaults to the number of logical CPU cores of the machine.
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Read data from the specified socket.
          DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
          DataStream<Tuple2<String, Integer>> dataStream = text
                  .flatMap(new MyFlatMap())
                  .keyBy(new MyKeySelector())
                  // Tumbling processing-time window of 5 seconds
                  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                  .sum(1);
          dataStream.print();
          env.execute("SocketInputJob");
      }

      /** Splits each input line on tabs and emits a (token, 1) pair per token. */
      private static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
          @Override
          public void flatMap(String inStr, Collector<Tuple2<String, Integer>> collector) throws Exception {
              String[] tokens = inStr.toLowerCase().split("\t", -1);
              for (String token : tokens) {
                  if (token.length() > 0) {
                      collector.collect(new Tuple2<>(token, 1));
                  }
              }
          }
      }

      /** Keys each (token, count) pair by the token. */
      private static class MyKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
          @Override
          public String getKey(Tuple2<String, Integer> tuple) throws Exception {
              return tuple.f0;
          }
      }
  }
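To try the job, start a socket server before launching it, for example with nc -lk 9000, then type tab-separated tokens into that terminal. Every 5 seconds the job prints the count accumulated for each token within that window.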