Download the required JAR packages
- flume-ng-sql-source-1.4.3.jar
- The Oracle/MySQL JDBC driver JAR
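For reference, a minimal shell sketch of staging these JARs ($FLUME_HOME and the driver JAR names are assumptions; match them to your installed versions):
```
# Copy the SQL source plugin and a JDBC driver into Flume's classpath
cp flume-ng-sql-source-1.4.3.jar $FLUME_HOME/lib/
cp ojdbc6.jar $FLUME_HOME/lib/                       # Oracle driver (assumed name)
cp mysql-connector-java-5.1.47.jar $FLUME_HOME/lib/  # MySQL driver (assumed name)
```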
Streaming real-time data from an Oracle database to Kafka
- Put flume-ng-sql-source-1.4.3.jar into Flume's lib directory
- Put the Oracle JDBC driver JAR into Flume's lib directory
- Create a new configuration file, oracle-flume-kafka.conf
agentOne.channels = channelOne
agentOne.sources = sourceOne
agentOne.sinks = sinkOne
###########sql source#################
# For each one of the sources, the type is defined
agentOne.sources.sourceOne.type = org.keedio.flume.source.SQLSource
# Hibernate Database connection properties
agentOne.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
agentOne.sources.sourceOne.hibernate.connection.user = flume
agentOne.sources.sourceOne.hibernate.connection.password = 1234
agentOne.sources.sourceOne.hibernate.connection.autocommit = true
agentOne.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentOne.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentOne.sources.sourceOne.run.query.delay=10000
agentOne.sources.sourceOne.status.file.path = /tmp
agentOne.sources.sourceOne.status.file.name = sqlSource.status
# Custom query
agentOne.sources.sourceOne.start.from = 0
agentOne.sources.sourceOne.custom.query = select tno,tname from tuser where tno > $@$ order by tno asc
agentOne.sources.sourceOne.batch.size = 1000
agentOne.sources.sourceOne.max.rows = 1000
agentOne.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentOne.sources.sourceOne.hibernate.c3p0.min_size=1
agentOne.sources.sourceOne.hibernate.c3p0.max_size=10
##############################
agentOne.channels.channelOne.type = memory
agentOne.channels.channelOne.capacity = 10000
agentOne.channels.channelOne.transactionCapacity = 10000
agentOne.channels.channelOne.byteCapacityBufferPercentage = 20
agentOne.channels.channelOne.byteCapacity = 800000
agentOne.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
agentOne.sinks.sinkOne.topic = test
agentOne.sinks.sinkOne.brokerList = sys:9092
agentOne.sinks.sinkOne.requiredAcks = 1
agentOne.sinks.sinkOne.batchSize = 20
agentOne.sinks.sinkOne.channel = channelOne
agentOne.sources.sourceOne.channels=channelOne
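The source records the last value of the incremental column (the $@$ placeholder) in the status file declared above. A quick sketch for inspecting or resetting it, using the path and file name from this config:
```
# Show the last committed incremental value
cat /tmp/sqlSource.status
# To re-import from the beginning, stop the agent first, then remove the file
rm /tmp/sqlSource.status
```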
- Start Flume
flume-ng agent -n agentOne -c conf -f oracle-flume-kafka.conf -Dflume.root.logger=INFO,console
- Create the Kafka topic (the name must match the sink's topic, test, configured above)
kafka-topics --create --zookeeper sys:2181 --replication-factor 1 --partitions 4 --topic test
- Start a Kafka consumer
kafka-console-consumer --bootstrap-server sys:9092 --topic test --from-beginning
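To confirm the pipeline end to end, insert a row into the source table. A sketch using sqlplus with the connection details from the config above (the tuser columns are assumed from the custom query):
```
sqlplus flume/1234@//192.168.168.100:1521/orcl <<'EOF'
INSERT INTO tuser (tno, tname) VALUES (1, 'alice');
COMMIT;
EOF
```
With run.query.delay = 10000, the new row should appear in the consumer within about ten seconds.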
Streaming MySQL database data to Kafka
- Put flume-ng-sql-source-1.4.3.jar into Flume's lib directory
- Put the MySQL JDBC driver JAR into Flume's lib directory
- Create a new configuration file, mysql-flume-kafka.conf
# Data channel
a1.channels = ch-1
# Data source
a1.sources = src-1
# Data sink
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://sys:3306/cyl
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = 123456
# Defaults to false; if set to false, queries will not run automatically
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# Query interval, in milliseconds
a1.sources.src-1.run.query.delay=5000
# Location of the file that stores the source's state
a1.sources.src-1.status.file.path = /root/flume
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
# The query is customizable, but note: incremental loading only works against the id field (the primary key column); testing shows this is the default behavior.
# The primary key must be included in the SELECT; without it, Flume cannot record where the last query left off.
# $@$ stands for the incremental column's value from the previous query, as recorded in the status file
a1.sources.src-1.custom.query = SELECT nsc.servicecheck_id, ns.display_name, nh.alias, nsc.state, nsc.start_time, nsc.end_time, nsc.output, nsc.perfdata \
  FROM nagios_servicechecks AS nsc \
  LEFT JOIN nagios_services AS ns ON nsc.service_object_id = ns.service_object_id \
  LEFT JOIN nagios_hosts AS nh ON ns.host_object_id = nh.host_object_id \
  WHERE ns.display_name = '901_CPU_load' \
  AND nsc.servicecheck_id > $@$ ORDER BY nsc.servicecheck_id ASC
# Batching parameters
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
# c3p0 connection pool parameters
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
################################################################
# Use an in-memory channel
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000
################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = tuser
a1.sinks.k1.brokerList = sys:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1
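Before starting the agent, make sure the sink's topic exists. A sketch reusing the same Kafka CLI as the Oracle section, with the topic name (tuser) from this config (partition and replication counts are assumptions):
```
kafka-topics --create --zookeeper sys:2181 --replication-factor 1 --partitions 4 --topic tuser
```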
An alternative Oracle configuration, using the plugin's older table/incremental-column properties instead of the hibernate.* settings:
agent.channels = ch1
agent.sinks = kafkaSink
agent.sources = sql-source
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000000
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
#org.apache.flume.source.SQLSource
#org.keedio.flume.source.SQLSource
# URL to connect to database
agent.sources.sql-source.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
# Database connection properties
agent.sources.sql-source.user = user_name
agent.sources.sql-source.password = passwd
agent.sources.sql-source.table = tbl1
agent.sources.sql-source.columns.to.select = *
# Increment column properties
agent.sources.sql-source.incremental.column.name = c1
# Increment value from which to start importing (0 would import the entire table)
agent.sources.sql-source.incremental.value = 1
# Query delay: the query is sent every configured number of milliseconds
agent.sources.sql-source.run.query.delay=10000
# Status file is used to save the last-read row
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status
agent.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList=sys:9092
agent.sinks.kafkaSink.topic=test
agent.sinks.kafkaSink.channel=ch1
agent.sinks.kafkaSink.batchSize=10
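One operational note: the status file directory must exist and be writable by the user running the agent, or the source cannot persist its position. A sketch for the path configured above (the flume user/group is an assumption; adjust to your setup):
```
mkdir -p /var/lib/flume
chown flume:flume /var/lib/flume
```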
- Start Flume
flume-ng agent -n agent -c conf -f oracle-flume-kafka.conf -Dflume.root.logger=INFO,console
- Create the Kafka topic (matching the sink's topic, test)
kafka-topics --create --zookeeper sys:2181 --replication-factor 1 --partitions 4 --topic test
- Start a Kafka consumer
kafka-console-consumer --bootstrap-server sys:9092 --topic test --from-beginning
Exporting data from a local MySQL database to remote Kafka (Linux) using Flume on Windows
Download the required packages
Configure the PATH environment variable
# Set the environment variable
FLUME_HOME=D:\apache-flume-1.8.0-bin
# Append to the Path variable
%FLUME_HOME%\conf;%FLUME_HOME%\bin;
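As a sketch, the same settings can be applied from a cmd prompt (setx persists the variable for the current user; set applies to the current session only; the path is the one assumed above):
```
setx FLUME_HOME "D:\apache-flume-1.8.0-bin"
set FLUME_HOME=D:\apache-flume-1.8.0-bin
set PATH=%FLUME_HOME%\conf;%FLUME_HOME%\bin;%PATH%
```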
- Run the following command to verify the installation
flume-ng version
Edit the Flume configuration file and place the JDBC driver in the lib folder
- In Flume's conf directory, open cmd and start Flume
flume-ng agent --conf ../conf --conf-file mysql-flume-kafka.conf --name a1
flume-ng agent --conf ../conf --conf-file oracle-flume-kafka.conf --name agentOne
Reading a local file into Kafka
1. Create agent.conf and place it in the conf folder:
```
agent.channels = ch1
agent.sinks = kafkaSink
agent.sources = file-source
# Configure the source to tail the target file
agent.sources.file-source.type = exec
agent.sources.file-source.command = tail -F /root/flume/target/traj.csv
agent.sources.file-source.channels = ch1
# Use a file channel
agent.channels.ch1.type = file
# Checkpoint directory
agent.channels.ch1.checkpointDir = /opt/flume/checkpoint
# Data spool directory
agent.channels.ch1.dataDirs = /opt/flume/data
# Status file is used to save the last-read row
agent.sources.file-source.status.file.path = /opt/flume
agent.sources.file-source.status.file.name = file-source.status
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList = sys:9092
agent.sinks.kafkaSink.topic = test
agent.sinks.kafkaSink.channel = ch1
agent.sinks.kafkaSink.batchSize = 10
```
2. Start the Flume service
flume-ng agent -n agent -c conf -f conf/agent.conf -Dflume.root.logger=INFO,console
3. Start a Kafka consumer
kafka-console-consumer --bootstrap-server sys:9092 --topic test --from-beginning
- Observe that the data arrives in the specified Kafka topic
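To watch events flow, append a line to the tailed file (the path comes from the exec source above; the CSV content is a made-up sample):
```
echo "1,116.397,39.908,2019-01-01 00:00:00" >> /root/flume/target/traj.csv
```
The consumer started in step 3 should print the new line within a few seconds.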