1. DataStream 简介
Flink 中的 DataStream 程序是对数据流进行转换(例如过滤、更新状态、定义窗口、聚合)的常规程序。数据流最初从各种源(例如消息队列、套接字流、文件)创建。结果通过 sink 输出,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务既可以在本地 JVM 中执行,也可以在由多台机器组成的集群上执行。
2. Flink 程序
每个Flink程序由相同的基本部分组成,程序结构大致如下:
- 获取一个执行环境;
- 从指定数据源加载初始数据(Source);
- 指定数据相关的转换(Transform);
- 将计算结果输出到目标存储(Sink);
- 触发程序执行。
2.1 getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。也就是说,getExecutionEnvironment 会根据程序运行的方式决定返回什么样的执行环境,是最常用的一种创建执行环境的方式。
// Returns the local environment when the program runs standalone, or the cluster's
// environment when the program is submitted through the command-line client.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.2 createLocalEnvironment
返回本地执行环境。
// Explicitly creates a local execution environment.
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
2.3 createRemoteEnvironment
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
// NOTE: the argument list below is the method signature, shown for reference; it is not a
// compilable call. Replace it with concrete values: the JobManager host, its port, and the
// paths of the JAR files to run on the cluster.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
3. Maven配置
亲测可用!!!
Flink 依赖的 Akka 是用 Scala 实现的,所以这里用到了带 Scala 版本后缀(如 _2.11、_2.12)的依赖包。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Flink DataStream example project.
  Every Flink artifact that carries a Scala suffix uses ${scala.binary.version}, so a single
  Scala binary version ends up on the classpath.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>blink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <!-- fastjson/druid/Impala/kafka versions are declared but not referenced by any dependency below. -->
        <fastjson.version>1.2.75</fastjson.version>
        <druid.version>1.2.5</druid.version>
        <flink.version>1.13.1</flink.version>
        <!-- Single source of truth for the Scala suffix of all Flink artifacts. -->
        <scala.binary.version>2.12</scala.binary.version>
        <HikariCP.version>3.2.0</HikariCP.version>
        <Impala.version>2.6.4</Impala.version>
        <kafka.version>2.8.0</kafka.version>
    </properties>

    <dependencies>
        <!-- Flink core, streaming and client APIs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Declared once; the original listed flink-clients twice (_2.12 and _2.11). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Table API / SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- JDBC pooling and driver -->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>${HikariCP.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.13</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- JSON -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds a jar-with-dependencies during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Compiles for the Java level declared in the java.version property. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
4. 示例程序
package com.nkong.blink.start;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.util.Collector;import java.util.Properties;/*** 每5s统计一次窗口元素个数* @author nkong* @time 2022/01/25 14:38*/public class SocketInputJob {public static void main(String[] args) throws Exception {// 获取Flink执行环境// 并行度:默认是计算机的CPU逻辑核数final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从指定Socket读取数据DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new MyFlatMap()).keyBy(new MyKeySelector())// 滚动时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);dataStream.print();env.execute("SocketInputJob");}private static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String inStr, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] tokens = inStr.toLowerCase().split("\t", -1);for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<String, Integer>(token, 1));}}}}private static class MyKeySelector implements KeySelector<Tuple2<String, Integer>, Object> {@Overridepublic Object getKey(Tuple2<String, Integer> stringIntegerTuple) throws Exception {return stringIntegerTuple.f0;}}}
