There are four common ways to join against a dimension table:

  • Preloading the dimension table
  • Hot-storage dimension table
  • Broadcast dimension table
  • Temporal table function join

Each of these four approaches is used below to implement the same requirement:
the main stream carries user records whose fields are user name and city id;
the dimension table holds city data whose fields are city id and city name.
The user stream is joined with the city table, and the output is: user name, city id, city name.
User table schema (Figure 1): user_name, city_id
City dimension table schema (Figure 2): city_id, city_name

1. Preloading the dimension table

Define a class that extends RichMapFunction, load the dimension data into memory in open(), and join against it in the probe stream's map() method. Loading the dimension table into memory in open() has the following characteristics:

Pros: simple to implement. Cons: because the data lives in memory, this only suits small dimension tables that are updated infrequently. A timer can be set up in open() to reload the table periodically (see the sketch after the example below), but updates will still lag behind the source.

Example:

  package join;

  import org.apache.flink.api.common.functions.RichMapFunction;
  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  import java.util.HashMap;
  import java.util.Map;

  /**
   * Reads a stream of (user name, city id) from a socket; the dimension table
   * maps city id to city name. The main stream is joined with the dimension
   * table to produce (user name, city id, city name).
   * This variant loads the dimension data into memory in the open() method of
   * a RichMapFunction.
   **/
  public class JoinDemo1 {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                  .map(p -> {
                      // input format: user,1000 (user name and city id)
                      String[] list = p.split(",");
                      return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                  })
                  .returns(new TypeHint<Tuple2<String, Integer>>() {
                  });

          DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
          result.print();
          env.execute("joinDemo1");
      }

      static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
          // holds the dimension data in memory
          Map<Integer, String> dim;

          @Override
          public void open(Configuration parameters) throws Exception {
              // load the dimension data here; it can come from a database, a file, an API, etc.
              dim = new HashMap<>();
              dim.put(1001, "beijing");
              dim.put(1002, "shanghai");
              dim.put(1003, "wuhan");
              dim.put(1004, "changsha");
          }

          @Override
          public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
              // join the main stream against the in-memory dimension table
              String cityName = "";
              if (dim.containsKey(value.f1)) {
                  cityName = dim.get(value.f1);
              }
              return new Tuple3<>(value.f0, value.f1, cityName);
          }
      }
  }
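
The periodic refresh mentioned above can be implemented with a scheduled executor that replaces the in-memory map wholesale. A minimal sketch of that variant (the class name, the five-minute interval, and the loadDim() helper are illustrative, not from the original example):

  package join;

  import org.apache.flink.api.common.functions.RichMapFunction;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.configuration.Configuration;

  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  // Variant of MapJoinDemo1 that reloads the dimension table on a fixed schedule.
  public class ReloadingMapJoinFunction
          extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {

      // volatile so the task thread sees the freshly swapped-in map
      private volatile Map<Integer, String> dim;
      private transient ScheduledExecutorService reloadExecutor;

      @Override
      public void open(Configuration parameters) throws Exception {
          dim = loadDim(); // initial load
          reloadExecutor = Executors.newSingleThreadScheduledExecutor();
          // swap in a freshly loaded map every 5 minutes
          reloadExecutor.scheduleAtFixedRate(() -> dim = loadDim(), 5, 5, TimeUnit.MINUTES);
      }

      @Override
      public void close() throws Exception {
          reloadExecutor.shutdown();
      }

      // placeholder for whatever re-reads the full table (database, file, API, ...)
      private Map<Integer, String> loadDim() {
          Map<Integer, String> m = new HashMap<>();
          m.put(1001, "beijing");
          m.put(1002, "shanghai");
          m.put(1003, "wuhan");
          m.put(1004, "changsha");
          return m;
      }

      @Override
      public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) {
          String cityName = dim.getOrDefault(value.f1, "");
          return new Tuple3<>(value.f0, value.f1, cityName);
      }
  }

Swapping the whole map reference keeps readers consistent: map() always sees either the old snapshot or the new one, never a half-loaded table.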

2. Hot-storage dimension table

In this approach the dimension data lives in external storage such as Redis, HBase, or MySQL, and the streaming job queries that store whenever it needs to join. Its characteristics:

Pros: the dimension data is not bounded by memory, so it can be very large. Cons: because the data sits in an external store, read speed is limited by that store's lookup latency, and there is still some delay before dimension updates become visible.
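
Before any optimization, the basic pattern is a point query per element, as in the following minimal sketch against MySQL. It reuses the tmp.city_info table that the async example in 2.2 queries; the class name and connection settings are illustrative:

  package join;

  import org.apache.flink.api.common.functions.RichMapFunction;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.configuration.Configuration;

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;

  // Looks up the city name in MySQL for every record of the main stream.
  public class SyncLookupMapFunction
          extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {

      private transient Connection conn;
      private transient PreparedStatement ps;

      @Override
      public void open(Configuration parameters) throws Exception {
          // illustrative connection settings
          conn = DriverManager.getConnection("jdbc:mysql://localhost:3306?useSSL=false", "root", "123");
          ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
      }

      @Override
      public void close() throws Exception {
          ps.close();
          conn.close();
      }

      @Override
      public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
          // one point query per element
          String cityName = "";
          ps.setInt(1, value.f1);
          try (ResultSet rs = ps.executeQuery()) {
              if (rs.next()) {
                  cityName = rs.getString(1);
              }
          }
          return new Tuple3<>(value.f0, value.f1, cityName);
      }
  }

Every element pays a full round trip to the database, which is exactly what the cache in 2.1 and the async I/O in 2.2 are there to soften.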

2.1 Using a cache to reduce access pressure

A cache, such as Guava Cache, can hold the most frequently accessed dimension records and cut down the number of round trips to the external system.
Here is an example:

  package join;

  import com.google.common.cache.*;
  import org.apache.flink.api.common.functions.RichMapFunction;
  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;

  public class JoinDemo2 {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                  .map(p -> {
                      // input format: user,1000 (user name and city id)
                      String[] list = p.split(",");
                      return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                  })
                  .returns(new TypeHint<Tuple2<String, Integer>>() {
                  });

          DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo2());
          result.print();
          env.execute("joinDemo2");
      }

      static class MapJoinDemo2 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
          LoadingCache<Integer, String> dim;

          @Override
          public void open(Configuration parameters) throws Exception {
              // use Guava's LoadingCache to cache dimension lookups
              dim = CacheBuilder.newBuilder()
                      // maximum number of entries; beyond that, least-recently-used entries are evicted
                      .maximumSize(1000)
                      // expire entries this long after they were written
                      .expireAfterWrite(10, TimeUnit.MINUTES)
                      // eviction notifications
                      .removalListener(new RemovalListener<Integer, String>() {
                          @Override
                          public void onRemoval(RemovalNotification<Integer, String> removalNotification) {
                              System.out.println(removalNotification.getKey() + " was evicted, value: " + removalNotification.getValue());
                          }
                      })
                      .build(
                              // how to load a value on a cache miss
                              new CacheLoader<Integer, String>() {
                                  @Override
                                  public String load(Integer cityId) throws Exception {
                                      return readFromHbase(cityId);
                                  }
                              }
                      );
          }

          private String readFromHbase(Integer cityId) {
              // hard-coded here to simulate a lookup in HBase
              Map<Integer, String> temp = new HashMap<>();
              temp.put(1001, "beijing");
              temp.put(1002, "shanghai");
              temp.put(1003, "wuhan");
              temp.put(1004, "changsha");
              String cityName = "";
              if (temp.containsKey(cityId)) {
                  cityName = temp.get(cityId);
              }
              return cityName;
          }

          @Override
          public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
              // join against the cache; missing keys are loaded via the CacheLoader
              String cityName = dim.get(value.f1);
              return new Tuple3<>(value.f0, value.f1, cityName);
          }
      }
  }

2.2 Using async I/O to increase throughput

When Flink talks to an external system synchronously (send one request, wait for the response, then send the next), throughput is low. Raising the parallelism helps, but more parallel instances mean more threads and connections, and therefore much more resource usage.
Flink can instead access external systems with asynchronous I/O. This requires the external system's client to support async requests, which many do nowadays; for clients that do not, the blocking call can be delegated to a thread pool (sketched after the example below). Async I/O raises three issues to handle:

  • Timeout: if a lookup times out, it is treated as a failure and handled as such;
  • Capacity: if too many requests are in flight at once, Flink's backpressure mechanism throttles the upstream;
  • Result order: responses can come back out of order; Flink supports two modes, unordered and ordered.

Below is an example that demonstrates using async I/O to access the dimension table:

  package join;

  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.datastream.AsyncDataStream;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.async.ResultFuture;
  import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

  import java.sql.DriverManager;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.TimeUnit;

  public class JoinDemo3 {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                  .map(p -> {
                      // input format: user,1000 (user name and city id)
                      String[] list = p.split(",");
                      return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                  })
                  .returns(new TypeHint<Tuple2<String, Integer>>() {
                  });

          DataStream<Tuple3<String, Integer, String>> orderedResult = AsyncDataStream
                  // ordered: results are emitted in input order; timeout 1 second,
                  // at most 2 requests in flight, more than that triggers backpressure
                  .orderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                  .setParallelism(1);

          DataStream<Tuple3<String, Integer, String>> unorderedResult = AsyncDataStream
                  // unordered: results may be emitted out of order; same timeout and capacity
                  .unorderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                  .setParallelism(1);

          orderedResult.print();
          unorderedResult.print();
          env.execute("joinDemo");
      }

      // Extends RichAsyncFunction to query the dimension table stored in MySQL.
      // Input: (user name, city id); output: Tuple3<user name, city id, city name>.
      // Note: the JDBC call below blocks; see the thread-pool sketch after this example.
      static class JoinDemo3AsyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
          // connection settings
          private static String jdbcUrl = "jdbc:mysql://192.168.145.1:3306?useSSL=false";
          private static String username = "root";
          private static String password = "123";
          private static String driverName = "com.mysql.jdbc.Driver";
          java.sql.Connection conn;
          PreparedStatement ps;

          @Override
          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
              Class.forName(driverName);
              conn = DriverManager.getConnection(jdbcUrl, username, password);
              ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
          }

          @Override
          public void close() throws Exception {
              super.close();
              conn.close();
          }

          // the asynchronous lookup
          @Override
          public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
              // query by city id
              ps.setInt(1, input.f1);
              ResultSet rs = ps.executeQuery();
              String cityName = null;
              if (rs.next()) {
                  cityName = rs.getString(1);
              }
              List<Tuple3<String, Integer, String>> list = new ArrayList<>();
              list.add(new Tuple3<>(input.f0, input.f1, cityName));
              resultFuture.complete(list);
          }

          // timeout handling
          @Override
          public void timeout(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
              List<Tuple3<String, Integer, String>> list = new ArrayList<>();
              list.add(new Tuple3<>(input.f0, input.f1, ""));
              resultFuture.complete(list);
          }
      }
  }
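
One caveat about the example above: the JDBC query blocks inside asyncInvoke, so the operator is asynchronous in form only. When no truly asynchronous client is available, the usual workaround is to run the blocking call on a dedicated thread pool and complete the ResultFuture from a callback. A minimal sketch under that assumption (the class name, pool size, and the in-memory map standing in for the blocking client are illustrative):

  package join;

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.functions.async.ResultFuture;
  import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  // Runs the blocking lookup on a dedicated thread pool so that asyncInvoke
  // itself returns immediately and the ResultFuture completes from a callback.
  public class ThreadPoolAsyncFunction
          extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {

      private transient ExecutorService executor;

      @Override
      public void open(Configuration parameters) throws Exception {
          super.open(parameters);
          executor = Executors.newFixedThreadPool(10);
      }

      @Override
      public void close() throws Exception {
          super.close();
          executor.shutdown();
      }

      // stand-in for a synchronous external query (e.g. the JDBC lookup above)
      private String blockingLookup(Integer cityId) {
          Map<Integer, String> dim = new HashMap<>();
          dim.put(1001, "beijing");
          dim.put(1002, "shanghai");
          return dim.getOrDefault(cityId, "");
      }

      @Override
      public void asyncInvoke(Tuple2<String, Integer> input,
                              ResultFuture<Tuple3<String, Integer, String>> resultFuture) {
          CompletableFuture
                  .supplyAsync(() -> blockingLookup(input.f1), executor)
                  // if the lookup throws, the future never completes and the
                  // timeout path fires; production code should also complete exceptionally
                  .thenAccept(cityName -> resultFuture.complete(
                          Collections.singleton(new Tuple3<>(input.f0, input.f1, cityName))));
      }
  }

The capacity passed to orderedWait/unorderedWait still bounds the number of in-flight lookups; the pool size only controls how many blocking calls run concurrently.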

3. Broadcast dimension table

Use Flink's broadcast state to broadcast the dimension data stream to the downstream operator and join there.

Pros: changes in the dimension data are reflected in the results immediately. Cons: the data is held in memory, so only fairly small dimension tables are supported.

Example:

  package join;

  import org.apache.flink.api.common.state.MapStateDescriptor;
  import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.streaming.api.datastream.BroadcastStream;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
  import org.apache.flink.util.Collector;

  /**
   * Reads a stream of (user name, city id) from a socket; the dimension table
   * maps city id to city name. The main stream is joined with the dimension
   * table to produce (user name, city id, city name).
   * This variant feeds the dimension data in as a broadcast stream.
   **/
  public class JoinDemo4 {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // the main stream
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                  .map(p -> {
                      // input format: user,1000 (user name and city id)
                      String[] list = p.split(",");
                      return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                  })
                  .returns(new TypeHint<Tuple2<String, Integer>>() {
                  });

          // the city stream
          DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                  .map(p -> {
                      // input format: city id,city name
                      String[] list = p.split(",");
                      return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                  })
                  .returns(new TypeHint<Tuple2<Integer, String>>() {
                  });

          // turn the city stream into a broadcast stream
          final MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor<>("broad1", Integer.class, String.class);
          BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);

          DataStream<Tuple3<String, Integer, String>> result = textStream.connect(broadcastStream)
                  .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
                      // process the non-broadcast side: join against the broadcast state
                      @Override
                      public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                          ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
                          String cityName = "";
                          if (state.contains(value.f1)) {
                              cityName = state.get(value.f1);
                          }
                          out.collect(new Tuple3<>(value.f0, value.f1, cityName));
                      }

                      // process the broadcast side: update the broadcast state
                      @Override
                      public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                          System.out.println("received broadcast data: " + value);
                          ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
                      }
                  });

          result.print();
          env.execute("joinDemo");
      }
  }

4. Temporal table function join

A temporal table is a view of a continuously changing table as of a specific point in time. A temporal table function is a table function that takes a time attribute as its argument and returns the temporal table's contents as of that time. By mapping the dimension data stream to a temporal table, the main stream can join against a specific version of the dimension data, i.e. its state at some point in history.

Pros: the dimension data can be large, updates take effect promptly, no external store is required, and different versions of the dimension data can be joined. Cons: only supported through the Flink SQL API.

4.1 ProcessingTime example

  package join;

  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.java.StreamTableEnvironment;
  import org.apache.flink.table.functions.TemporalTableFunction;
  import org.apache.flink.types.Row;

  public class JoinDemo5 {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

          // the main stream
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                  .map(p -> {
                      // input format: user,1000 (user name and city id)
                      String[] list = p.split(",");
                      return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                  })
                  .returns(new TypeHint<Tuple2<String, Integer>>() {
                  });

          // the city stream
          DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                  .map(p -> {
                      // input format: city id,city name
                      String[] list = p.split(",");
                      return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                  })
                  .returns(new TypeHint<Tuple2<Integer, String>>() {
                  });

          // convert the streams to Tables, each with a processing-time attribute ps
          Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ps.proctime");
          Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ps.proctime");

          // define a TemporalTableFunction over the city table: time attribute ps, key city_id
          TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ps", "city_id");
          // register the table function
          tableEnv.registerFunction("dimCity", dimCity);

          // the join query
          Table result = tableEnv
                  .sqlQuery("select u.user_name,u.city_id,d.city_name from " + userTable + " as u " +
                          ", LATERAL TABLE (dimCity(u.ps)) d " +
                          "where u.city_id=d.city_id");

          // print the output
          DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
          resultDs.print();
          env.execute("joinDemo");
      }
  }

4.2 EventTime example

  package join;

  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.streaming.api.TimeCharacteristic;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.java.StreamTableEnvironment;
  import org.apache.flink.table.functions.TemporalTableFunction;
  import org.apache.flink.types.Row;

  import java.util.ArrayList;
  import java.util.List;

  public class JoinDemo9 {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // use event time
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
          EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
          env.setParallelism(1);

          // the main (user) stream: user_name, city_id, ts
          List<Tuple3<String, Integer, Long>> list1 = new ArrayList<>();
          list1.add(new Tuple3<>("user1", 1001, 1L));
          list1.add(new Tuple3<>("user1", 1001, 10L));
          list1.add(new Tuple3<>("user2", 1002, 2L));
          list1.add(new Tuple3<>("user2", 1002, 15L));
          DataStream<Tuple3<String, Integer, Long>> textStream = env.fromCollection(list1)
                  .assignTimestampsAndWatermarks(
                          // assign timestamps and watermarks
                          new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.seconds(10)) {
                              @Override
                              public long extractTimestamp(Tuple3<String, Integer, Long> element) {
                                  return element.f2;
                              }
                          }
                  );

          // the city dimension stream: city_id, city_name, ts
          List<Tuple3<Integer, String, Long>> list2 = new ArrayList<>();
          list2.add(new Tuple3<>(1001, "beijing", 1L));
          list2.add(new Tuple3<>(1001, "beijing2", 10L));
          list2.add(new Tuple3<>(1002, "shanghai", 1L));
          list2.add(new Tuple3<>(1002, "shanghai2", 5L));
          DataStream<Tuple3<Integer, String, Long>> cityStream = env.fromCollection(list2)
                  .assignTimestampsAndWatermarks(
                          // assign timestamps and watermarks
                          new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, String, Long>>(Time.seconds(10)) {
                              @Override
                              public long extractTimestamp(Tuple3<Integer, String, Long> element) {
                                  return element.f2;
                              }
                          });

          // convert the streams to Tables, each with an event-time attribute ts
          Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ts.rowtime");
          Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ts.rowtime");
          tableEnv.createTemporaryView("userTable", userTable);
          tableEnv.createTemporaryView("cityTable", cityTable);

          // define a TemporalTableFunction over the city table: time attribute ts, key city_id
          TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "city_id");
          // register the table function
          tableEnv.registerFunction("dimCity", dimCity);

          // the join query
          Table result = tableEnv
                  .sqlQuery("select u.user_name,u.city_id,d.city_name,u.ts from userTable as u " +
                          ", LATERAL TABLE (dimCity(u.ts)) d " +
                          "where u.city_id=d.city_id");

          // print the output
          DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
          resultDs.print();
          env.execute("joinDemo");
      }
  }

The output is:

  user1,1001,beijing,1970-01-01T00:00:00.001
  user1,1001,beijing2,1970-01-01T00:00:00.010
  user2,1002,shanghai,1970-01-01T00:00:00.002
  user2,1002,shanghai2,1970-01-01T00:00:00.015

The output shows that each event in the main stream is joined with the version of the dimension data that was valid at that event time: user2's event at ts=2 picks up shanghai (valid from ts=1), while its event at ts=15 picks up shanghai2 (valid from ts=5).

4.3 EventTime example with a Kafka source

  package join.temporaltablefunctionjoin;

  import org.apache.flink.streaming.api.TimeCharacteristic;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.java.StreamTableEnvironment;
  import org.apache.flink.table.functions.TemporalTableFunction;
  import org.apache.flink.types.Row;

  import java.util.Properties;

  public class JoinDemo10 {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // use event time
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
          EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
          env.setParallelism(1);

          // Kafka brokers and the topics to consume
          String kafkaIPs = "192.168.***.**1:9092,192.168.***.**2:9092,192.168.***.**3:9092";
          Properties props = new Properties();
          props.setProperty("bootstrap.servers", kafkaIPs);
          props.setProperty("group.id", "group.cyb.2");

          // consume the user topic
          FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
          userConsumer.setStartFromEarliest();
          userConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserInfo>(Time.seconds(0)) {
              @Override
              public long extractTimestamp(UserInfo userInfo) {
                  return userInfo.getTs();
              }
          });

          // consume the city dimension topic
          FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
          cityConsumer.setStartFromEarliest();
          cityConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CityInfo>(Time.seconds(0)) {
              @Override
              public long extractTimestamp(CityInfo cityInfo) {
                  return cityInfo.getTs();
              }
          });

          // the main (user) stream: userName, cityId, ts
          Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), "userName,cityId,ts.rowtime");
          // the city dimension stream: cityId, cityName, ts
          Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), "cityId,cityName,ts.rowtime");
          tableEnv.createTemporaryView("userTable", userTable);
          tableEnv.createTemporaryView("cityTable", cityTable);

          // define a TemporalTableFunction over the city table: time attribute ts, key cityId
          TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "cityId");
          // register the table function
          tableEnv.registerFunction("dimCity", dimCity);

          Table u = tableEnv.sqlQuery("select * from userTable");
          u.printSchema();
          tableEnv.toAppendStream(u, Row.class).print("user stream received:");

          Table c = tableEnv.sqlQuery("select * from cityTable");
          c.printSchema();
          tableEnv.toAppendStream(c, Row.class).print("city stream received:");

          // the join query
          Table result = tableEnv
                  .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
                          "from userTable as u " +
                          ", LATERAL TABLE (dimCity(u.ts)) d " +
                          "where u.cityId=d.cityId");

          // print the output
          DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
          resultDs.print("join output:");
          env.execute("joinDemo");
      }
  }
  package join.temporaltablefunctionjoin;

  import lombok.Data;

  import java.io.Serializable;

  @Data
  public class UserInfo implements Serializable {
      private String userName;
      private Integer cityId;
      private Long ts;
  }
  package join.temporaltablefunctionjoin;

  import lombok.Data;

  import java.io.Serializable;

  @Data
  public class CityInfo implements Serializable {
      private Integer cityId;
      private String cityName;
      private Long ts;
  }
  package join.temporaltablefunctionjoin;

  import com.alibaba.fastjson.JSON;
  import com.alibaba.fastjson.TypeReference;
  import org.apache.flink.api.common.serialization.DeserializationSchema;
  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.common.typeinfo.TypeInformation;

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  public class UserInfoSchema implements DeserializationSchema<UserInfo> {

      @Override
      public UserInfo deserialize(byte[] message) throws IOException {
          String jsonStr = new String(message, StandardCharsets.UTF_8);
          return JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
      }

      @Override
      public boolean isEndOfStream(UserInfo nextElement) {
          return false;
      }

      @Override
      public TypeInformation<UserInfo> getProducedType() {
          return TypeInformation.of(new TypeHint<UserInfo>() {
          });
      }
  }
  package join.temporaltablefunctionjoin;

  import com.alibaba.fastjson.JSON;
  import com.alibaba.fastjson.TypeReference;
  import org.apache.flink.api.common.serialization.DeserializationSchema;
  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.common.typeinfo.TypeInformation;

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  public class CityInfoSchema implements DeserializationSchema<CityInfo> {

      @Override
      public CityInfo deserialize(byte[] message) throws IOException {
          String jsonStr = new String(message, StandardCharsets.UTF_8);
          return JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
      }

      @Override
      public boolean isEndOfStream(CityInfo nextElement) {
          return false;
      }

      @Override
      public TypeInformation<CityInfo> getProducedType() {
          return TypeInformation.of(new TypeHint<CityInfo>() {
          });
      }
  }

Write records to the user and city topics one after another:

  User record format: {"userName":"user1","cityId":1,"ts":11}
  City dimension record format: {"cityId":1,"cityName":"nanjing","ts":15}

The resulting output looks like this:

  city stream received:> 1,beijing,1970-01-01T00:00
  user stream received:> user1,1,1970-01-01T00:00
  join output:> user1,1,beijing,1970-01-01T00:00
  city stream received:> 1,shanghai,1970-01-01T00:00:00.005
  user stream received:> user1,1,1970-01-01T00:00:00.001
  join output:> user1,1,beijing,1970-01-01T00:00:00.001
  user stream received:> user1,1,1970-01-01T00:00:00.004
  join output:> user1,1,beijing,1970-01-01T00:00:00.004
  user stream received:> user1,1,1970-01-01T00:00:00.005
  join output:> user1,1,shanghai,1970-01-01T00:00:00.005
  user stream received:> user1,1,1970-01-01T00:00:00.007
  user stream received:> user1,1,1970-01-01T00:00:00.009
  city stream received:> 1,shanghai,1970-01-01T00:00:00.007
  join output:> user1,1,shanghai,1970-01-01T00:00:00.007
  city stream received:> 1,wuhan,1970-01-01T00:00:00.010
  join output:> user1,1,shanghai,1970-01-01T00:00:00.009
  user stream received:> user1,1,1970-01-01T00:00:00.011
  city stream received:> 1,nanjing,1970-01-01T00:00:00.015
  join output:> user1,1,wuhan,1970-01-01T00:00:00.011

5. Summary of the four approaches

  • Preloaded dimension table: simplest to implement; the data sits in task memory, so it only fits small, rarely changing dimension tables.
  • Hot-storage dimension table: the dimension data lives in Redis/HBase/MySQL and can be large; lookup latency limits throughput, which the cache (2.1) and async I/O (2.2) mitigate.
  • Broadcast dimension table: dimension updates reach the results immediately; the data is kept in memory, so only fairly small tables are practical.
  • Temporal table function join: handles large, frequently updated dimension data and versioned (point-in-time) joins; only usable through Flink SQL.