Kafka 安装
一、环境配置
- Java 1.8 环境配置
- Scala 2.11 环境配置
- zookeeper 3.4.6 安装
二、下载 与 安装
1. 下载与 Scala 相同的版本的 Kafka
1. 下载 Binary downloads Scala 2.11 - kafka_2.11-1.0.1.tgz 具体根据 Scala 版本决定2. 解压 tar -zxvf kafka_2.11-1.0.1.tgz3. 环境配置 vim ~/.bashrc# Kafkaexport KAFKA_HOME=/usr/local/kafkaexport KAFKA_CONF_DIR=$KAFKA_HOME/configexport PATH=$KAFKA_HOME/bin:$PATH
三、操作
1. 启动其中一个 Kafka broker 代理 Server
1. 修改配置文件 cp $KAFKA_CONF_DIR/server.properties $KAFKA_CONF_DIR/server-1.properties vim $KAFKA_CONF_DIR/server-1.properties # 集群代理 id , 指定到同一个 zookeeper 集群的 kafka 集群代理 , 必须是唯一的 broker.id=1 # 监听地址和段口 listeners=PLAINTEXT://[your ip]:9092 # 配置 zookeeper 集群地址 zookeeper.connect=zookeeper-hostname:2181 # 日志目录地址 log.dirs=/data/log/kafka/kafka-logs2. 启动 Kafka broker 代理 (指定配置文件) 1) 启动一个 $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_CONF_DIR/server-1.properties & netstat -tunlp | grep 9092 查看是否启动
2. Kafka 使用
1. Topic 主题 kafka-topics.sh 参数) 1) 创建 Topic kafka-topics.sh --create --zookeeper zookeeper-hostname:2181 --replication-factor 1 --partitions 1 --topic test 2) 创建一个主题和 2 个复制因子(--replication-factor 不能超过 broker 服务的数量) kafka-topics.sh --create --zookeeper zookeeper-hostname:2181 --replication-factor 2 --partitions 3 --topic test 3) 删除主题 kafka-topics.sh --zookeeper zookeeper-hostname:2181 --delete --topic "clicki_info_topic"2. 查看 Topic 1) 查看 Topic 列表 kafka-topics.sh --list --zookeeper zookeeper-hostname:2181 2) 查看单个 Topic 详情 kafka-topics.sh --describe --zookeeper zookeeper-hostname:2181 --topic test Leader : Leader 所在 Broker replicas : 副本所在 Broker Isr : 这个副本列表的子集目前活着的和以后的领导人3. Topic Partitions 分区 kafka-add-partitions.sh 参数) 1) Topic 增加 partition 数目 kafka-add-partitions.sh kafka-add-partitions.sh --topic test --partition 2 --zookeeper zookeeper-hostname:2181,zookeeper-hostname:2181 (为topic test增加2个分区) 2)查询 topic 中 offset 的最大和最小值 # 例如 -1 最大值 kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker-hostname:9092 --topic test --time -1 参数: --time -1 最大 , -2 最小 结果: 主题 : 分区 : offset 最大值 test:2:29 test:1:27 test:0:263. producer 生产者 kafka-console-producer.sh 参数) 1) 向 Topic 生产数据 kafka-console-producer.sh --broker-list broker-hostname:9092 --topic test 2) 从文件读取数据 kafka-console-producer.sh --broker-list broker-hostname:9092 --topic test < file-input.txt4. consumer 消费者 kafka-console-consumer.sh 参数) --offset <String: consume offset> latest 最新, 默认 earliest 最早 --max-messages <Integer: num_messages> 退出前要使用的最大消息数。如果没有设置,消费是持续的 100000 --partition <Integer: partition> 要使用的分区。消耗从分区的末尾开始,除非指定了“—offset”。 2 --from-beginning 从最开始, 如果使用者还没有一个已建立的偏移量,那么从日志中出现的最早消息开始,而不是从最新消息开始。 --property print.key=true 打印 key 1) 从 Topic 消费数据 # 从 zookeeper 中消费 kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic test --from-beginning 2)从 broker 指定组消费数据 kafka-console-consumer.sh --bootstrap-server broker-hostname:9092 --topic test --consumer-property group.id=test-consumer-group 3)读取配置文件消费 kafka-console-consumer.sh --bootstrap-server broker-hostname:9092 --topic test --consumer.config $KAFKA_HOME/config/consumer.properties 4) 指定分区消费,从最新开始消费 kafka-console-consumer.sh --bootstrap-server broker-hostname:9092 --topic test --consumer-property group.id=test-consumer-group --partition 2 --offset latest5. consumer groups kafka-consumer-groups.sh 参数) 1) 查看 consumer groups # 查看保存在 zookeeper 中的组 kafka-consumer-groups.sh --zookeeper zookeeper:2181 --list # 查看保存在 broker 中的组 kafka-consumer-groups.sh --bootstrap-server broker-hostname:9092 --list # 查看保存在 broker 中的 group 的 offset 信息, 当前消费到的 CURRENT-OFFSET 和 LOG-END-OFFSET 值 kafka-consumer-groups.sh --bootstrap-server broker-hostname:9092 --describe --group test-consumer-group 2) 重置 offset kafka-consumer-groups.sh --bootstrap-server broker-hostname-1:9092,broker-hostname-2:9092,broker-hostname-3:9092 --reset-offsets --group test-consumer-group --topic test --to-offset 16. 使用卡夫卡连接到导入/导出数据 1) 配置文件 $KAFKA_CONF_DIR/connect-standalone.properties 卡夫卡连接的配置过程,包含常见的配置如卡夫卡代理连接和数据的序列化格式 $KAFKA_CONF_DIR/connect-file-source.properties 2) kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group accessLogBase --topic accessLog --zookeeper uhadoop-ociicy-master1:2181/kafka7. kafka-configs 配置 Kafka 参数