banner.webp
Flink具有非常灵活的分层API设计,其中核心层就是DataStream/DataSet API。由于新版已经实现了流批一体,DataSet API即将被弃用,官方推荐统一使用DataStream API处理流数据与批数据。DataStream本身是Flink中一个用来表示数据集合的类,我们编写的Flink代码其实就是基于这种数据类型的处理,所以这套核心API就以DataStream命名。在用法上有点类似于常规的Java集合,但又有所不同。我们在代码中往往并不关心集合中具体的数据,而只是用API定义出一连串操作来处理他们:我们称之为数据流的“转换”。
一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都是由以下几部分构成:

  • 获取执行环境(execution environment)
  • 读取数据源(source)
  • 定义基于数据的转换操作(transformations)
  • 定义计算结果的输出位置(sink)
  • 触发程序执行(execute)

1. 执行环境(Execution Environment)

  1. Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。
  2. 不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。只有获取了环境 上下文信息,才能将具体的任务调度到不同的 TaskManager 执行。

1.1 创建执行环境
  1. 编写Flink程序的第一步,就是创建执行环境。我们要获取的执行环境,是<br />StreamExecutionEnvironment 类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。
  • getExecutionEnvironment
  • createLocalEnvironment
  • createRemoteEnvironment
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. //会根据当前运行环境直接得到正确的结果
  1. StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
  2. //这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数
  1. StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
  2. "hadoop0", // JobManager 主机名
  3. 7777, // JobManager 进程端口号
  4. "test/wordcount.jar" // 提交给 JobManager 的 JAR 包
  5. );

1.2 执行模式(Execution Mode)
  1. 流批一体后,我们获取到的执行环境是StreamExecutionEnvironment,看名称它是做流处理的,那么我们如何去执行批处理呢?
  1. // 批处理环境
  2. ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
  3. // 流处理环境
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 那么我们不难看出:基于ExecutionEnvironment读入数据创建的数据集合,就是DataSet;对应的调用的一整套转换方法,就是DataSet API
  2. 由于Flink程序默认是STREAMING模式,我们这里重点介绍一下BATCH模式的配置,分别是命令行配置与代码配置
  1. flink run -Dexecution.runtime-mode=BATCH /test/wordcount.jar
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  1. 当然不建议在代码配置,使用命令行配置比较灵活,既可以使用流处理也可以使用批处理。

