概述

DataSource 顾名思义就是数据源,Flink 官方提供了多种数据获取方式,用于帮助开发者简单的快速的构建输入流。
在 Flink 中 可以使用StreamExecutionEnvironment._getExecutionEnvironment_().addSource(SourceFunction) 来添加数据源,当然,你也可以通过自己定义 SourceFunction 来自自定义 source ,目前 Flink 提供两种自定义的SourceFunction :

  • 非并行:定义 SourceFunction 实现非并行的 source
  • 并行:通过实现ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source

内置Data Source

StreamExcutionEnvironment 已实现的数据源
image.png

1.基于 Socket

socketTextStream 方法用于构建 Socket 的数据流,socketTextStream 有以下四个参数

  • hostname: 主机名;
  • port: 端口号,设置为 0 时,表示端口号自动分配;
  • delimiter:分隔符,用于分隔每条记录的分隔符;
  • maxRetry:程序的最大重试间隔,当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试

示例如下:

  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class socketTest throws Exception{
  4. public static void main(String[] args) {
  5. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. DataStreamSource<String> dataStream = env
  7. .socketTextStream("localhost", 9999,"/n",3); // 监听 localhost 的 9999 端口过来的数据,分隔符为"\n",最大重试间隔为3s
  8. env.print();
  9. env.excute("socketTest");
  10. }
  11. }

2.基于集合

Flink 基于集合构建有几种方式,如下:

  1. fromCollection(Collection):基于集合构建,集合中的所有元素必须是同一类型。
  2. fromElements(T …) : 基于元素构建,从给定的对象序列中创建数据流。所有对象类型必须相同。
  3. fromCollection(Iterator, Class) :从一个迭代器中创建数据流,Iterator 为迭代器,Class 指定了该迭代器返回元素的类型。
  4. fromParallelCollection(SplittableIterator, Class) : 从一个迭代器中创建并行数据流,SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆分到多个不相交的迭代器中,Class 指定了该迭代器返回元素的类型。
  5. generateSequence(from, to) :创建一个生成指定区间范围内的数字序列的并行数据流。

使用示例如下:

  1. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.util.NumberSequenceIterator;
  5. import java.util.*;
  6. public class collectionTest {
  7. public static void main(String[] args) throws Exception {
  8. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9. // 基于集合构建,集合中的所有元素必须是同一类型。
  10. DataStream<Integer> ds1 = env.fromCollection(Arrays.asList(1,2,3,4,5,6));
  11. ds1.print();
  12. // 基于元素构建,从给定的对象序列中创建数据流。所有对象类型必须相同
  13. DataStream<Integer> ds2 = env.fromElements(1,2,3,4,5,6);
  14. ds2.print();
  15. // 从一个迭代器中创建数据流,Iterator 为迭代器,Class 指定了该迭代器返回元素的类型。
  16. DataStream<Integer> ds3 = env.fromCollection(new testIterator(),BasicTypeInfo.INT_TYPE_INFO);
  17. ds3.print();
  18. // 从一个迭代器中创建并行数据流,SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆分到多个不相交的迭代器中,Class 指定了该迭代器返回元素的类型。
  19. DataStream<Long> ds4 = env.fromParallelCollection(new NumberSequenceIterator(1L,10L),BasicTypeInfo.LONG_TYPE_INFO);
  20. ds4.print();
  21. //创建一个生成指定区间范围内的数字序列的并行数据流。
  22. DataStream<Long> ds5 = env.generateSequence(1,100);
  23. ds5.print();
  24. env.execute("collectionTest");
  25. }
  26. }

testIterator 为自己定义的迭代器,我们以产生0-10区间内的数据为例,需要注意的是这里的自定义迭代器除了实现 Iterator 接口外,还需要实现序列化接口 Serializable 否则会抛出序列化失败异常(xxx is not serializable):

  1. import java.io.Serializable;
  2. import java.util.Iterator;
  3. public class testIterator implements Iterator<Integer>, Serializable {
  4. private Integer i = 0;
  5. @Override
  6. public boolean hasNext() {
  7. return i<10;
  8. }
  9. @Override
  10. public Integer next() {
  11. i++;
  12. return i;
  13. }
  14. }

