agent.channels.ch1.type = memory
agent.sources.sql-source.channels = ch1
agent.channels = ch1
agent.sinks = HDFS
agent.sources = sql-source
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test
agent.sources.sql-source.user = root
agent.sources.sql-source.password = 123456
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay=5000
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status
agent.sinks.HDFS.type = logger
-----
agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
------
f2.sources = r1
f2.channels = c1
f2.sinks = k1
# 这里用 自己定义的 SQLSource
f2.sources.r1.type = org.keedio.flume.source.SQLSource
f2.sources.r1.connectionurl = jdbc:mysql://172.16.1.127:3306/test
f2.sources.r1.user = yyj
f2.sources.r1.password = yyj
#f2.sources.r1.driverclass= oracle.jdbc.driver.OracleDriver
f2.sources.r1.filepath = /var/log/sqllog
f2.sources.r1.filename = sqlSource.status
#f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b where a.COUPON_id = b.id
#f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b where a.COUPON_id = b.id and a.id > $@$
f2.sources.r1.customquery = select * from USER_COUPON_CODE_1
f2.sources.r1.begin = 0
f2.sources.r1.autoincrementfield = a.id
f2.sources.r1.batchsize = 1000
#具体定义channel
f2.channels.c1.type = memory
f2.channels.c1.capacity = 1000
f2.channels.c1.transactionCapacity = 100
#具体定义sink
f2.sinks.k1.type = logger
#组装source、channel、sink
f2.sources.r1.channels = c1
f2.sinks.k1.channel = c1
测试过:
agent.sources = mysql
agent.sinks = k1
agent.channels = ch1
agent.sources.mysql.type = org.keedio.flume.source.SQLSource
agent.sources.mysql.hibernate.connection.url = jdbc:mysql://192.168.1.64:3306/test
agent.sources.mysql.hibernate.connection.user = root
agent.sources.mysql.hibernate.connection.password = 123456
agent.sources.mysql.table = wlslog
agent.sources.mysql.columns.to.select = *
agent.sources.mysql.hibernate.connection.autocommit = true
agent.sources.mysql.incremental.column.name = id
agent.sources.mysql.incremental.value = 0
agent.sources.mysql.status.file.path = /var/lib/flume
agent.sources.mysql.status.file.name = sql-source.status1
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000
agent.sinks.k1.type=logger
agent.sources.mysql.channels=ch1
agent.sinks.k1.channel=ch1