2. 读取数据源(Source)

  1. 创建好执行环境之后,我们先要读取数据才能对数据做处理,Flink读取数据,是调用执行环境的addSource()方法
  1. DataStream<String> stream = env.addSource(...);
  1. 方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource。这里的DataStreamSource 类继承自 SingleOutputStreamOperator 类,又进一步继承自 DataStream。所以很明显,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)。
  2. 传入的参数是一个“源函数”(source function),需要实现 SourceFunction 接口。这是何方神圣,又该怎么实现呢? 自己去实现它显然不会是一件容易的事。好在 Flink 直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的 source function,通常情况下足以应对我们的实际需求。
  3. 我们这里构建一个实际应用场景。比如网站的访问操作,我们把它抽象成一个三元组(用户名、用户访问的url,用户访问url的时间戳),我们这里创建一个类,将用户行为包装成一个对象
  1. import java.sql.Timestamp;
  2. public class Event {
  3. public String user;
  4. public String url;
  5. public Long timestamp;
  6. public Event() {
  7. }
  8. public Event(String user, String url, Long timestamp) {
  9. this.user = user;
  10. this.url = url;
  11. this.timestamp = timestamp;
  12. }
  13. @Override
  14. public String toString() {
  15. return "Event{" +
  16. "user='" + user + '\'' +
  17. ", url='" + url + '\'' +
  18. ", timestamp=" + new Timestamp(timestamp) +
  19. '}';
  20. }
  1. 这里需要注意,我们定义的 Event,有这样几个特点:
  • 类是公有(public)的
  • 有一个无参的构造方法
  • 所有属性都是公有(public)的
  • 所有属性的类型都是可以序列化的
    Flink 会把这样的类作为一种特殊的 POJO 数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了 toString 方法,主要是为了测试输出显示更清晰。

2.1 从集合中读取数据
  1. 最简单的读取方式,就是在代码中创建一个Java集合,然后调用执行环境的fromCollection方法进行读取,一般用于测试。
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. //设置并行度
  3. env.setParallelism(1);
  4. ArrayList<Event> clicks = new ArrayList<>();
  5. clicks.add(new Event("Mary","./home",1000L));
  6. clicks.add(new Event("Bob","./cart",2000L));
  7. DataStream<Event> stream = env.fromCollection(clicks);
  1. 也可以不创建集合,直接列举
  1. DataStreamSource<Event> stream2 = env.fromElements(
  2. new Event("Mary", "./home", 1000L),
  3. new Event("Bob", "./cart", 2000L)
  4. );

2.2 从文件读取数据
  1. 在真正的实际应用中,不会直接将数据写在代码里。通常情况下我们会从存储介质中获取数据。比较常见的就是读取文件
  1. DataStream<String> stream = env.readTextFile("clicks.csv");

说明:

  • 参数可以是目录,也可以是文件;
  • 路径可以是相对路径,也可以是绝对路径;
  • 相对路径是从系统属性 user.dir 获取路径: idea 下是 project 的根目录, standalone 模式下是集群节点根目录;
  • 也可以从 hdfs 目录下读取, 使用路径 hdfs://…, 由于 Flink 没有提供 hadoop 相关依赖, 需要 pom 中添加相关依赖:
  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-client</artifactId>
  4. <version>${hadoop.version}</version>
  5. <scope>provided</scope>
  6. </dependency>

2.3 从Socket读取数据
  1. 无论从集合还是文件,我们读取的数据其实都是有界数据。在流处理场景中,数据往往都是无界的,一种比较简单的方式,就是读取socket文本流,但这种一般也用于测试
  1. DataStream<String> stream = env.socketTextStream("localhost", 7777);

2.4 从Kafka读取数据
  1. Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说 Kafka Flink 天生一对,是当前处理流式 数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选。
  2. 略微遗憾的是,与 Kafka 的连接比较复杂,Flink 内部并没有提供预实现的方法。所以我 们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。好在KafkaFlink确实是非常契合,所以Flink官方提供了连接工具flink-connector-kafka 直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction
  3. 只需导入下面依赖
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>
  1. 然后调用env.addSource(),传入FlinkKafkaConsumer的对象实例即可
  1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  5. import java.util.Properties;
  6. public class SourceKafkaTest {
  7. public static void main(String[] args) throws Exception {
  8. StreamExecutionEnvironment env =
  9. StreamExecutionEnvironment.getExecutionEnvironment();
  10. env.setParallelism(1);
  11. Properties properties = new Properties();
  12. properties.setProperty("bootstrap.servers", "hadoop0:9092");
  13. properties.setProperty("group.id", "consumer-group");
  14. properties.setProperty("key.deserializer",
  15. "org.apache.kafka.common.serialization.StringDeserializer");
  16. properties.setProperty("value.deserializer",
  17. "org.apache.kafka.common.serialization.StringDeserializer");
  18. properties.setProperty("auto.offset.reset", "latest");
  19. DataStreamSource<String> stream = env.addSource(new
  20. FlinkKafkaConsumer<String>(
  21. "clicks",
  22. new SimpleStringSchema(),
  23. properties
  24. ));
  25. stream.print("Kafka");
  26. env.execute();
  27. }
  28. }

创建 FlinkKafkaConsumer 时需要传入三个参数:

  • 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic 列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据 时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
  • 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是公共接口,所以我们也可以自定义反序列化逻辑。
  • 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。

    当然你也可以自定义Source,这里暂不做过多讨论。

3. 转换算子(Transform)

3.1 基本转换算子

3.1.1 映射(map)
  1. map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
  2. 我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是 接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。
  3. 下面我们用不同的方式提取Event中的user字段功能。
  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. public class TransMapTest {
  5. public static void main(String[] args) throws Exception{
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. DataStreamSource<Event> stream = env.fromElements(
  10. new Event("Mary", "./home", 1000L),
  11. new Event("Bob", "./cart", 2000L)
  12. );
  13. // 传入匿名类,实现 MapFunction
  14. stream.map(new MapFunction<Event, String>() {
  15. @Override
  16. public String map(Event e) throws Exception {
  17. return e.user;
  18. }
  19. });
  20. // 传入 MapFunction 的实现类
  21. stream.map(new UserExtractor()).print();
  22. env.execute();
  23. }
  24. public static class UserExtractor implements MapFunction<Event, String> {
  25. @Override
  26. public String map(Event e) throws Exception {
  27. return e.user;
  28. }
  29. } }

3.1.2 过滤(filter)
  1. filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤 条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。
  2. 进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参 数需要实现 FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。下面的代码会将数据流中用户 Mary 的浏览行为过滤出来
  1. import org.apache.flink.api.common.functions.FilterFunction;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. public class TransFilterTest {
  5. public static void main(String[] args) throws Exception {
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. DataStreamSource<Event> stream = env.fromElements(
  10. new Event("Mary", "./home", 1000L),
  11. new Event("Bob", "./cart", 2000L)
  12. );
  13. // 传入匿名类实现 FilterFunction
  14. stream.filter(new FilterFunction<Event>() {
  15. @Override
  16. public boolean filter(Event e) throws Exception {
  17. return e.user.equals("Mary");
  18. }
  19. });
  20. // 传入 FilterFunction 实现类
  21. stream.filter(new UserFilter()).print();
  22. env.execute();
  23. }
  24. public static class UserFilter implements FilterFunction<Event> {
  25. @Override
  26. public boolean filter(Event e) throws Exception {
  27. return e.user.equals("Mary");
  28. }
  29. }
  30. }

3.1.3 扁平映射(flatMap)
  1. flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个 一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
  2. flatMap 操作会应用在每一个输入事件上面,FlatMapFunction 接口中定义了 flatMap 方法, 用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调用,也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回结果 0 个的时候,就相当于对数据进行了过滤,当返回结果是 1 个的时候,相当于对数据进行了简单的转换操作。
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.util.Collector;
  5. public class TransFlatmapTest {
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env =
  8. StreamExecutionEnvironment.getExecutionEnvironment();
  9. env.setParallelism(1);
  10. DataStreamSource<Event> stream = env.fromElements(
  11. new Event("Mary", "./home", 1000L),
  12. new Event("Bob", "./cart", 2000L)
  13. );
  14. stream.flatMap(new MyFlatMap()).print();
  15. env.execute();
  16. }
  17. public static class MyFlatMap implements FlatMapFunction<Event, String> {
  18. @Override
  19. public void flatMap(Event value, Collector<String> out) throws Exception {
  20. if (value.user.equals("Mary")) {
  21. out.collect(value.user);
  22. } else if (value.user.equals("Bob")) {
  23. out.collect(value.user);
  24. out.collect(value.url);
  25. }
  26. }
  27. }
  28. }

