Introduction
Kafka is a high-throughput, distributed publish-subscribe messaging system that can handle the full activity-stream data of a large consumer-facing website, offering high performance, persistence, multi-replica backup, and horizontal scalability. This article shows how to send and receive Kafka messages from Go using the sarama client library.


Workflow
A producer publishes messages to a topic on the Kafka broker; each topic is split into one or more partitions, and consumers (optionally organized into consumer groups) pull messages from those partitions.
Installation
1. Download
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz
2. Extract
tar -zxvf kafka_2.13-3.1.0.tgz
mv kafka_2.13-3.1.0 kafka
cd kafka
3. Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start Kafka
bin/kafka-server-start.sh config/server.properties
5. Adjust the listener configuration. If it is left at the default, Kafka advertises itself by hostname and clients need a matching hosts-file entry; binding the listener to the server's IP avoids that (restart the broker for the change to take effect).
vim config/server.properties
listeners=PLAINTEXT://192.168.72.10:9092
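If clients connect from other machines, you may also want to set advertised.listeners in the same file; the value below simply reuses the broker IP from this guide and should be adjusted to whatever address your clients can actually reach:
advertised.listeners=PLAINTEXT://192.168.72.10:9092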
Command summary
1. Consume a topic from the console (from the beginning)
./kafka-console-consumer.sh --bootstrap-server="192.168.72.10:9092" --topic="test" --from-beginning
# Specify the consumer group and client id
sh kafka-console-consumer.sh --bootstrap-server 192.168.72.10:9092 --consumer-property group.id=test-group --consumer-property client.id=new-consumer-cl --topic test
2. List all topics
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --list
3. Create a topic (the partition count and replication factor can be set explicitly; see the example after this list)
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --create --topic test
4. Delete a topic
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --delete --topic test
5. Produce messages
./kafka-console-producer.sh --bootstrap-server="192.168.72.10:9092" --topic="test"
6. Describe a topic
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --topic test --describe
7. List all consumer groups
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --list
8. Check consumer group progress (message backlog)
On 3.1.0, run:
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --describe --all-groups
On 2.x, the command may be:
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --describe-all
Describe a specific group:
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --describe --group console-consumer-36067
Column meanings:
GROUP: consumer group
TOPIC: topic
PARTITION: partition
CURRENT-OFFSET: offset consumed so far
LOG-END-OFFSET: latest produced offset (log end)
LAG: number of backlogged messages (LOG-END-OFFSET minus CURRENT-OFFSET)
CONSUMER-ID: consumer id
HOST: client IP
CLIENT-ID: client id
9. Delete a consumer group
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --delete --group console-consumer-36067
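For command 3 above, the partition count and replication factor can be passed when creating the topic. An illustrative example (3 partitions and a replication factor of 1, which suits the single-broker setup in this guide):
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --create --topic test --partitions 3 --replication-factor 1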
Using Go
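The examples below use the github.com/Shopify/sarama client library; it can be added to your project with go get github.com/Shopify/sarama.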
Producer code
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Initialize the configuration
	config := sarama.NewConfig()
	// Require acknowledgement from all in-sync replicas
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for each message
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported back on the Successes channel
	config.Producer.Return.Successes = true

	// Build a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = "test"
	msg.Value = sarama.StringEncoder("this is a test log")

	// Connect to the broker
	client, err := sarama.NewSyncProducer([]string{"192.168.72.10:9092"}, config)
	if err != nil {
		fmt.Println("client connect err", err)
		return
	}
	defer client.Close()

	// Send the message
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message err", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
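The code above sends a single message synchronously. For higher throughput, sarama also provides an asynchronous producer that queues messages on a channel and reports delivery results separately. A minimal sketch, assuming the same broker address and topic as above:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Report both successes and errors on their respective channels
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	// Broker address reused from the synchronous example above
	producer, err := sarama.NewAsyncProducer([]string{"192.168.72.10:9092"}, config)
	if err != nil {
		fmt.Println("client connect err", err)
		return
	}
	defer producer.Close()

	// Collect the delivery reports for the ten messages in the background
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 10; i++ {
			select {
			case msg := <-producer.Successes():
				fmt.Printf("sent to partition %d at offset %d\n", msg.Partition, msg.Offset)
			case sendErr := <-producer.Errors():
				fmt.Println("send message err", sendErr)
			}
		}
	}()

	// Queue ten messages without waiting for each acknowledgement
	for i := 0; i < 10; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "test",
			Value: sarama.StringEncoder(fmt.Sprintf("async message %d", i)),
		}
	}
	<-done
}

Note that config.Producer.Return.Successes must be enabled here, otherwise nothing is ever delivered on the Successes() channel and the collector goroutine would block.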
Consumer code
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"192.168.72.10:9092"}, nil)
	if err != nil {
		fmt.Println("create consumer err", err)
		return
	}

	// Get the partitions of the topic
	partitions, err := consumer.Partitions("aa")
	if err != nil {
		fmt.Println("get topic partitions err", err)
		return
	}
	fmt.Println(partitions)

	for _, partition := range partitions {
		// Create a partition consumer for each partition
		pc, err := consumer.ConsumePartition("aa", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()

		// Consume messages from each partition asynchronously
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}

	// Block forever
	select {}
}
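The consumer above blocks forever on select {} and can only be stopped by killing the process. A minimal variation, assuming the same broker address and topic "aa", that shuts the partition consumers down cleanly on Ctrl+C:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"192.168.72.10:9092"}, nil)
	if err != nil {
		fmt.Println("create consumer err", err)
		return
	}

	partitions, err := consumer.Partitions("aa")
	if err != nil {
		fmt.Println("get topic partitions err", err)
		return
	}

	var wg sync.WaitGroup
	pcs := make([]sarama.PartitionConsumer, 0, len(partitions))
	for _, partition := range partitions {
		pc, err := consumer.ConsumePartition("aa", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
			continue
		}
		pcs = append(pcs, pc)
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			// The Messages channel closes when the partition consumer shuts down
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}

	// Wait for Ctrl+C instead of select{}, then shut everything down cleanly
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig

	for _, pc := range pcs {
		pc.AsyncClose() // closes the Messages channel, ending the goroutines above
	}
	wg.Wait()
	consumer.Close()
}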
Consumer group code
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

// Consumer implements sarama.ConsumerGroupHandler.
type Consumer struct {
	ready        chan bool
	consumerName string
}

func main() {
	consumer := &Consumer{
		ready:        make(chan bool),
		consumerName: "consumer1",
	}
	//consumer2 := &Consumer{
	//	ready:        make(chan bool),
	//	consumerName: "consumer2",
	//}

	ctx, consumerCancel := context.WithCancel(context.Background())
	defer consumerCancel()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		startConsumer(ctx, []string{"test"}, consumer)
	}()
	//wg.Add(1)
	//go func() {
	//	defer wg.Done()
	//	startConsumer(ctx, []string{"test"}, consumer2)
	//}()

	// Block here; the consumers run until the process is stopped
	wg.Wait()
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim reads the messages of one partition claim and marks them as consumed.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		fmt.Printf("ConsumerName:%s Message claimed: value = %s, topic = %s, partition = %d\n",
			consumer.consumerName, string(message.Value), message.Topic, message.Partition)
		session.MarkMessage(message, "")
	}
	return nil
}

func startConsumer(ctx context.Context, topics []string, consumer *Consumer) {
	config := sarama.NewConfig()
	//config.Consumer.Return.Errors = true
	//config.Version = sarama.V0_11_0_2
	//config.Net.SASL.Enable = true
	//config.Net.SASL.User = "admin"
	//config.Net.SASL.Password = "admin"

	consumerGroup, err := sarama.NewConsumerGroup([]string{"192.168.72.10:9092"}, "test-group", config)
	if err != nil {
		fmt.Printf("NewConsumerGroup create consumer error %s\n", err.Error())
		return
	}
	defer consumerGroup.Close()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume joins the group and blocks until a rebalance or cancellation,
			// so it has to be called again in a loop
			if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
				fmt.Printf("Error from consumer: %v\n", err)
			}
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // wait until the consumer has joined the group
	fmt.Println("consumer up and running:", consumer.consumerName)

	<-ctx.Done()
	fmt.Println("kafka terminating: context cancelled")
	wg.Wait()
}
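All instances of this program join the same group (test-group), so the topic's partitions are divided among them: uncommenting consumer2, or simply starting the program twice, triggers a rebalance, and each message is then delivered to only one member of the group.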




