环境配置
依赖
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.12.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>2.1.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.5</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
Flink基础
Source
SensorReading类
public class SensorReading {
// 属性:id,时间戳,温度值
private String id;
private Long timestamp;
private Double temperature;
}
从集合中读取数据
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 1.Source:从集合读取数据
DataStream<SensorReading> sensorDataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
)
);
// 2.打印
sensorDataStream.print();
// 3.执行
env.execute();
}
从文件中读取
DataStream<String> dataStream = env.readTextFile("YOUR_FILE_PATH ");
从Kafka中读取
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
// 从文件读取数据
DataStream<String> dataStream = env.addSource( new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
// 打印输出
dataStream.print();
env.execute();
}
自定义Source
DataStream<SensorReading> dataStream = env.addSource( new MySensor());
class MySensor implements SourceFunction<SensorReading> {
private boolean running = true;
public void run(SourceContext<SensorReading> ctx) throws Exception {
Random random = new Random();
HashMap<String, Double> sensorTempMap = new HashMap<String, Double>();
for (int i = 0; i < 10; i++) {
sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
}
while (running) {
for (String sensorId : sensorTempMap.keySet()) {
Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId, newTemp);
ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(),
newTemp));
}
Thread.sleep(1000L);
}
}
public void cancel() {
this.running = false;
}
}
Transform
map
DataStream<Integer> mapStram = dataStream.map(new MapFunction<SensorReading, Integer>() {
@Override
public Integer map(SensorReading value) throws Exception {
return value.getId().length();
}
});
public static void main(String[] args) throws Exception {
//1. 获取Oracle数据
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.hostname("172.18.1.157")
.port(1521)
.database("helowin")
.schemaList("test")
.tableList("test.bqh10")
.username("test")
.password("test")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stringDataStreamSource = env.addSource(sourceFunction).setParallelism(1);
//2,格式化输出流
DataStream<String> dataStream = (DataStream<String>) stringDataStreamSource.map(line -> {
JSONObject data = JSON.parseObject(line);
String A = (String) JSON.parseObject(String.valueOf(data.get("after"))).get("A");
String B = (String) JSON.parseObject(String.valueOf(data.get("after"))).get("B");
return new BQH10(A,B).toString();
});
dataStream.print().setParallelism(1);
//写入kafka
dataStream.addSink( new FlinkKafkaProducer011<String>("hadoop102:9092", "sinktest", new SimpleStringSchema()));
env.execute();
}
flatMap
按照逗号分隔把csv文件的每一个cell提取出来
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String,
String>() {
public void flatMap(String value, Collector<String> out) throws Exception {
String[] fields = value.split(",");
for( String field: fields )
out.collect(field);
}
});
Filter
DataStream<Interger> filterStream = dataStream.filter(new FilterFunction<String>()
{
public boolean filter(String value) throws Exception {
return value == 1;
}
});
KeyBy、Reduce
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 1.Source:从集合读取数据
DataStream<SensorReading> dataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_2", 1547718201L, 15.4),
new SensorReading("sensor_3", 1547718202L, 6.7),
new SensorReading("sensor_2", 1547718205L, 38.1),
new SensorReading("sensor_1", 1547718206L, 38.1)
)
).setParallelism(1);
dataStream.keyBy("id").reduce(new ReduceFunction<SensorReading>() {
@Override
public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
return new SensorReading(value1.getId(),
value2.getTimestamp(),
Math.min(value1.getTemperature(), value1.getTemperature()));
}
}).print().setParallelism(1);
// 3.执行
env.execute();
}
Split、Select 分流查询
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 1.Source:从集合读取数据
DataStream<SensorReading> dataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_2", 1547718201L, 15.4),
new SensorReading("sensor_3", 1547718202L, 6.7),
new SensorReading("sensor_2", 1547718205L, 38.1),
new SensorReading("sensor_1", 1547718206L, 38.1)
)
).setParallelism(1);
SplitStream<SensorReading> splitStream=dataStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading sensorReading) {
return sensorReading.getTemperature()>20? Collections.singletonList("high"):Collections.singletonList("low");
}
});
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
highTempStream.print();
// 3.执行
env.execute();
}
Connect、CoMap合并流格式化输出
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 1.Source:从集合读取数据
DataStream<SensorReading> dataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_2", 1547718201L, 15.4),
new SensorReading("sensor_3", 1547718202L, 6.7),
new SensorReading("sensor_2", 1547718205L, 38.1),
new SensorReading("sensor_1", 1547718206L, 38.1)
)
).setParallelism(1);
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading sensorReading) {
return sensorReading.getTemperature() > 20 ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
highTempStream.print();
// 合流 connect
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
});
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
//CoMapFunction<IN1, IN2, OUT>
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "warning");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "healthy");
}
});
resultStream.print();
// 3.执行
env.execute();
}
Union
DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);
1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap
中再去调整成为一样的。
2. Connect 只能操作两个流,Union 可以操作多个。
UDF
Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如 MapFunction, FilterFunction, ProcessFunction 等等。
DataStream<String> flinkTweets = tweets.filter(new FlinkFilter());
public static class FlinkFilter implements FilterFunction<String> {
@Override
public boolean filter(String value) throws Exception {
return value.contains("flink");
}
}
富函数
“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都
有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一
些生命周期方法,所以可以实现更复杂的功能。
⚫ RichMapFunction
⚫ RichFlatMapFunction
⚫ RichFilterFunction
Rich Function 有一个生命周期的概念。典型的生命周期方法有:
⚫ open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter
被调用之前 open()会被调用。
⚫ close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
⚫ getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函
数执行的并行度,任务的名字,以及 state 状态
public static class MyMapFunction extends RichMapFunction<SensorReading,
Tuple2<Integer, String>> {
@Override
public Tuple2<Integer, String> map(SensorReading value) throws Exception {
return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(),
value.getId());
}
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("my map open");
// 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接
}
@Override
public void close() throws Exception {
System.out.println("my map close");
// 以下做一些清理工作,例如断开和 HDFS 的连接
}
}
Sink
kafka
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
dataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092",
"test", new SimpleStringSchema()))
自定义sink
mysql
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
Connection conn = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
// open 主要是创建连接
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test",
"root", "123456");
// 创建预编译器,有占位符,可传入参数
insertStmt = conn.prepareStatement("INSERT INTO sensor_temp (id, temp) VALUES
(?, ?)");
updateStmt = conn.prepareStatement("UPDATE sensor_temp SET temp = ? WHERE id
= ?");
}
// 调用连接,执行 sql
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// 执行更新语句,注意不要留 super
updateStmt.setDouble(1, value.getTemperature());
updateStmt.setString(2, value.getId());
updateStmt.execute();
// 如果刚才 update 语句没有更新,那么插入
if (updateStmt.getUpdateCount() == 0) {
insertStmt.setString(1, value.getId());
insertStmt.setDouble(2, value.getTemperature());
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
insertStmt.close();
updateStmt.close();
conn.close();
}
}
Hbase
class HBaseSink extends RichSinkFunction<BQH10> {
private org.apache.hadoop.conf.Configuration configuration; //上下文环境
private org.apache.hadoop.hbase.client.Connection connection = null; //连接到hbase
private BufferedMutator userMutator;
private int count=0;
@Override//初始化工作,建立与数据库的连接
public void open(Configuration parameters) throws Exception {
super.open(parameters);
configuration = HBaseConfiguration.create();
// configuration.set("hbase.master", "172.18.1.114:60020");
configuration.set("hbase.zookeeper.quorum", "172.18.1.114,172.18.1.116,172.18.1.117");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
try {
connection = ConnectionFactory.createConnection(configuration);
System.out.println(connection);
} catch (IOException e) {
e.printStackTrace();
}
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("bqh10"));
params.writeBufferSize(1024 * 1024);
userMutator = connection.getBufferedMutator(params);
}
@Override
public void close() throws Exception {
super.close();
//todo 关闭连接
if (userMutator!=null) {
userMutator.flush();
userMutator.close();
}
if (connection!=null) {
connection.close();
}
}
@Override
public void invoke(BQH10 value, Context context) throws Exception {
String RowKey= value.getA();
String Key="B";
String Value = value.getB();
// System.out.println("Column Family=f1, RowKey=" + RowKey + ", Key=" + Key + " ,Value=" + Value);
Put put = new Put(RowKey.getBytes());
put.addColumn("f1".getBytes(), Key.getBytes(), Value.getBytes());
userMutator.mutate(put);
if (count >= 500){
userMutator.flush();
System.out.println("写入成功");
count = 0;
}
count = count + 1;
}
}
window
Window 可以分成两类:
➢ CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
➢ TimeWindow:按照时间生成 Window。
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling
Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
TimeWindow
滚动窗口
DataStream<Tuple2<String, Double>> minTempPerWindowStream = dataStream
.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws
Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
})
.keyBy(data -> data.f0)
.timeWindow( Time.seconds(15) )
.minBy(1);
滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,
每一次计算的 window 范围是 15s 内的所有元素。
DataStream<SensorReading> minTempPerWindowStream = dataStream
.keyBy(SensorReading::getId)
.timeWindow( Time.seconds(15), Time.seconds(5) )
.minBy("temperature");
CountWindow
滚动窗口
DataStream<SensorReading> minTempPerWindowStream = dataStream .keyBy(SensorReading::getId) .countWindow( 5 ) .minBy("temperature");滑动窗口
DataStream<SensorReading> minTempPerWindowStream = dataStream .keyBy(SensorReading::getId) .countWindow( 10,2 ) .minBy("temperature");分组窗口
分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由 as 子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用。
Table table = input .window([w: GroupWindow] as "w") // 定义窗口,别名 w .groupBy("w, a") // 以属性 a 和窗口 w 作为分组的 key .select("a, b.sum") // 聚合字段 b 的值,求和 或者,还可以把窗口的相关信息,作为字段添加到结果表中: Table table = input .window([w: GroupWindow] as "w") .groupBy("w, a") .select("a, w.start, w.end, w.rowtime, b.count")滚动窗口
滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个方法:
⚫ over:定义窗口长度
⚫ on:用来分组(按时间间隔)或者排序(按行数)的时间字段
⚫ as:别名,必须出现在后面的 groupBy 中// Tumbling Event-time Window .window(Tumble.over("10.minutes").on("rowtime").as("w")) // Tumbling Processing-time Window .window(Tumble.over("10.minutes").on("proctime").as("w")) // Tumbling Row-count Window .window(Tumble.over("10.rows").on("proctime").as("w"))滑动窗口
滑动窗口(Sliding windows)要用 Slide 类来定义,另外还有四个方法:
⚫ over:定义窗口长度
⚫ every:定义滑动步长
⚫ on:用来分组(按时间间隔)或者排序(按行数)的时间字段
⚫ as:别名,必须出现在后面的 groupBy 中// Sliding Event-time Window .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w")) // Sliding Processing-time window .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w")) // Sliding Row-count window .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))会话窗口
会话窗口(Session windows)要用 Session 类来定义,另外还有三个方法:
⚫ withGap:会话时间间隔
⚫ on:用来分组(按时间间隔)或者排序(按行数)的时间字段
⚫ as:别名,必须出现在后面的 groupBy 中// Session Event-time Window .window(Session.withGap.("10.minutes").on("rowtime").as("w")) // Session Processing-time Window .window(Session.withGap.("10.minutes").on(“proctime").as("w"))
时间语义与 Wartermark
Wartermark相当于将数据延迟处理的时间
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的 日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
Ingestion Time:是数据进入 Flink 的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器
相关,默认的时间属性就是 Processing Time。
EventTime
在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。 如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所 示:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Watermark 的引入
能大致估算出数据流中的事件的最大延迟时间,可以用以下代码
//水位线插入时间1s
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(1000L)) {
@Override //时间里面带有时间戳
public long extractTimestamp(SensorReading element) {
return element.getTimestamp()*1000;
}
});
Event Time 的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事 件时间是什么(数据源里的数据没有时间戳的话,就只能使用 Processing Time 了)。 我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就 是分配时间戳的接口。Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 设置事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<SensorReading> dataStream = env.addSource(new SensorSource())
.assignTimestampsAndWatermarks(new MyAssigner());
自定义从事件中提取时间戳
Assigner with periodic watermarks
周期性的生成 watermark:系统会周期性的将 watermark 插入到流中(水位线也
是一种特殊的事件!)。默认周期是 200 毫秒。可以使用
ExecutionConfig.setAutoWatermarkInterval()方法进行设置。
// 每隔 5 秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);
产生 watermark 的逻辑:每隔 5 秒钟,Flink 会调用
AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了
水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark。 例子,自定义一个周期性的时间戳抽取:
// 自定义周期性时间戳分配器
public static class MyPeriodicAssigner implements
AssignerWithPeriodicWatermarks<SensorReading> {
private Long bound = 60 * 1000L; // 延迟一分钟
private Long maxTs = Long.MIN_VALUE; // 当前最大时间戳
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTs - bound);
}
@Override
public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
maxTs = Math.max(maxTs, element.getTimestamp());
return element.getTimestamp();
}
}
一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用 AscendingTimestampExtractor,这个类会直接使用数据的时间戳生成 watermark。
DataStream<SensorReading> dataStream = …
dataStream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<SensorReading>() {
@Override
public long extractAscendingTimestamp(SensorReading element) {
return element.getTimestamp() * 1000;
}
});
Assigner with punctuated watermarks
间断式地生成 watermark。和周期性生成的方式不同,这种方式不是固定时间的,
而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给
sensor_1 的传感器的数据流插入 watermark:
public static class MyPunctuatedAssigner implements
AssignerWithPunctuatedWatermarks<SensorReading> {
private Long bound = 60 * 1000L; // 延迟一分钟
@Nullable
@Override
public Watermark checkAndGetNextWatermark(SensorReading lastElement, long
extractedTimestamp) {
if (lastElement.getId().equals("sensor_1"))
return new Watermark(extractedTimestamp - bound);
else
return null;
}
@Override
public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
return element.getTimestamp();
}
}
Table API与SQL
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency> <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
csv环境
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
source
连接外部文件
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接
.withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
) // 定义表结构
.createTemporaryTable("inputTable"); // 创建临时表
连接Kafka
tableEnv.connect(new Kafka()
.version("0.11") // 定义 kafka 的版本
.topic("sensor") // 定义主题
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092") )
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");
fromDataStream
DataStream<String> inputStream = env.readTextFile("sensor.txt");
DataStream<SensorReading> dataStream = inputStream
.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new
Double(fields[2]));
});
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");
创建临时视图
基于数据流
tableEnv.createTemporaryView("sensorView", dataStream); tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");基于Table
tableEnv.createTemporaryView("sensorView", sensorTable);sink
输出到文件
// 注册输出表 tableEnv.connect( new FileSystem().path("…\\resources\\out.txt") ) // 定义到文件系统的连接 .withFormat(new Csv()) // 定义格式化方法,Csv 格式 .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("temp", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("outputTable"); // 创建临时表 resultSqlTable.insertInto("outputTable");在流处理过程中,表的处理并不像传统定义的那样简单。 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行 转换。与外部系统交换的消息类型,由更新模式(update mode)指定。 Flink Table API 中的更新模式有以下三种:
1)追加模式(Append Mode)
在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。
2)撤回模式(Retract Mode)
在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。
⚫ 插入(Insert)会被编码为添加消息;
⚫ 删除(Delete)则编码为撤回消息;
⚫ 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)
的添加消息。 在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。
3)Upsert(更新插入)模式
在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息, 外部连接器需要知道这个唯一 key 的属性。
⚫ 插入(Insert)和更新(Update)都被编码为 Upsert 消息;
⚫ 删除(Delete)编码为 Delete 信息。
这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率会更高。输出到kafka
// 输出到 kafka tableEnv.connect( new Kafka() .version("0.11") .topic("sinkTest") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("temp", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable"); //resultTable为准备插入的数据 resultTable.insertInto("kafkaOutputTable");输出到Mysql
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.12</artifactId> <version>1.10.1</version> </dependency>// 输出到 Mysql String sinkDDL= "create table jdbcOutputTable (" + " id varchar(20) not null, " + " cnt bigint not null " + ") with (" + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://localhost:3306/test', " + " 'connector.table' = 'sensor_count', " + " 'connector.driver' = 'com.mysql.jdbc.Driver', " + " 'connector.username' = 'root', " + " 'connector.password' = '123456' )"; tableEnv.sqlUpdate(sinkDDL); // 执行 DDL 创建表 aggResultSqlTable.insertInto("jdbcOutputTable");将表转换成DataStream
表可以转换为 DataStream 或 DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了。 将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是 Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式。
Table API 中表到 DataStream 有两种模式:
⚫ 追加模式(Append Mode)
用于表只会被插入(Insert)操作更改的场景。
⚫ 撤回模式(Retract Mode)
用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。
得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底 是新增的数据(Insert),还是被删除的数据(老数据, Delete)。DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class); DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class); resultStream.print("result"); aggResultStream.print("aggResult");所以,没有经过 groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream。
时间特性
基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。时间属性的行为类似于常规时间戳,可以访问,并且进行计算。
处理时间
处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概
念。它既不需要提取时间戳,也不需要生成 watermark。定义处理时间属性有三种方法:在 DataStream 转化时直接指定;在定义 Table Schema时指定;在创建表的 DDL 中指定。
1) DataStream 转化成 Table 时指定
由 DataStream 转换成表时,可以在后面指定字段名来定义 Schema。在定义 Schema 期间,可以使用.proctime,定义处理时间字段。注意,这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在schema 定义的末尾定义它。// 定义好 DataStream DataStream<String> inputStream = env.readTextFile("\\sensor.txt") DataStream<SensorReading> dataStream = inputStream .map( line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); } ); // 将 DataStream 转换为 Table,并指定时间字段 Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, timestamp, pt.proctime");2) 定义 Table Schema 时指定
这种方法其实也很简单,只要在定义 Schema 的时候,加上一个新的字段,并指定成
proctime 就可以了。代码如下:tableEnv.connect( new FileSystem().path("..\\sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) .field("pt", DataTypes.TIMESTAMP(3)) .proctime() // 指定 pt 字段为处理时间 ) // 定义表结构 .createTemporaryTable("inputTable"); // 创建临时表3) 创建表的 DDL 中指定
在创建表的 DDL 中,增加一个字段并指定成 proctime,也可以指定当前的时间字段。代码如下:String sinkDDL = "create table dataTable (" + " id varchar(20) not null, " + " ts bigint, " + " temperature double, " + " pt AS PROCTIME() " + ") with (" + " 'connector.type' = 'filesystem', " + " 'connector.path' = '/sensor.txt', " + " 'format.type' = 'csv')"; tableEnv.sqlUpdate(sinkDDL);事件时间
事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱
序事件或者延迟事件时,也可以获得正确的结果。为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。
1) DataStream 转化成 Table 时指定
在DataStream转换成Table,schema的定义期间,使用.rowtime可以定义事件时间属性。 注意,必须在转换的数据流中分配时间戳和 watermark。 在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以:
⚫ 作为新字段追加到 schema
⚫ 替换现有字段 ,在这两种情况下,定义的事件时间戳字段,都将保存 DataStream 中事件时间戳的值。DataStream<String> inputStream = env.readTextFile("\\sensor.txt") DataStream<SensorReading> dataStream = inputStream .map( line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); } ) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) { @Override public long extractTimestamp(SensorReading element) { return element.getTimestamp() * 1000L; } }); Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");2) 定义 Table Schema 时指定
这种方法只要在定义 Schema 的时候,将事件时间字段,并指定成 rowtime 就可以了。 代码如下:tableEnv.connect( new FileSystem().path("sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .rowtime( new Rowtime() .timestampsFromField("timestamp") // 从字段中提取时间戳 .watermarksPeriodicBounded(1000) // watermark 延迟 1 秒 ) .field("temperature", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("inputTable"); // 创建临时表3) 创建表的 DDL 中指定
事件时间属性,是使用 CREATE TABLE DDL 中的 WARDMARK 语句定义的。watermark 语句,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性。代码如下:String sinkDDL = "create table dataTable (" + " id varchar(20) not null, " + " ts bigint, " + " temperature double, " + " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " + " watermark for rt as rt - interval '1' second" + ") with (" + " 'connector.type' = 'filesystem', " + " 'connector.path' = '/sensor.txt', " + " 'format.type' = 'csv')"; tableEnv.sqlUpdate(sinkDDL);
