Download the required JAR packages
- flume-ng-sql-source-1.4.3.jar
- The Oracle/MySQL JDBC driver jar
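A quick sketch of staging the jars, assuming Flume lives at /opt/flume and the driver jars are already downloaded (all paths and driver file names here are illustrative):
```
# Illustrative install path; adjust to your environment
FLUME_HOME=/opt/flume

# The keedio SQL source and the JDBC drivers must be on Flume's classpath
cp flume-ng-sql-source-1.4.3.jar "$FLUME_HOME/lib/"
# Driver jar names vary by version; these are examples
cp ojdbc6.jar "$FLUME_HOME/lib/"
cp mysql-connector-java-5.1.46.jar "$FLUME_HOME/lib/"
```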
Streaming real-time Oracle data to Kafka
- Copy flume-ng-sql-source-1.4.3.jar into Flume's lib directory
- Copy the Oracle JDBC driver jar into Flume's lib directory
- Create a new configuration file, oracle-flume-kafka.conf
```
agentOne.channels = channelOne
agentOne.sources = sourceOne
agentOne.sinks = sinkOne

########### sql source #################
# For each one of the sources, the type is defined
agentOne.sources.sourceOne.type = org.keedio.flume.source.SQLSource
agentOne.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
# Hibernate Database connection properties
agentOne.sources.sourceOne.hibernate.connection.user = flume
agentOne.sources.sourceOne.hibernate.connection.password = 1234
agentOne.sources.sourceOne.hibernate.connection.autocommit = true
agentOne.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentOne.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentOne.sources.sourceOne.run.query.delay = 10000
agentOne.sources.sourceOne.status.file.path = /tmp
agentOne.sources.sourceOne.status.file.name = sqlSource.status
# Custom query
agentOne.sources.sourceOne.start.from = 0
agentOne.sources.sourceOne.custom.query = select tno,tname from tuser where tno > $@$ order by tno asc
agentOne.sources.sourceOne.batch.size = 1000
agentOne.sources.sourceOne.max.rows = 1000
agentOne.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentOne.sources.sourceOne.hibernate.c3p0.min_size = 1
agentOne.sources.sourceOne.hibernate.c3p0.max_size = 10

##############################
agentOne.channels.channelOne.type = memory
agentOne.channels.channelOne.capacity = 10000
agentOne.channels.channelOne.transactionCapacity = 10000
agentOne.channels.channelOne.byteCapacityBufferPercentage = 20
agentOne.channels.channelOne.byteCapacity = 800000

agentOne.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
agentOne.sinks.sinkOne.topic = test
agentOne.sinks.sinkOne.brokerList = sys:9092
agentOne.sinks.sinkOne.requiredAcks = 1
agentOne.sinks.sinkOne.batchSize = 20
agentOne.sinks.sinkOne.channel = channelOne
agentOne.sources.sourceOne.channels = channelOne
```
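The source keeps the last value of the incremental column (tno here) in the status file named above; a sketch of inspecting it and, if you want a full re-import, resetting it, assuming the /tmp path from this config:
```
# The status file stores the last value of the incremental column (tno)
cat /tmp/sqlSource.status

# Deleting it resets the source to start.from (0), so the whole tuser
# table is re-imported on the next poll
rm /tmp/sqlSource.status
```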
- Start Flume
flume-ng agent -n agentOne -c conf -f oracle-flume-kafka.conf -Dflume.root.logger=INFO,console
- Create the Kafka topic
kafka-topics --create --zookeeper sys:2181 --replication-factor 1 --partitions 4 --topic test
- Start a Kafka consumer
kafka-console-consumer --bootstrap-server sys:9092 --topic test --from-beginning
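To check the pipeline end to end you can insert a row whose tno is above the last recorded value; it should reach the consumer within run.query.delay (10 s). A sketch with sqlplus, assuming the tuser(tno, tname) table and the credentials from the config:
```
# Hypothetical test row; assumes a tuser(tno, tname) table owned by user flume
sqlplus -S flume/1234@//192.168.168.100:1521/orcl <<'EOF'
INSERT INTO tuser (tno, tname) VALUES (1001, 'test-row');
COMMIT;
EOF
```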
Streaming MySQL data to Kafka
- Copy flume-ng-sql-source-1.4.3.jar into Flume's lib directory
- Copy the MySQL JDBC driver jar into Flume's lib directory
- Create a new configuration file, mysql-flume-kafka.conf
```
# Data channel
a1.channels = ch-1
# Data origin
a1.sources = src-1
# Data destination
a1.sinks = k1

########### sql source #################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://sys:3306/cyl
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = 123456
# Defaults to false; if set to false the source will not query automatically
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# Query interval, in milliseconds
a1.sources.src-1.run.query.delay = 5000
# Where Flume stores its status file
a1.sources.src-1.status.file.path = /root/flume
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
# The SQL statement is user-defined, but note: the increment can only be driven
# by the primary-key column (the default behavior, as tested), and the primary
# key must be selected; without it Flume cannot record where the last query stopped.
# $@$ stands for the last value of the incremental column, saved in the status file.
a1.sources.src-1.custom.query = SELECT nsc.servicecheck_id, ns.display_name, nh.alias, nsc.state, nsc.start_time, nsc.end_time, nsc.output, nsc.perfdata FROM nagios_servicechecks AS nsc LEFT JOIN nagios_services AS ns ON nsc.service_object_id = ns.service_object_id LEFT JOIN nagios_hosts AS nh ON ns.host_object_id = nh.host_object_id WHERE ns.display_name = '901_CPU_load' AND nsc.servicecheck_id > $@$ ORDER BY nsc.servicecheck_id ASC
# Batching parameters
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
# c3p0 connection pool parameters
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size = 1
a1.sources.src-1.hibernate.c3p0.max_size = 10

################################################################
# Use an in-memory channel
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = tuser
a1.sinks.k1.brokerList = sys:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels = ch-1
```
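Before starting the agent it can be worth dry-running the custom query with $@$ replaced by the starting value, here 0; a sketch using the mysql client with the credentials from this config:
```
# $@$ is replaced by the last stored value; 0 simulates the first poll
mysql -h sys -P 3306 -u root -p123456 cyl -e "
SELECT nsc.servicecheck_id, ns.display_name, nh.alias, nsc.state,
       nsc.start_time, nsc.end_time, nsc.output, nsc.perfdata
FROM nagios_servicechecks AS nsc
LEFT JOIN nagios_services AS ns ON nsc.service_object_id = ns.service_object_id
LEFT JOIN nagios_hosts AS nh ON ns.host_object_id = nh.host_object_id
WHERE ns.display_name = '901_CPU_load'
  AND nsc.servicecheck_id > 0
ORDER BY nsc.servicecheck_id ASC;"
```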
- Start Flume
flume-ng agent -n a1 -c conf -f mysql-flume-kafka.conf -Dflume.root.logger=INFO,console
- Create the Kafka topic
kafka-topics --create --zookeeper sys:2181 --replication-factor 1 --partitions 4 --topic tuser
- Start a Kafka consumer
kafka-console-consumer --bootstrap-server sys:9092 --topic tuser --from-beginning
An alternative configuration for the Oracle case uses the source's older table/column properties instead of a custom query:
```
agent.channels = ch1
agent.sinks = kafkaSink
agent.sources = sql-source
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000000
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
# URL to connect to database
agent.sources.sql-source.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
# Database connection properties
agent.sources.sql-source.user = user_name
agent.sources.sql-source.password = passwd
agent.sources.sql-source.table = tbl1
agent.sources.sql-source.columns.to.select = *
# Increment column properties
agent.sources.sql-source.incremental.column.name = c1
# Incremental value from which you want to start taking data (0 will import the entire table)
agent.sources.sql-source.incremental.value = 1
# Query delay: the query will be sent every this many milliseconds
agent.sources.sql-source.run.query.delay=10000
# Status file is used to save the last read row
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status
agent.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList=sys:9092
agent.sinks.kafkaSink.topic=test
agent.sinks.kafkaSink.channel=ch1
agent.sinks.kafkaSink.batchSize=10
```
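The status file directory has to exist and be writable before the agent starts; a minimal preparation step for the /var/lib/flume path configured above (the flume account is an assumption, use whichever user runs the agent):
```
# Create the status-file directory the source writes to
mkdir -p /var/lib/flume
chown flume:flume /var/lib/flume   # hypothetical user; match whoever runs flume-ng
```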
Exporting local MySQL data to a remote Kafka (on Linux) with Flume on Windows
Download the required packages
Configure the PATH environment variable
# Set the environment variable
FLUME_HOME=D:\apache-flume-1.8.0-bin
# Append to the Path variable
%FLUME_HOME%\conf;%FLUME_HOME%\bin;
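If you prefer the command line over the system properties dialog, the variables can be set from cmd with setx; a sketch assuming the install path above (note that setx truncates PATH values longer than 1024 characters):
```
:: Run in cmd; persists the variables for the current user
setx FLUME_HOME "D:\apache-flume-1.8.0-bin"
:: %PATH% is expanded at call time, so this appends to the existing value
setx PATH "%PATH%;%FLUME_HOME%\conf;%FLUME_HOME%\bin"
```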
- Run the command
flume-ng version
Edit the Flume configuration file and place the driver jars in the lib folder
- In Flume's conf directory, open cmd and start Flume
flume-ng agent --conf ../conf --conf-file mysql-flume-kafka.conf --name a1
flume-ng agent --conf ../conf --conf-file oracle-flume-kafka.conf --name agentOne
Reading a file into Kafka
1. Create agent.conf and place it in the conf folder
```
agent.channels = ch1
agent.sinks = kafkaSink
agent.sources = file-source

# Configure the source
agent.sources.file-source.type = exec
agent.sources.file-source.command = tail -F /root/flume/target/traj.csv
agent.sources.file-source.channels = ch1

# Use a file channel
agent.channels.ch1.type = file
# Checkpoint directory
agent.channels.ch1.checkpointDir = /opt/flume/checkpoint
# Directory for buffered event data
agent.channels.ch1.dataDirs = /opt/flume/data

# Status file is used to save the last read row
agent.sources.file-source.status.file.path = /opt/flume
agent.sources.file-source.status.file.name = file-source.status

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList = sys:9092
agent.sinks.kafkaSink.topic = test
agent.sinks.kafkaSink.channel = ch1
agent.sinks.kafkaSink.batchSize = 10
```
2. Start the Flume service
flume-ng agent -n agent -c conf -f conf/agent.conf -Dflume.root.logger=INFO,console
3. Start a Kafka consumer
kafka-console-consumer --bootstrap-server sys:9092 --topic test --from-beginning
- You should see data arriving in the specified Kafka topic
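Since the exec source tails traj.csv, appending a line is an easy way to generate test traffic; a sketch with a made-up sample record, using the path from the config:
```
# Made-up sample record; each appended line becomes one Flume event
echo "1,39.9042,116.4074,2020-01-01 00:00:00" >> /root/flume/target/traj.csv
```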