3.2 聚合算子(Aggregation)
  1. 直观上看,基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理 和输出。而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的 信息。比如之前 word count 程序中,要对每个词出现的频次进行叠加统计。这种操作,计算 的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。

3.2.1 按键分区(keyBy)
  1. 对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区; 这个操作就是通过 keyBy 来完成的。keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑 上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。
  2. 基于不同的 key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了。
  3. 在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以 这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器 KeySelector),用于说明从数据中提取 key 的逻辑。
  4. 我们可以以 id 作为 key 做一个分区操作,代码实现如下:
  1. import org.apache.flink.api.java.functions.KeySelector;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.datastream.KeyedStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. public class TransKeyByTest {
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env =
  8. StreamExecutionEnvironment.getExecutionEnvironment();
  9. env.setParallelism(1);
  10. DataStreamSource<Event> stream = env.fromElements(
  11. new Event("Mary", "./home", 1000L),
  12. new Event("Bob", "./cart", 2000L)
  13. );
  14. // 使用 Lambda 表达式
  15. KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);
  16. // 使用匿名类实现 KeySelector
  17. KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
  18. @Override
  19. public String getKey(Event e) throws Exception {
  20. return e.user;
  21. }
  22. });
  23. stream.print();
  24. env.execute();
  25. }
  26. }
  1. 需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为 KeyedStreamKeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照 key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 类型。
  2. KeyedStream 也继承自 DataStream,所以基于它的操作也都归属于 DataStream API。但它跟之前的转换操作得到的 SingleOutputStreamOperator 不同,只是一个流的分区操作,并不是 一个转换算子。KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操 作(比如 sumreduce);而且它可以将当前算子任务的状态(state)也按照 key 进行划分、限定为仅对当前 key 有效。

3.2.2 简单聚合
  1. 有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们 内置实现了一些最基本、最简单的聚合 API,主要有以下几种:
  • sum():在输入流上,对指定的字段做叠加求和的操作。
  • min():在输入流上,对指定的字段求最小值。
  • max():在输入流上,对指定的字段求最大值。
  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。
  • maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与 min()/minBy()完全一致。

    简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数; 但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。

    对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以 f0、f1、f2、…来命名的。

  1. import org.apache.flink.api.java.tuple.Tuple2;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. public class TransTupleAggreationTest {
  5. public static void main(String[] args) throws Exception {
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
  10. Tuple2.of("a", 1),
  11. Tuple2.of("a", 3),
  12. Tuple2.of("b", 3),
  13. Tuple2.of("b", 4)
  14. );
  15. stream.keyBy(r -> r.f0).sum(1).print();
  16. stream.keyBy(r -> r.f0).sum("f1").print();
  17. stream.keyBy(r -> r.f0).max(1).print();
  18. stream.keyBy(r -> r.f0).max("f1").print();
  19. stream.keyBy(r -> r.f0).min(1).print();
  20. stream.keyBy(r -> r.f0).min("f1").print();
  21. stream.keyBy(r -> r.f0).maxBy(1).print();
  22. stream.keyBy(r -> r.f0).maxBy("f1").print();
  23. stream.keyBy(r -> r.f0).minBy(1).print();
  24. stream.keyBy(r -> r.f0).minBy("f1").print();
  25. env.execute();
  26. }
  27. }

而如果数据流的类型是 POJO 类,那么就只能通过字段名称来指定,不能通过位置来指定

  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class TransPojoAggregationTest {
  4. public static void main(String[] args) throws Exception {
  5. StreamExecutionEnvironment env =
  6. StreamExecutionEnvironment.getExecutionEnvironment();
  7. env.setParallelism(1);
  8. DataStreamSource<Event> stream = env.fromElements(
  9. new Event("Mary", "./home", 1000L),
  10. new Event("Bob", "./cart", 2000L)
  11. );
  12. stream.keyBy(e -> e.user).max("timestamp").print(); // 指定字段名称
  13. env.execute();
  14. }
  15. }
  1. 简单聚合算子返回的,同样是一个 SingleOutputStreamOperator,也就是从 KeyedStream 转换成了常规的 DataStream。所以可以这样理解:keyBy 和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不 变。
  2. 一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。 所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值 的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个 key 的数据流上。

