问题导读:
1.如何配置Flume对接Kafka?
2.如何实现Kafka的监控?
3.Kafka有哪些常见的面试题?
5 Flume对接Kafka**
1)配置flume(flume-kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 启动kafkaIDEA消费者
3) 进入flume根目录下,启动flume
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4) 向 /opt/module/datas/flume.log里追加数据,查看kafka消费者消费情况
$ echo hello >> /opt/module/datas/flume.log
6 Kafka监控
6.1 Kafka Monitor
- 上传jar包KafkaOffsetMonitor-assembly-0.4.6.jar到集群
- 在/opt/module/下创建kafka-offset-console文件夹
- 将上传的jar包放入刚创建的目录下
在/opt/module/kafka-offset-console目录下创建启动脚本start.sh,内容如下:
#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
--kafkaSecurityProtocol PLAINTEXT \
--zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &
在/opt/module/kafka-offset-console目录下创建mobile-logs文件夹
mkdir /opt/module/kafka-offset-console/mobile-logs
启动KafkaMonitor
./start.sh
登录页面hadoop102:8086端口查看详情
6.2 Kafka Manager
- 上传压缩包kafka-manager-1.3.3.15.zip到集群
- 解压到/opt/module
修改配置文件conf/application.conf
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
修改为:
kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
启动kafka-manager
bin/kafka-manager
登录hadoop102:9000页面查看详细信息
7 Kafka面试题
7.1 面试问题
- Kafka中的ISR、AR又代表什么?
- Kafka中的HW、LEO等分别代表什么?
- Kafka中是怎么体现消息顺序性的?
- Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
- Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
- “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
- 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
- 有哪些情形会造成重复消费?
- 那些情景会造成消息漏消费?
- .当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
2)触发Controller的监听程序
3)kafka Controller 负责topic的创建工作,并更新metadata cache - topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
- topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
- Kafka有内部的topic吗?如果有是什么?有什么所用?
- Kafka分区分配的概念?
- 简述Kafka的日志目录结构?
- 如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
- 聊一聊Kafka Controller的作用?
- Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
- 失效副本是指什么?有那些应对措施?
- Kafka的那些设计让它有如此高的性能?