1. agent.channels.ch1.type = memory
    2. agent.sources.sql-source.channels = ch1
    3. agent.channels = ch1
    4. agent.sinks = HDFS
    5. agent.sources = sql-source
    6. agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
    7. agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test
    8. agent.sources.sql-source.user = root
    9. agent.sources.sql-source.password = 123456
    10. agent.sources.sql-source.table = wlslog
    11. agent.sources.sql-source.columns.to.select = *
    12. agent.sources.sql-source.incremental.column.name = id
    13. agent.sources.sql-source.incremental.value = 0
    14. agent.sources.sql-source.run.query.delay=5000
    15. agent.sources.sql-source.status.file.path = /var/lib/flume
    16. agent.sources.sql-source.status.file.name = sql-source.status
    17. agent.sinks.HDFS.type = logger
    18. -----
    19. agent.sinks.HDFS.channel = ch1
    20. agent.sinks.HDFS.type = hdfs
    21. agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql
    22. agent.sinks.HDFS.hdfs.fileType = DataStream
    23. agent.sinks.HDFS.hdfs.writeFormat = Text
    24. agent.sinks.HDFS.hdfs.rollSize = 268435456
    25. agent.sinks.HDFS.hdfs.rollInterval = 0
    26. agent.sinks.HDFS.hdfs.rollCount = 0
    27. ------
    1. f2.sources = r1
    2. f2.channels = c1
    3. f2.sinks = k1
    4. # 这里用 自己定义的 SQLSource
    5. f2.sources.r1.type = org.keedio.flume.source.SQLSource
    6. f2.sources.r1.connectionurl = jdbc:mysql://172.16.1.127:3306/test
    7. f2.sources.r1.user = yyj
    8. f2.sources.r1.password = yyj
    9. #f2.sources.r1.driverclass= oracle.jdbc.driver.OracleDriver
    10. f2.sources.r1.filepath = /var/log/sqllog
    11. f2.sources.r1.filename = sqlSource.status
    12. #f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b where a.COUPON_id = b.id
    13. #f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b where a.COUPON_id = b.id and a.id > $@$
    14. f2.sources.r1.customquery = select * from USER_COUPON_CODE_1
    15. f2.sources.r1.begin = 0
    16. f2.sources.r1.autoincrementfield = id
    17. f2.sources.r1.batchsize = 1000
    18. #具体定义channel
    19. f2.channels.c1.type = memory
    20. f2.channels.c1.capacity = 10000
    21. f2.channels.c1.transactionCapacity = 1000
    22. #具体定义sink
    23. f2.sinks.k1.type = logger
    24. #组装source、channel、sink
    25. f2.sources.r1.channels = c1
    26. f2.sinks.k1.channel = c1

    测试过:

    1. agent.sources = mysql
    2. agent.sinks = k1
    3. agent.channels = ch1
    4. agent.sources.mysql.type = org.keedio.flume.source.SQLSource
    5. agent.sources.mysql.hibernate.connection.url = jdbc:mysql://192.168.1.64:3306/test
    6. agent.sources.mysql.hibernate.connection.user = root
    7. agent.sources.mysql.hibernate.connection.password = 123456
    8. agent.sources.mysql.table = wlslog
    9. agent.sources.mysql.columns.to.select = *
    10. agent.sources.mysql.hibernate.connection.autocommit = true
    11. agent.sources.mysql.incremental.column.name = id
    12. agent.sources.mysql.incremental.value = 0
    13. agent.sources.mysql.status.file.path = /var/lib/flume
    14. agent.sources.mysql.status.file.name = sql-source.status1
    15. agent.channels.ch1.type = memory
    16. agent.channels.ch1.capacity = 10000
    17. agent.channels.ch1.transactionCapacity = 1000
    18. agent.sinks.k1.type=logger
    19. agent.sources.mysql.channels=ch1
    20. agent.sinks.k1.channel=ch1