问题导读:
    1.如何配置Flume对接Kafka?
    2.如何实现Kafka的监控?
    3.Kafka有哪些常见的面试题?


    5 Flume对接Kafka**
    1)配置flume(flume-kafka.conf)

    1. # define
    2. a1.sources = r1
    3. a1.sinks = k1
    4. a1.channels = c1
    5. # source
    6. a1.sources.r1.type = exec
    7. a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
    8. a1.sources.r1.shell = /bin/bash -c
    9. # sink
    10. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    11. a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    12. a1.sinks.k1.kafka.topic = first
    13. a1.sinks.k1.kafka.flumeBatchSize = 20
    14. a1.sinks.k1.kafka.producer.acks = 1
    15. a1.sinks.k1.kafka.producer.linger.ms = 1
    16. # channel
    17. a1.channels.c1.type = memory
    18. a1.channels.c1.capacity = 1000
    19. a1.channels.c1.transactionCapacity = 100
    20. # bind
    21. a1.sources.r1.channels = c1
    22. a1.sinks.k1.channel = c1

    2) 启动kafkaIDEA消费者
    3) 进入flume根目录下,启动flume

    1. $ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

    4) 向 /opt/module/datas/flume.log里追加数据,查看kafka消费者消费情况

    1. $ echo hello >> /opt/module/datas/flume.log

    6 Kafka监控
    6.1 Kafka Monitor

    • 上传jar包KafkaOffsetMonitor-assembly-0.4.6.jar到集群
    • 在/opt/module/下创建kafka-offset-console文件夹
    • 将上传的jar包放入刚创建的目录下
    • 在/opt/module/kafka-offset-console目录下创建启动脚本start.sh,内容如下:

      1. #!/bin/bash
      2. java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
      3. com.quantifind.kafka.offsetapp.OffsetGetterWeb \
      4. --offsetStorage kafka \
      5. --kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
      6. --kafkaSecurityProtocol PLAINTEXT \
      7. --zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
      8. --port 8086 \
      9. --refresh 10.seconds \
      10. --retain 2.days \
      11. --dbName offsetapp_kafka &
    • 在/opt/module/kafka-offset-console目录下创建mobile-logs文件夹

      1. mkdir /opt/module/kafka-offset-console/mobile-logs
    • 启动KafkaMonitor

      1. ./start.sh
    • 登录页面hadoop102:8086端口查看详情

    6.2 Kafka Manager

    • 上传压缩包kafka-manager-1.3.3.15.zip到集群
    • 解压到/opt/module
    • 修改配置文件conf/application.conf

      1. kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
    • 修改为:

      1. kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
    • 启动kafka-manager

      1. bin/kafka-manager
    • 登录hadoop102:9000页面查看详细信息

    7 Kafka面试题
    7.1 面试问题

    • Kafka中的ISR、AR又代表什么?
    • Kafka中的HW、LEO等分别代表什么?
    • Kafka中是怎么体现消息顺序性的?
    • Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
    • Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
    • “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
    • 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
    • 有哪些情形会造成重复消费?
    • 那些情景会造成消息漏消费?
    • .当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
      1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
      2)触发Controller的监听程序
      3)kafka Controller 负责topic的创建工作,并更新metadata cache
    • topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
    • topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
    • Kafka有内部的topic吗?如果有是什么?有什么所用?
    • Kafka分区分配的概念?
    • 简述Kafka的日志目录结构?
    • 如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
    • 聊一聊Kafka Controller的作用?
    • Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
    • 失效副本是指什么?有那些应对措施?
    • Kafka的那些设计让它有如此高的性能?