3.2.3 归约聚合(reduce)
  1. 如果说简单聚合是对一些特定统计需求的实现,那么 reduce 算子就是一个一般化的聚合 统计操作了。从大名鼎鼎的 MapReduce 开始,我们对 reduce 操作就不陌生:它可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
  2. 与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
  3. ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再 将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据, 这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果” 作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
  4. 其实,reduce 的语义是针对列表进行规约操作,运算规则由 ReduceFunction 中的 reduce方法来定义,而在 ReduceFunction 内部会维护一个初始值为空的累加器,注意累加器的类型和输入元素的类型相同,当第一条元素到来时,累加器的值更新为第一条元素的值,当新的元素到来时,新元素会和累加器进行累加操作,这里的累加操作就是 reduce 函数定义的运算规 则。然后将更新以后的累加器的值向下游输出。
  5. 我们可以单独定义一个函数类实现 ReduceFunction 接口,也可以直接传入一个匿名类。 当然,同样也可以通过传入 Lambda 表达式实现类似的功能。
  6. 与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStrema。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
  7. 看一下稍复杂的例子,我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个 用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。
  1. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  2. import java.util.Calendar;
  3. import java.util.Random;
  4. public class ClickSource implements SourceFunction<Event> {
  5. // 声明一个布尔变量,作为控制数据生成的标识位
  6. private Boolean running = true;
  7. @Override
  8. public void run(SourceContext<Event> ctx) throws Exception {
  9. Random random = new Random(); // 在指定的数据集中随机选取数据
  10. String[] users = {"Mary", "Alice", "Bob", "Cary"};
  11. String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
  12. "./prod?id=2"};
  13. while (running) {
  14. ctx.collect(new Event(
  15. users[random.nextInt(users.length)],
  16. urls[random.nextInt(urls.length)],
  17. Calendar.getInstance().getTimeInMillis()
  18. ));
  19. // 隔 1 秒生成一个点击事件,方便观测
  20. Thread.sleep(1000);
  21. }
  22. }
  23. @Override
  24. public void cancel() {
  25. running = false;
  26. }
  27. }
  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.api.common.functions.ReduceFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. public class TransReduceTest {
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env =
  8. StreamExecutionEnvironment.getExecutionEnvironment();
  9. env.setParallelism(1);
  10. // 这里的 ClickSource()使用了之前自定义数据源小节中的 ClickSource()
  11. env.addSource(new ClickSource())
  12. // 将 Event 数据类型转换成元组类型
  13. .map(new MapFunction<Event, Tuple2<String, Long>>() {
  14. @Override
  15. public Tuple2<String, Long> map(Event e) throws Exception {
  16. return Tuple2.of(e.user, 1L);
  17. }
  18. })
  19. .keyBy(r -> r.f0) // 使用用户名来进行分流
  20. .reduce(new ReduceFunction<Tuple2<String, Long>>() {
  21. @Override
  22. public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
  23. Tuple2<String, Long> value2) throws Exception {
  24. // 每到一条数据,用户 pv 的统计值加 1
  25. return Tuple2.of(value1.f0, value1.f1 + value2.f1);
  26. }
  27. })
  28. .keyBy(r -> true) // 为每一条数据分配同一个 key,将聚合结果发送到一条流中去
  29. .reduce(new ReduceFunction<Tuple2<String, Long>>() {
  30. @Override
  31. public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
  32. Tuple2<String, Long> value2) throws Exception {
  33. // 将累加器更新为当前最大的 pv 统计值,然后向下游发送累加器的值
  34. return value1.f1 > value2.f1 ? value1 : value2;
  35. }
  36. })
  37. .print();
  38. env.execute();
  39. }
  40. }
  1. reduce 同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以我们需要将 reduce 算子作用在一个有限 key 的流上。

3.3 用户自定义函数(UDF)

3.3.1 函数类
  1. 对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口, 来完成处理逻辑的定义。Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类, 例如 MapFunctionFilterFunctionReduceFunction 等。所以最简单直接的方式,就是自定义一个函数类,实现对应的接口。
  2. 下面例子实现了FilterFunction接口,用来筛选url中包含“home”的事件。
  1. import org.apache.flink.api.common.functions.FilterFunction;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. public class TransFunctionUDFTest {
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env =
  8. StreamExecutionEnvironment.getExecutionEnvironment();
  9. env.setParallelism(1);
  10. DataStreamSource<Event> clicks = env.fromElements(
  11. new Event("Mary", "./home", 1000L),
  12. new Event("Bob", "./cart", 2000L)
  13. );
  14. DataStream<Event> stream = clicks.filter(new FlinkFilter());
  15. stream.print();
  16. env.execute();
  17. }
  18. public static class FlinkFilter implements FilterFunction<Event> {
  19. @Override
  20. public boolean filter(Event value) throws Exception {
  21. return value.url.contains("home");
  22. }
  23. }
  24. }

也可以通过匿名函数类实现

  1. DataStream<String> stream = clicks.filter(new FilterFunction<Event>() {
  2. @Override
  3. public boolean filter(Event value) throws Exception {
  4. return value.url.contains("home");
  5. }
  6. });

