Kafka与HDFS集成
Kafka与Flink集成
Kafka与Flume集成
Kafka与ELKStack集成
Kafka与Spark集成
Kafka与Strom集成
Kafka Connector
简介
Kafka Connector是连接kafka集群和其他数据库、集群等系统的连接器。kafka connector可以进行多种系统类型与kafka的连接,主要的任务包括从kafka读(sink),向kafka写(Source),所以连接器也可以分为两种:Source Connector、Sink Connector。而kafka本身自带的Connector只有三个:file connector、Sqlite connector、hdfs connector。像连接类似于Mysql、Mongodb等数据库需要有对应的第三方库以Plugins的形式注册到kafka connector 上。
概念
连接器
实现了Connect API,决定需要运行多少个任务,按照任务进行数据分支,从work进行获取任务配置并传递下去。
任务
Work进程
相当和connector和任务的容器,用于负责管理连接器的配置,启动连接器和连接器任务,提供REST API。
转换器
kafka connect和其他存储系统直接发送或者接受数据之间转换数据。
常用Connector
Kafka Connector很多,包括开源和商业版本的。如下列表中是常用的开源Connector。
Connectors | References |
---|---|
Jdbc | Source, Sink |
Elastic |
Search | Sink1, Sink2, Sink3 | | Cassandra | Source1, Source 2, Sink1, Sink2 | | MongoDB | Source | | HBase | Sink | | Syslog | Source | | MQTT (Source) | Source | | Twitter (Source) | Source, Sink | | S3 | Sink1, Sink2 |
案例
描述
使用两个connector,将文件source.txt的内容通过source连接器写入kafka主题中,然后写入sink.txt。
- FileStreamSource:从source.txt读取并发布到Broker中。
FileStreamSink:从Broker中读取数据并写入sink.txt文件中。
步骤
Worker进程中用到的配置文件(${KAFKA_HOME}/config/connect-standalone.properties)。
# kafka集群连接的地址
bootstrap.servers = localhost:9092
# 格式转换类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# json消息中是否包含
schemakey.converter.schemas.enable=true
value.converter.schemas.enable=true
# 保存偏移量的文件路径
offset.storage.file.filename=/tmp/connect.offsets
# 设定提交偏移量的频率
offset.flush.interval.ms=10000
Source使用的配置文件在(${KAFKA_HOME}/config/connect-file-souce.properties)。
# 配置连接器的名称
name= local-file-source
# 连接器的全限定名称,设置类名称也是可以
connector.class=FileStreamSource
# task数量
tasks.max=1
# 数据源的文件路径
file=/temp/source.txt
# 主题名称
topic=topic0701
sink使用的配置文件是(${KAFKA_HOME}/config/connect-file-sink.properties)。
# 配置连接器的名称
name= local-file-source
# 连接器的全限定名称,设置类名称也是可以
connector.class=FileStreamSink
# task数量
tasks.max=1
# 数据源的文件路径
file=/temp/sink.txt
# 主题名称
topic=topic0701
启动source连接器。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
启动sink连接器。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
source写入文本信息(打开sink.txt,就能看到)。
echo “Hello kafka,I coming;” >> /temp/source.txt
看sink文件。
cat /temp/sink.txt