Kafka与HDFS集成

640-1-1.jpeg

Kafka与Flink集成

5.Kafka集成 - 图2

Kafka与Flume集成

5.Kafka集成 - 图3

Kafka与ELKStack集成

5.Kafka集成 - 图4

Kafka与Spark集成

5.Kafka集成 - 图55.Kafka集成 - 图6

Kafka与Strom集成

flume812.png

Kafka Connector

1112222222.png

简介

Kafka Connector是连接kafka集群和其他数据库、集群等系统的连接器。kafka connector可以进行多种系统类型与kafka的连接,主要的任务包括从kafka读(sink),向kafka写(Source),所以连接器也可以分为两种:Source Connector、Sink Connector。而kafka本身自带的Connector只有三个:file connector、Sqlite connector、hdfs connector。像连接类似于Mysql、Mongodb等数据库需要有对应的第三方库以Plugins的形式注册到kafka connector 上。

概念

image.png

连接器

实现了Connect API,决定需要运行多少个任务,按照任务进行数据分支,从work进行获取任务配置并传递下去。

任务

负责将数据一如或移除kafka。

Work进程

相当和connector和任务的容器,用于负责管理连接器的配置,启动连接器和连接器任务,提供REST API。

转换器

kafka connect和其他存储系统直接发送或者接受数据之间转换数据。

常用Connector

Kafka Connector很多,包括开源和商业版本的。如下列表中是常用的开源Connector。

Connectors References
Jdbc Source, Sink
Elastic

Search | Sink1, Sink2, Sink3 | | Cassandra | Source1, Source 2, Sink1, Sink2 | | MongoDB | Source | | HBase | Sink | | Syslog | Source | | MQTT (Source) | Source | | Twitter (Source) | Source, Sink | | S3 | Sink1, Sink2 |

案例

描述

使用两个connector,将文件source.txt的内容通过source连接器写入kafka主题中,然后写入sink.txt。

  1. FileStreamSource:从source.txt读取并发布到Broker中。
  2. FileStreamSink:从Broker中读取数据并写入sink.txt文件中。

    步骤

  3. Worker进程中用到的配置文件(${KAFKA_HOME}/config/connect-standalone.properties)。

    1. # kafka集群连接的地址
    2. bootstrap.servers = localhost:9092
    3. # 格式转换类
    4. key.converter=org.apache.kafka.connect.json.JsonConverter
    5. value.converter=org.apache.kafka.connect.json.JsonConverter
    6. # json消息中是否包含
    7. schemakey.converter.schemas.enable=true
    8. value.converter.schemas.enable=true
    9. # 保存偏移量的文件路径
    10. offset.storage.file.filename=/tmp/connect.offsets
    11. # 设定提交偏移量的频率
    12. offset.flush.interval.ms=10000
  4. Source使用的配置文件在(${KAFKA_HOME}/config/connect-file-souce.properties)。

    1. # 配置连接器的名称
    2. name= local-file-source
    3. # 连接器的全限定名称,设置类名称也是可以
    4. connector.class=FileStreamSource
    5. # task数量
    6. tasks.max=1
    7. # 数据源的文件路径
    8. file=/temp/source.txt
    9. # 主题名称
    10. topic=topic0701
  5. sink使用的配置文件是(${KAFKA_HOME}/config/connect-file-sink.properties)。

    1. # 配置连接器的名称
    2. name= local-file-source
    3. # 连接器的全限定名称,设置类名称也是可以
    4. connector.class=FileStreamSink
    5. # task数量
    6. tasks.max=1
    7. # 数据源的文件路径
    8. file=/temp/sink.txt
    9. # 主题名称
    10. topic=topic0701
  6. 启动source连接器。

    1. bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
  7. 启动sink连接器。

    1. bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
  8. source写入文本信息(打开sink.txt,就能看到)。

    1. echo Hello kafka,I coming;” >> /temp/source.txt
  9. 看sink文件。

    1. cat /temp/sink.txt