为了方法更加通用,我们还可以将用于过滤关键字“home”抽象出来作为类的属性,调用构造方法时传进去。

  1. DataStream<Event> stream = clicks.filter(new KeyWordFilter("home"));
  2. public static class KeyWordFilter implements FilterFunction<Event> {
  3. private String keyWord;
  4. KeyWordFilter(String keyWord) {
  5. this.keyWord = keyWord;
  6. }
  7. @Override
  8. public boolean filter(Event value) throws Exception {
  9. return value.url.contains(this.keyWord);
  10. }
  11. }

3.3.2 匿名函数(Lambda)
  1. 匿名函数(Lambda 表达式)是 Java 8 引入的新特性,方便我们更加快速清晰地写代码。Lambda 表达式允许以简洁的方式实现函数,以及将函数作为参数来进行传递,而不必声明额外的(匿名)类。
  2. Flink 的所有算子都可以使用 Lambda 表达式的方式来进行编码,但是,当 Lambda 达式使用 Java 的泛型时,我们需要显式的声明类型信息。
  3. 下例演示了如何使用 Lambda 表达式来实现一个简单的 map() 函数,我们使用 Lambda 表达式来计算输入的平方。在这里,我们不需要声明 map() 函数的输入 i 和输出参数的数据类型,因为 Java 编译器会对它们做出类型推断。
  1. import org.apache.flink.streaming.api.datastream.DataStream;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. public class TransFunctionLambdaTest {
  5. public static void main(String[] args) throws Exception {
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. DataStreamSource<Event> clicks = env.fromElements(
  10. new Event("Mary", "./home", 1000L),
  11. new Event("Bob", "./cart", 2000L)
  12. );
  13. //map 函数使用 Lambda 表达式,返回简单类型,不需要进行类型声明
  14. DataStream<String> stream1 = clicks.map(event -> event.url);
  15. stream1.print();
  16. env.execute();
  17. }
  18. }
  1. 由于 OUT String 类型而不是泛型,所以 Flink 可以从函数签名 OUT map(IN value) 的实现中自动提取出结果的类型信息。
  2. 但是对于像 flatMap() 这样的函数,它的函数签名 void flatMap(IN value, Collector out) Java 编译器编译成了 void flatMap(IN value, Collector out),也就是说将 Collector 的泛型信息擦除掉了。这样 Flink 就无法自动推断输出的类型信息了。
  1. // flatMap 使用 Lambda 表达式,抛出异常
  2. DataStream<String> stream2 = clicks.flatMap((event, out) -> {
  3. out.collect(event.url);
  4. });
  5. stream2.print();

在这种情况下,我们需要显式地指定类型信息,否则输出将被视为 Object 类型

  1. // flatMap 使用 Lambda 表达式,必须通过 returns 明确声明返回类型
  2. DataStream<String> stream2 = clicks.flatMap((Event event, Collector<String> out) -> {
  3. out.collect(event.url);
  4. }).returns(Types.STRING);
  5. stream2.print();

当使用 map() 函数返回 Flink 自定义的元组类型时也会发生类似的问题。下例中的函数签 名 Tuple2 map(Event value) 被类型擦除为 Tuple2 map(Event value)。

  1. //使用 map 函数也会出现类似问题,以下代码会报错
  2. DataStream<Tuple2<String, Long>> stream3 = clicks
  3. .map( event -> Tuple2.of(event.user, 1L) );
  4. stream3.print();

一般来说,这个问题可以通过多种方式解决

  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. public class ReturnTypeResolve {
  8. public static void main(String[] args) throws Exception {
  9. StreamExecutionEnvironment env =
  10. StreamExecutionEnvironment.getExecutionEnvironment();
  11. env.setParallelism(1);
  12. DataStreamSource<Event> clicks = env.fromElements(
  13. new Event("Mary", "./home", 1000L),
  14. new Event("Bob", "./cart", 2000L)
  15. );
  16. // 想要转换成二元组类型,需要进行以下处理
  17. // 1) 使用显式的 ".returns(...)"
  18. DataStream<Tuple2<String, Long>> stream3 = clicks
  19. .map(event -> Tuple2.of(event.user, 1L))
  20. .returns(Types.TUPLE(Types.STRING, Types.LONG));
  21. stream3.print();
  22. // 2) 使用类来替代 Lambda 表达式
  23. clicks.map(new MyTuple2Mapper())
  24. .print();
  25. // 3) 使用匿名类来代替 Lambda 表达式
  26. clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
  27. @Override
  28. public Tuple2<String, Long> map(Event value) throws Exception {
  29. return Tuple2.of(value.user, 1L);
  30. }
  31. }).print();
  32. env.execute();
  33. }
  34. // 自定义 MapFunction 的实现类
  35. public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String,
  36. Long>> {
  37. @Override
  38. public Tuple2<String, Long> map(Event value) throws Exception {
  39. return Tuple2.of(value.user, 1L);
  40. }
  41. }
  42. }