3.基于文件

  1. readTextFile(filePath) : 读取符合 TextInputFormat 规范的文本文件,并将其作为字符串返回。
  2. readTextFile(String filePath, String charsetName): filePath 文件名称,charsetName:读取文件的字符集名称,如UTF-8
  3. readFile(fileInputFormat, path) :按照指定格式读取文件。
  4. readFile(inputFormat, filePath, watchType, interval, typeInformation):按照指定格式周期性的读取文件。其中各个参数的含义如下:
    • inputFormat:输入的数据流格式。
    • filePath:文件路径,可以是本地文件系统上的路径,也可以是 HDFS 上的文件路径。
    • watchType:读取方式,它有两个可选值
      • FileProcessingMode.PROCESS_ONCE :指定路径上的数据只读取一次,然后退出;
      • FileProcessingMode.PROCESS_CONTINUOUSLY:路径进行定期地扫描和读取。 设置这个值需要注意当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破 Flink 的 exactly-once 语义。
    • interval:定期扫描的时间间隔(以毫秒为单位)。
    • typeInformation:输入流中元素的类型

使用示例:

  1. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  2. import org.apache.flink.api.java.io.TextInputFormat;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.core.fs.Path;
  6. import static org.apache.flink.streaming.api.functions.source.FileProcessingMode.PROCESS_CONTINUOUSLY;
  7. public class readFileTest {
  8. public static void main(String[] args) throws Exception {
  9. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. // 读取为本地文件
  11. DataStream<String> fileStream = env.readFile(
  12. new TextInputFormat(new Path(myFilePath) //获取数据流格式,
  13. myFilePath //文件路径,
  14. PROCESS_CONTINUOUSLY //读取方式,
  15. 100 //定期扫描的时间间隔,这里设置为100毫秒,
  16. BasicTypeInfo.STRING_TYPE_INFO //输入流中元素的类型
  17. );
  18. fileStream.print();
  19. env.execute("readFileTest");
  20. }
  21. }

自定义source

如果想自定义 source 需要怎么做?
自定义 source 首先需要去了解下SourceFunction 接口了,它继承自一个标记接口(空接口)Function。
下图可以看到 SourceFunction 定义了两个接口方法:

  • run:启动一个 source,即对接一个外部数据源然后发送元素形成 stream(大部分情况下会通过在该方法里运行一个 while 循环的形式来产生 stream)
  • cancle:取消一个 source,也即将 run 中的循环 emit 元素的行为终止

image.png
话不多说,上代码:

  1. import org.apache.flink.streaming.api.datastream.DataStream;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  4. public class mySourceFunction {
  5. public static void main(String[] args) throws Exception {
  6. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  7. DataStream<Integer> mySource = env.addSource(
  8. new SourceFunction<Integer>() {
  9. private Integer count = 0;
  10. private volatile boolean isRunning = true;
  11. @Override
  12. public void run(SourceContext<Integer> ctx) throws Exception {
  13. while(isRunning && count<=100){
  14. // 通过collect将输入发送出去
  15. ctx.collect(count);
  16. count++;
  17. }
  18. }
  19. @Override
  20. public void cancel() {
  21. isRunning=false;
  22. }
  23. }
  24. );
  25. mySource.print();
  26. env.execute("mySourceFunction");
  27. }
  28. }

前面说到 SourceFunction 实现的数据源是不具备并行度的,如果想实现并行度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口。
所以这里需要注意使用SourceFunction实现的数据源,在DataStream 上不能调用 setParallelism(n) 方法,否则会抛出如下异常:Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
image.pngParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图: 的关系如下图:
image.png
ParallelSourceFunction 继承了 SourceFunction 接口,但是他没有其他额外的方法,仅仅只是用接口表明了自己是可以并行执行的 stream data source

AbstractRichFunction 只有默认的生命周期方法 open() 和 close() 的空实现

RichParallelSourceFunction 实现了ParallelSourceFunction 同时继承了AbstractRichFunction,所以其既有并行度的功能,也有生命周期的功能

Flink 内置连接器

Flink 已经实现了多种连接器,用于满足大多数的数据收集场景。当前内置连接器的支持情况如下:

  • Apache Kafka (支持 source 和 sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

除了上述的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink 相关的连接器如下:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

image.png
目前比较常用的 source 连接器是 kafka connection ,所以,这里我们以 kafka 为例,写一个自定义连接器

Kafka 连接器 Demo

Kafka 依赖

注意:使用 Kafka 连接器需要关注 Kafka 的版本,不同版本间所需的 Maven 依赖和开发时所调用的类均不相同,具体如下:

Maven 依赖 自从哪个版本开始支持 消费者和生产者的类名称 Kafka 版本 注意
flink-connector-kafka-0.8_2.11 1.0.0 FlinkKafkaConsumer08 FlinkKafkaProducer08 0.8.x 这个连接器在内部使用 Kafka 的 SimpleConsumer
API。偏移量由 Flink 提交给 ZK。
flink-connector-kafka-0.9_2.11 1.0.0 FlinkKafkaConsumer09 FlinkKafkaProducer09 0.9.x 这个连接器使用新的 Kafka Consumer API
flink-connector-kafka-0.10_2.11 1.2.0 FlinkKafkaConsumer010 FlinkKafkaProducer010 0.10.x 这个连接器支持 带有时间戳的 Kafka 消息
,用于生产和消费。
flink-connector-kafka-0.11_2.11 1.4.0 FlinkKafkaConsumer011 FlinkKafkaProducer011 0.11.x Kafka 从 0.11.x 版本开始不支持 Scala 2.10。此连接器支持了 Kafka 事务性的消息传递
来为生产者提供 Exactly once 语义。
flink-connector-kafka_2.11 1.7.0 FlinkKafkaConsumer FlinkKafkaProducer >= 1.0.0 这个通用的 Kafka 连接器尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。从 Flink 1.9 版本开始,它使用 Kafka 2.2.0 client。当前 Kafka 客户端向后兼容 0.10.0 或更高版本的 Kafka broker。 但是对于 Kafka 0.11.x 和 0.10.x 版本,我们建议你分别使用专用的 flink-connector-kafka-0.11_2.11 和 flink-connector-kafka-0.10_2.11 连接器。

Kafka 依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka_2.12</artifactId>
  4. <version>1.12.1</version>
  5. </dependency>

代码示例

  1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  5. import java.util.Properties;
  6. public class KafkaSourceDemo {
  7. public static final String broker_list = "localhost:9092";
  8. public static final String topic = "kafkaDemo";
  9. public static Properties properties;
  10. public static void main(String[] args) throws Exception {
  11. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. // 指定Kafka的连接地址
  13. properties = new Properties();
  14. properties.setProperty("bootstrap.servers", broker_list);
  15. DataStream<String> stream = env
  16. .addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
  17. stream.print();
  18. env.execute("Kafka Source Demo");
  19. }
  20. }

Docker 安装kafka

这里使用docker 安装 kafka 和 zk 当然你也可以在本地环境安装kafka

  1. version: '3.7'
  2. services:
  3. zookeeper:
  4. image: wurstmeister/zookeeper
  5. container_name: zk
  6. volumes:
  7. - ./data:/data
  8. ports:
  9. - 2182:2181
  10. kafka9094:
  11. image: wurstmeister/kafka
  12. container_name: kafka
  13. ports:
  14. - 9092:9092
  15. environment:
  16. KAFKA_BROKER_ID: 0
  17. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.60:9092
  18. KAFKA_CREATE_TOPICS: "kafeidou:2:0" #kafka启动后初始化一个有2个partition(分区)0个副本名叫kafeidou的topic
  19. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  20. KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
  21. volumes:
  22. - ./kafka-logs:/kafka
  23. depends_on:
  24. - zookeeper

创建kafka topic

  1. # docker 使用 kafka 可能遇到问题比较多,这里按照我的docker写的kafka 生成topic
  2. bin/kafka-topics.sh --create --zookeeper zookeeper:2181/
  3. --topic kafkaDemo --replication-factor 1 --partitions 1
  4. # 本地
  5. bin/kafka-topics.sh --create --zookeeper localhost:2181/
  6. --topic kafkaDemo --replication-factor 1 --partitions 1

安装成功提示
image.png

查看所有topic

  1. # docker
  2. bin/kafka-topics.sh --list --zookeeper zookeeper:2181
  3. # 本地
  4. bin/kafka-topics.sh --list --zookeeper localhost:2181

可以看到输出有我们刚才创建的 kafkaDemo
image.png

启动生产者

  1. # docker
  2. kafka-console-producer.sh --broker-list localhost:9092 -
  3. -topic kafkaDemo
  4. # 本地
  5. kafka-console-producer.sh --broker-list localhost:9092 -
  6. -topic kafkaDemo

启动成功截图:
image.png

测试demo

启动 KafkaSourceDemo ,在生产者中输入数据,看Idea 控制台的输出
kafka生产者
image.png
idea 输出
image.png
至此,kafka 连接器的测试完成。

自定义 Mysql 连接器

现在我们自定义一个 Mysql 连接器

前置依赖

  1. <dependency>
  2. <groupId>mysql</groupId>
  3. <artifactId>mysql-connector-java</artifactId>
  4. <version>8.0.15</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.projectlombok</groupId>
  8. <artifactId>lombok</artifactId>
  9. <version>1.18.12</version>
  10. <scope>provided</scope>
  11. </dependency>

mysql 建表语句

  1. CREATE TABLE `user_behavior` (
  2. `user_id` int NOT NULL,
  3. `item_id` int DEFAULT NULL,
  4. `category_id` int DEFAULT NULL,
  5. `behavior_type` varchar(255) DEFAULT NULL,
  6. `created_time` bigint DEFAULT NULL,
  7. `id` int NOT NULL AUTO_INCREMENT,
  8. PRIMARY KEY (`id`) USING BTREE
  9. ) ENGINE=InnoDB AUTO_INCREMENT=3869 DEFAULT CHARSET=utf8mb3

插入 Mysql 数据

这里只有部分数据,余下数据可以到我的百度网盘获取

  1. INSERT INTO `user_behavior` (`user_id`, `item_id`, `category_id`, `behavior_type`, `created_time`, `id`) VALUES (1000040, 3849409, 4756105, 'pv', 1511578189, 1);
  2. INSERT INTO `user_behavior` (`user_id`, `item_id`, `category_id`, `behavior_type`, `created_time`, `id`) VALUES (1000040, 3386371, 2355072, 'pv', 1511578330, 2);
  3. INSERT INTO `user_behavior` (`user_id`, `item_id`, `category_id`, `behavior_type`, `created_time`, `id`) VALUES (1000040, 2331370, 3607361, 'pv', 1511578403, 3);
  4. INSERT INTO `user_behavior` (`user_id`, `item_id`, `category_id`, `behavior_type`, `created_time`, `id`) VALUES (1000040, 3381470, 4145813, 'pv', 1511596534, 4);

User_behavior 实体类

user_behavior 实体类用lombok ,具体使用可以参考这篇lombok使用

  1. import lombok.Data;
  2. @Data
  3. public class UserBehavior {
  4. public int user_id;
  5. public int item_id;
  6. public int category_id;
  7. public String behavior_type;
  8. public Long timestamp;
  9. public UserBehavior(int user_id, int item_id, int category_id, String behavior_type, Long timestamp) {
  10. this.user_id = user_id;
  11. this.item_id = item_id;
  12. this.category_id = category_id;
  13. this.behavior_type = behavior_type;
  14. this.timestamp = timestamp;
  15. }
  16. }

Mysql Source

自定义的 MysqlSource 继承 RichSourceFunction ,实现里面的 open、close、run、cancel(暂时不实现) 方法,并写一个mysql连接器(getConnection)

  1. public class MysqlSource extends RichSourceFunction<UserBehavior> {
  2. PreparedStatement ps;
  3. private Connection connection;
  4. /**
  5. * open() 中建立连接,防止每次调用的时候都建立连接和释放连接
  6. * @param parameters
  7. * @throws Exception
  8. */
  9. @Override
  10. public void open(Configuration parameters) throws Exception {
  11. super.open(parameters);
  12. connection = getConnection();
  13. String sql = "select * from user_behavior";
  14. ps = this.connection.prepareStatement(sql);
  15. }
  16. @Override
  17. public void close() throws Exception {
  18. super.close();
  19. if(connection != null){
  20. connection.close();
  21. }
  22. if(ps != null) {
  23. ps.close();
  24. }
  25. }
  26. /**
  27. * DataStream 调用一次 run() 方法用来获取数据
  28. * @param ctx
  29. * @throws Exception
  30. */
  31. @Override
  32. public void run(SourceContext<UserBehavior> ctx) throws Exception {
  33. ResultSet resultSet = ps.executeQuery();
  34. while (resultSet.next()){
  35. UserBehavior userBehavior = new UserBehavior(
  36. resultSet.getInt("user_id"),
  37. resultSet.getInt("item_id"),
  38. resultSet.getInt("category_id"),
  39. resultSet.getString("behavior_type").trim(),
  40. resultSet.getLong("created_time")
  41. );
  42. ctx.collect(userBehavior);
  43. }
  44. }
  45. @Override
  46. public void cancel() {
  47. }
  48. private static Connection getConnection() {
  49. Connection con = null;
  50. try {
  51. Class.forName("com.mysql.jdbc.Driver");
  52. con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_learn?useUnicode=true&characterEncoding=UTF-8", "root", "12345678");
  53. } catch (ClassNotFoundException | SQLException e) {
  54. e.printStackTrace();
  55. }
  56. return con;
  57. }
  58. }