Download the required JAR packages

  • flume-ng-sql-source-1.4.3.jar
  • The JDBC driver jar for Oracle/MySQL

Real-time data from an Oracle database to Kafka

  1. Put flume-ng-sql-source-1.4.3.jar into Flume's lib directory
  2. Put the Oracle JDBC driver jar into Flume's lib directory (a shell sketch for steps 1 and 2 follows the configuration below)
  3. Create the configuration file oracle-flume-kafka.conf
```
agentOne.channels = channelOne
agentOne.sources = sourceOne
agentOne.sinks = sinkOne

########### sql source #################
# For each one of the sources, the type is defined
agentOne.sources.sourceOne.type = org.keedio.flume.source.SQLSource
agentOne.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl

# Hibernate database connection properties
agentOne.sources.sourceOne.hibernate.connection.user = flume
agentOne.sources.sourceOne.hibernate.connection.password = 1234
agentOne.sources.sourceOne.hibernate.connection.autocommit = true
agentOne.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentOne.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver

# Query interval in milliseconds
agentOne.sources.sourceOne.run.query.delay = 10000

# Where the incremental state is stored
agentOne.sources.sourceOne.status.file.path = /tmp
agentOne.sources.sourceOne.status.file.name = sqlSource.status

# Custom query; $@$ is replaced with the last value of the incremental column
agentOne.sources.sourceOne.start.from = 0
agentOne.sources.sourceOne.custom.query = select tno,tname from tuser where tno > $@$ order by tno asc

agentOne.sources.sourceOne.batch.size = 1000
agentOne.sources.sourceOne.max.rows = 1000

# c3p0 connection pool
agentOne.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentOne.sources.sourceOne.hibernate.c3p0.min_size = 1
agentOne.sources.sourceOne.hibernate.c3p0.max_size = 10

##############################
agentOne.channels.channelOne.type = memory
agentOne.channels.channelOne.capacity = 10000
agentOne.channels.channelOne.transactionCapacity = 10000
agentOne.channels.channelOne.byteCapacityBufferPercentage = 20
agentOne.channels.channelOne.byteCapacity = 800000

agentOne.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
agentOne.sinks.sinkOne.topic = test
agentOne.sinks.sinkOne.brokerList = sys:9092
agentOne.sinks.sinkOne.requiredAcks = 1
agentOne.sinks.sinkOne.batchSize = 20
agentOne.sinks.sinkOne.channel = channelOne
agentOne.sources.sourceOne.channels = channelOne
```
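
Steps 1 and 2 above amount to copying the two jars into Flume's classpath. A minimal shell sketch, assuming FLUME_HOME points at your Flume installation and the Oracle driver jar is named ojdbc6.jar (the actual driver file name depends on the Oracle/JDBC version you downloaded):

```
# Copy the SQL source plugin and the JDBC driver into Flume's lib directory.
# FLUME_HOME and the ojdbc6.jar file name are assumptions; adjust to your environment.
cp flume-ng-sql-source-1.4.3.jar "$FLUME_HOME/lib/"
cp ojdbc6.jar "$FLUME_HOME/lib/"

# Quick sanity check that both jars are in place.
ls "$FLUME_HOME/lib/" | grep -E 'flume-ng-sql-source|ojdbc'
```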
  4. Start Flume

flume-ng agent -n agentOne -c conf -f oracle-flume-kafka.conf -Dflume.root.logger=INFO,console

  5. Create the Kafka topic (it must match the sink's topic, test in this configuration)

kafka-topics --create --zookeeper sys:2181 --replication-factor 1 --partitions 4 --topic test

  6. Start a Kafka consumer

kafka-console-consumer --bootstrap-server sys:9092 --topic test --from-beginning
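
To see the incremental import in action, insert a new row into the source table and watch it appear on the consumer. A minimal sketch using sqlplus, assuming the tuser table has exactly the two columns (tno, tname) referenced by custom.query and that sqlplus can reach the database with the credentials from the configuration above; the test values are hypothetical:

```
# Insert a hypothetical test row; the tuser schema is an assumption based on custom.query.
sqlplus flume/1234@//192.168.168.100:1521/orcl <<'EOF'
INSERT INTO tuser (tno, tname) VALUES (1001, 'test_user');
COMMIT;
EOF
```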

Data from a MySQL database to Kafka

  1. Put flume-ng-sql-source-1.4.3.jar into Flume's lib directory
  2. Put the MySQL JDBC driver jar into Flume's lib directory
  3. Create the configuration file mysql-flume-kafka.conf
```
# Name the source, channel and sink
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1

########### sql source #################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://sys:3306/cyl

# Hibernate database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = 123456
# Defaults to false; if left false the source will not query automatically
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

# Query interval in milliseconds
a1.sources.src-1.run.query.delay = 5000

# Where Flume saves its incremental state
a1.sources.src-1.status.file.path = /root/flume
a1.sources.src-1.status.file.name = sqlSource.status

# Custom query
a1.sources.src-1.start.from = 0
# The query can be customised, but note: the increment can only be driven by the
# primary-key (id) column, which is the default behaviour as tested, and that column
# must appear in the select list, otherwise Flume cannot record where the previous
# query stopped. $@$ stands for the last value of the incremental column, as recorded
# in the status file.
a1.sources.src-1.custom.query = SELECT nsc.servicecheck_id, ns.display_name, nh.alias, nsc.state, nsc.start_time, nsc.end_time, nsc.output, nsc.perfdata from nagios_servicechecks as nsc LEFT JOIN nagios_services as ns ON nsc.service_object_id = ns.service_object_id LEFT JOIN nagios_hosts as nh ON ns.host_object_id = nh.host_object_id WHERE ns.display_name = '901_CPU_load' AND nsc.servicecheck_id > $@$ ORDER BY nsc.servicecheck_id ASC

# Batching parameters
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000

# c3p0 connection pool parameters
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size = 1
a1.sources.src-1.hibernate.c3p0.max_size = 10

################################################################
# Memory channel
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = tuser
a1.sinks.k1.brokerList = sys:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels = ch-1
```
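
The incremental position that $@$ refers to is kept in the status file configured above (/root/flume/sqlSource.status). A small operational sketch, assuming the paths from this configuration: inspect the state, and, if you ever need to re-import everything from start.from, remove the file while the agent is stopped.

```
# Inspect the incremental state recorded by flume-ng-sql-source.
cat /root/flume/sqlSource.status

# To re-import from scratch: stop the agent first, then delete the status file.
# On the next start the source begins again from start.from (0 in this configuration).
rm /root/flume/sqlSource.status
```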
An alternative way to configure the SQL source, using the older property style of flume-ng-sql-source (connection.url / table / columns.to.select / incremental.* instead of hibernate.* and a custom query), here pointing at the same Oracle database:

```
agent.channels = ch1
agent.sinks = kafkaSink
agent.sources = sql-source

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000000

agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
#org.apache.flume.source.SQLSource
#org.keedio.flume.source.SQLSource

# URL to connect to database
agent.sources.sql-source.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl

# Database connection properties
agent.sources.sql-source.user = user_name
agent.sources.sql-source.password = passwd
agent.sources.sql-source.table = tbl1
agent.sources.sql-source.columns.to.select = *

# Incremental column properties
agent.sources.sql-source.incremental.column.name = c1
# The incremental value is the point from which to start taking data from the table (0 imports the entire table)
agent.sources.sql-source.incremental.value = 1

# Query delay: the query is sent every configured number of milliseconds
agent.sources.sql-source.run.query.delay = 10000

# Status file is used to save the last row read
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList = sys:9092
agent.sinks.kafkaSink.topic = test
agent.sinks.kafkaSink.channel = ch1
agent.sinks.kafkaSink.batchSize = 10
```
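
One thing to watch with this variant: status.file.path points at /var/lib/flume, which usually does not exist by default. A minimal sketch to prepare it before starting the agent (the owner shown is an assumption; use whichever user runs flume-ng):

```
# Create the status-file directory and make it writable by the user running the agent.
mkdir -p /var/lib/flume
chown flume:flume /var/lib/flume   # adjust the owner to your flume user, or skip if running as root
```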
  4. Start Flume

flume-ng agent -n a1 -c conf -f mysql-flume-kafka.conf -Dflume.root.logger=INFO,console

  5. Create the Kafka topic (it must match the sink's topic, tuser in this configuration)

kafka-topics --create --zookeeper sys:2181 --replication-factor 1 --partitions 4 --topic tuser

  6. Start a Kafka consumer

kafka-console-consumer --bootstrap-server sys:9092 --topic tuser --from-beginning

Exporting data from a local MySQL database to a remote Kafka cluster (on Linux) using Flume on Windows

  1. Download the required packages

    1. The revised Flume package apache-flume-1.9.0.rar
    2. The driver package database-flume-kafka.rar
    3. apache-flume-1.9.0.rar
  2. Configure the PATH environment variable

```
# Environment variable
FLUME_HOME=D:\apache-flume-1.8.0-bin

# Add to the PATH variable
%FLUME_HOME%\conf;%FLUME_HOME%\bin;
```
  3. Run the following command to verify the installation

flume-ng version

  4. Edit the Flume configuration files and put the driver jars into the lib folder

    1. In Flume's conf directory, open a cmd window and start Flume:

flume-ng agent --conf ../conf --conf-file mysql-flume-kafka.conf --name a1

flume-ng agent --conf ../conf --conf-file oracle-flume-kafka.conf --name agentOne

Reading a file into Kafka

  1. Create agent.conf and place it in the conf folder:

```
agent.channels = ch1
agent.sinks = kafkaSink
agent.sources = file-source

# Configure the source: tail the target csv file
agent.sources.file-source.type = exec
agent.sources.file-source.command = tail -F /root/flume/target/traj.csv
agent.sources.file-source.channels = ch1

# File channel
agent.channels.ch1.type = file
# Checkpoint directory
agent.channels.ch1.checkpointDir = /opt/flume/checkpoint
# Directory for buffered event data
agent.channels.ch1.dataDirs = /opt/flume/data

# Status file is used to save the last row read
agent.sources.file-source.status.file.path = /opt/flume
agent.sources.file-source.status.file.name = file-source.status

# Kafka sink
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.brokerList = sys:9092
agent.sinks.kafkaSink.topic = test
agent.sinks.kafkaSink.channel = ch1
agent.sinks.kafkaSink.batchSize = 10
```


  2. Start the Flume service

flume-ng agent -n agent -c conf -f conf/agent.conf -Dflume.root.logger=INFO,console

  3. Start a Kafka consumer

kafka-console-consumer --bootstrap-server sys:9092 --topic test --from-beginning

  4. Observe the data arriving in the specified Kafka topic
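
Because the exec source simply tails /root/flume/target/traj.csv, appending a line to that file is an easy end-to-end test. A minimal sketch, with a purely hypothetical CSV line (the real column layout of traj.csv is not shown in this article):

```
# Append a test record to the tailed file; it should show up on the kafka-console-consumer
# for topic 'test' within a moment.
echo "1,2024-01-01 00:00:00,114.30,30.50" >> /root/flume/target/traj.csv
```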