3.3.3 富函数类
  1. “富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunctionRichFilterFunction RichReduceFunction 等。
  2. 既然“富”,那么它一定会比常规的函数类提供更多、更丰富的功能。与常规函数类的不 同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
  3. 一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以 我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接。所以我们推荐的最佳实践如下:
  1. public class MyFlatMap extends RichFlatMapFunction<IN, OUT>> {
  2. @Override
  3. public void open(Configuration configuration) {
  4. // 做一些初始化工作
  5. // 例如建立一个和 MySQL 的连接
  6. }
  7. @Override
  8. public void flatMap(IN in, Collector<OUT out) {
  9. // 对数据库进行读写
  10. }
  11. @Override
  12. public void close() {
  13. // 清理工作,关闭和 MySQL 数据库的连接。
  14. } }

4. 输出算子(Sink)

  1. Flink 中,如果我们希望将数据写入外部系统,其实并不是一件难事。我们知道所有算 子都可以通过实现函数类来自定义处理逻辑,所以只要有读写客户端,与外部系统的交互在任 何一个处理算子中都可以实现。例如在 MapFunction 中,我们完全可以构建一个到 Redis 的连接,然后将当前处理的结果保存到 Redis 中。如果考虑到只需建立一次连接,我们也可以利用 RichMapFunction,在 open() 生命周期中做连接操作。
  2. 这样看起来很方便,却会带来很多问题。Flink 作为一个快速的分布式实时流处理系统, 对稳定性和容错性要求极高。一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果 的正确性。这种性质一般被称作“状态一致性”。Flink 内部提供了一致性检查点(checkpoint 来保障我们可以回滚到正确的状态;但如果我们在处理过程中任意读写外部系统,发生故障后就很难回退到从前了。
  3. 为了避免这样的问题,Flink DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外 部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 子完成的。 Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论 怎样理解,Sink Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。
  4. Flink 官方提供了一部分的框架的 Sink 连接器。如图所示,列出了 Flink 官方目前支持的第三方系统连接器

1.png

  1. 我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source/sink 两端都能连 接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。
  2. Flink 官方之外,Apache Bahir 作为给 Spark Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器。
  3. 除此之外,就需要用户自定义实现sink连接器了。

4.1 输出到文件
  1. 最简单的输出方式,当然就是写入文件了。对应着读取文件作为输入数据源,Flink 本来 也有一些非常简单粗暴的输出到文件的预实现方法:如 writeAsText()、writeAsCsv(),可以直接将输出结果保存到文本文件或 Csv 文件。但我们知道,这种方式是不支持同时写入一份文 件的;所以我们往往会将最后的 Sink 操作并行度设为 1,这就大大拖慢了系统效率;而且对于故障恢复后的状态一致性,也没有任何保证。所以目前这些简单的方法已经要被弃用。
  2. Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类 RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink 支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分 区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶” 的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保 存的文件,记录的都是 1 小时的输出数据。
  3. StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用 StreamingFileSink 的静态方法:
  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)

    在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径 (basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。 下面我们就以行编码为例,将一些测试数据直接写入文件:

  1. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
  2. import org.apache.flink.core.fs.Path;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
  6. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
  7. import java.util.concurrent.TimeUnit;
  8. public class SinkToFileTest {
  9. public static void main(String[] args) throws Exception {
  10. StreamExecutionEnvironment env =
  11. StreamExecutionEnvironment.getExecutionEnvironment();
  12. env.setParallelism(4);
  13. DataStreamSource<Event> stream = env.fromElements(new Event("Mary",
  14. "./home", 1000L),
  15. new Event("Bob", "./cart", 2000L),
  16. new Event("Alice", "./prod?id=100", 3000L),
  17. new Event("Alice", "./prod?id=200", 3500L),
  18. new Event("Bob", "./prod?id=2", 2500L),
  19. new Event("Alice", "./prod?id=300", 3600L),
  20. new Event("Bob", "./home", 3000L),
  21. new Event("Bob", "./prod?id=1", 2300L),
  22. new Event("Bob", "./prod?id=3", 3300L));
  23. StreamingFileSink<String> fileSink = StreamingFileSink
  24. .<String>forRowFormat(new Path("./output"),
  25. new SimpleStringEncoder<>("UTF-8"))
  26. .withRollingPolicy(
  27. DefaultRollingPolicy.builder()
  28. .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)
  29. )
  30. .withInactivityInterval(TimeUnit.MINUTES.toMillis(5
  31. ))
  32. .withMaxPartSize(1024 * 1024 * 1024)
  33. .build())
  34. .build();
  35. // 将 Event 转换成 String 写入文件
  36. stream.map(Event::toString).addSink(fileSink);
  37. env.execute();
  38. }
  39. }
  1. 这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策 略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以 我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面 的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
  • 至少包含 15 分钟的数据
  • 最近 5 分钟没有收到新的数据
  • 文件大小已达到 1 GB

4.2 输出到Kafka
  1. 和输入源类似,直接看代码吧
  1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  5. import java.util.Properties;
  6. public class SinkToKafkaTest {
  7. public static void main(String[] args) throws Exception {
  8. StreamExecutionEnvironment env =
  9. StreamExecutionEnvironment.getExecutionEnvironment();
  10. env.setParallelism(1);
  11. Properties properties = new Properties();
  12. properties.put("bootstrap.servers", "hadoop0:9092");
  13. DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
  14. stream
  15. .addSink(new FlinkKafkaProducer<String>(
  16. "clicks",
  17. new SimpleStringSchema(),
  18. properties
  19. ));
  20. env.execute();
  21. }
  22. }
  1. FlinkKafkaProducer 继承了抽象类 TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提 交提供了 Flink Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。关于这部分内容,我们后续再谈。
  2. 可以启动一个消费者查看是否收到了数据
  1. /kafka-console-consumer.sh --bootstrap-server hadoop0:9092 --topic clicks

4.3 输出到Redis
  1. Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.0,支持的 Scala 版本最新到 2.11。由于我们的测试不涉及到 Scala 的相关版本变化,所以并不影响使用。
  1. <dependency>
  2. <groupId>org.apache.bahir</groupId>
  3. <artifactId>flink-connector-redis_2.11</artifactId>
  4. <version>1.0</version>
  5. </dependency>
  1. 连接器为我们提供了一个 RedisSink,它继承了抽象类 RichSinkFunction,这就是已经实现好的向 Redis 写入数据的 SinkFunction。我们可以直接将 Event 数据输出到 Redis
  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.streaming.connectors.redis.RedisSink;
  3. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  4. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
  5. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
  6. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
  7. public class SinkToRedisTest {
  8. public static void main(String[] args) throws Exception {
  9. StreamExecutionEnvironment env =
  10. StreamExecutionEnvironment.getExecutionEnvironment();
  11. env.setParallelism(1);
  12. // 创建一个到 redis 连接的配置
  13. FlinkJedisPoolConfig conf = new
  14. FlinkJedisPoolConfig.Builder().setHost("hadoop0").build();
  15. env.addSource(new ClickSource())
  16. .addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
  17. env.execute();
  18. }
  19. }
  20. class MyRedisMapper implements RedisMapper<Event> {
  21. @Override
  22. public String getKeyFromData(Event e) {
  23. return e.user;
  24. }
  25. @Override
  26. public String getValueFromData(Event e) {
  27. return e.url;
  28. }
  29. @Override
  30. public RedisCommandDescription getCommandDescription() {
  31. return new RedisCommandDescription(RedisCommand.HSET, "clicks");
  32. }
  33. }

在这里我们可以看到,保存到 Redis 时调用的命令是 HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以 user 为 key,以 url 为 value,每来一条数据就会做一次转换。

查看Redis是否收到数据

image.png

4.4 输出到Elasticsearch
  1. Flink ElasticSearch 专门提供了官方的 Sink 连接器,Flink 1.13 支持当前最新版本的 ElasticSearch
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>
  1. import org.apache.flink.api.common.functions.RuntimeContext;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  5. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  6. import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
  7. import org.apache.http.HttpHost;
  8. import org.elasticsearch.action.index.IndexRequest;
  9. import org.elasticsearch.client.Requests;
  10. import java.util.ArrayList;
  11. import java.util.HashMap;
  12. public class SinkToEsTest {
  13. public static void main(String[] args) throws Exception {
  14. StreamExecutionEnvironment env =
  15. StreamExecutionEnvironment.getExecutionEnvironment();
  16. env.setParallelism(1);
  17. DataStreamSource<Event> stream = env.fromElements(
  18. new Event("Mary", "./home", 1000L),
  19. new Event("Bob", "./cart", 2000L),
  20. new Event("Alice", "./prod?id=100", 3000L),
  21. new Event("Alice", "./prod?id=200", 3500L),
  22. new Event("Bob", "./prod?id=2", 2500L),
  23. new Event("Alice", "./prod?id=300", 3600L),
  24. new Event("Bob", "./home", 3000L),
  25. new Event("Bob", "./prod?id=1", 2300L),
  26. new Event("Bob", "./prod?id=3", 3300L));
  27. ArrayList<HttpHost> httpHosts = new ArrayList<>();
  28. httpHosts.add(new HttpHost("hadoop0", 9200, "http"));
  29. // 创建一个 ElasticsearchSinkFunction
  30. ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new
  31. ElasticsearchSinkFunction<Event>() {
  32. @Override
  33. public void process(Event element, RuntimeContext ctx, RequestIndexer
  34. indexer) {
  35. HashMap<String, String> data = new HashMap<>();
  36. data.put(element.user, element.url);
  37. IndexRequest request = Requests.indexRequest()
  38. .index("clicks")
  39. .type("type") // Es 6 必须定义 type
  40. .source(data);
  41. indexer.add(request);
  42. }
  43. };
  44. stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts,
  45. elasticsearchSinkFunction).build());
  46. env.execute();
  47. }
  48. }

4.5 输出到MySQL(JDBC)
  1. 关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据 存储的主要形式。MySQL 就是其中的典型代表。尽管在大数据处理中直接与 MySQL 交互的 场景不多,但最终处理的计算结果是要给外部应用消费使用的,而外部应用读取的数据存储往 往就是 MySQL。所以我们也需要知道如何将数据输出到 MySQL 这样的传统数据库。

添加依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>mysql</groupId>
  8. <artifactId>mysql-connector-java</artifactId>
  9. <version>5.1.47</version>
  10. </dependency>

建表语句

  1. create table clicks(
  2. user varchar(20) not null,
  3. url varchar(100) not null);
  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  3. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  4. import org.apache.flink.connector.jdbc.JdbcSink;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. public class SinkToMySQL {
  7. public static void main(String[] args) throws Exception {
  8. StreamExecutionEnvironment env =
  9. StreamExecutionEnvironment.getExecutionEnvironment();
  10. env.setParallelism(1);
  11. DataStreamSource<Event> stream = env.fromElements(
  12. new Event("Mary", "./home", 1000L),
  13. new Event("Bob", "./cart", 2000L),
  14. new Event("Alice", "./prod?id=100", 3000L),
  15. new Event("Alice", "./prod?id=200", 3500L),
  16. new Event("Bob", "./prod?id=2", 2500L),
  17. new Event("Alice", "./prod?id=300", 3600L),
  18. new Event("Bob", "./home", 3000L),
  19. new Event("Bob", "./prod?id=1", 2300L),
  20. new Event("Bob", "./prod?id=3", 3300L));
  21. stream.addSink(
  22. JdbcSink.sink(
  23. "INSERT INTO clicks (user, url) VALUES (?, ?)",
  24. (statement, r) -> {
  25. statement.setString(1, r.user);
  26. statement.setString(2, r.url);
  27. },
  28. JdbcExecutionOptions.builder()
  29. .withBatchSize(1000)
  30. .withBatchIntervalMs(200)
  31. .withMaxRetries(5)
  32. .build(),
  33. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  34. .withUrl("jdbc:mysql://localhost:3306/test")
  35. // 对于 MySQL 8.0以上,用"com.mysql.cj.jdbc.Driver"
  36. .withDriverName("com.mysql.jdbc.Driver")
  37. .withUsername("root")
  38. .withPassword("123456")
  39. .build()
  40. )
  41. );
  42. env.execute();
  43. }
  44. }

运行main方法,在MySQL中查看

2.png

4.6 自定义输出
  1. 如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢? Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外 部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction,现在既然没有 现成的,我们就只好自力更生了。例如,Flink 并没有提供 HBase 的连接器,所以需要我们自己写。
  2. 在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可 以实现将流里的数据发送出去的逻辑。
  3. 我们这里使用了 SinkFunction 的富函数版本,因为这里我们又使用到了生命周期的概念,创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。
  1. <dependency>
  2. <groupId>org.apache.hbase</groupId>
  3. <artifactId>hbase-client</artifactId>
  4. <version>${hbase.version}</version>
  5. </dependency>
  1. import org.apache.flink.configuration.Configuration;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.TableName;
  6. import org.apache.hadoop.hbase.client.Connection;
  7. import org.apache.hadoop.hbase.client.ConnectionFactory;
  8. import org.apache.hadoop.hbase.client.Put;
  9. import org.apache.hadoop.hbase.client.Table;
  10. import java.nio.charset.StandardCharsets;
  11. public class SinkCustomtoHBase {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env =
  14. StreamExecutionEnvironment.getExecutionEnvironment();
  15. env.setParallelism(1);
  16. env.fromElements("hello", "world")
  17. .addSink(
  18. new RichSinkFunction<String>() {
  19. public org.apache.hadoop.conf.Configuration configuration; // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径导入
  20. public Connection connection; // 管理 Hbase 连接
  21. @Override
  22. public void open(Configuration parameters) throws Exception {
  23. super.open(parameters);
  24. configuration = HBaseConfiguration.create();
  25. configuration.set("hbase.zookeeper.quorum", "hadoop0:2181");
  26. connection = ConnectionFactory.createConnection(configuration);
  27. }
  28. @Override
  29. public void invoke(String value, Context context) throws
  30. Exception {
  31. Table table = connection.getTable(TableName.valueOf("test")); // 表名为 test
  32. Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // 指定 rowkey
  33. put.addColumn("info".getBytes(StandardCharsets.UTF_8) // 指定列名
  34. , value.getBytes(StandardCharsets.UTF_8) // 写入的数据
  35. , "1".getBytes(StandardCharsets.UTF_8)); // 写入的数据
  36. table.put(put); // 执行 put 操作
  37. table.close(); // 将表关闭
  38. }
  39. @Override
  40. public void close() throws Exception {
  41. super.close();
  42. connection.close(); // 关闭连接
  43. }
  44. }
  45. );
  46. env.execute();
  47. }
  48. }

首先提前在HBase中创建test表

  1. create 'test','rowkey','info'

然后执行main方法,在HBase中查看

3.png

小结:这里对于转换算子只是一个简单介绍,Flink 中的操作远远不止这些,还有窗口(Window)、多流转换、底层的处理函数(Process Function)以及状态编程等更加高级的用法。 另外由于涉及读写外部系统,我们不只一次地提到了“精确一次(exactly once)”的状态一致性,这也是 Flink 的高级特性之一。关于这些内容,我们将在后续实践中慢慢了解。

本篇大篇幅文字与例子均来自:尚硅谷《剑指大数据-Flink学习精要(Java版)》,欢迎大家购买正版阅读~,如有侵权请联系作者删除。