介绍

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用Go语言发送和接收kafka消息。
image.png
image.png
image.png
image.png

工作流程

image.png
image.png
image.png
image.png
image.png

安装

1.下载
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz
2.解压
tar -zxvf kafka_2.12-3.1.0.tgz
mv kafka_2.12-3.1.0 kafka
cd kafka
3.启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
4.启动kafka
bin/kafka-server-start.sh config/server.properties
5.修改配置,如不修改默认寻找主机名,需要配置hosts文件
vim config/server.properties
listeners=PLAINTEXT://192.168.72.10:9092

命令总结

1.手动消费topic(从头开始消费)
./kafka-console-consumer.sh —bootstrap-server=”192.168.72.10:9092” —topic=”test” —from-beginning
#指定消费者组和消费者id
sh kafka-console-consumer.sh —bootstrap-server 192.168.72.10:9092 —consumer-property group.id=test-group —consumer-property client.id=new-consumer-cl —topic test
2.查看所有的topic
sh kafka-topics.sh —bootstrap-server 192.168.72.10:9092 —list
3.创建topic
sh kafka-topics.sh —bootstrap-server 192.168.72.10:9092 —create —topic test
4.删除topic
sh kafka-topics.sh —bootstrap-server 192.168.72.10:9092 —delete—topic test
5.生产消息
./kafka-console-producer.sh —bootstrap-server=”192.168.72.10:9092” —topic=”test”
6.查看topic详细情况
sh kafka-topics.sh —bootstrap-server 192.168.72.10:9092 —topic test —describe
7.查看所有消费者组
sh kafka-consumer-groups.sh —bootstrap-server 192.168.72.10:9092 —list
8.查看消费者组的消费情况(消息堆积情况)
3.1.0版本执行以下命令
sh kafka-consumer-groups.sh —bootstrap-server 192.168.72.10:9092 —describe —all-groups
2.x可能执行以下命令
sh kafka-consumer-groups.sh —bootstrap-server 192.168.72.10:9092 —describe-all
指定group查看
sh kafka-consumer-groups.sh —bootstrap-server 192.168.72.10:9092 —describe —group console-consumer-36067
image.png
参数说明:
GROUP:消费者组
TOPIC:主题
PARTITION:分区
CURRENT-OFFSET:当前已消费偏移量
LOG-END-OFFSET:当前生产消息偏移量
LAG:消息堆积数量=(当前生产消息偏移量-当前已消费偏移量)
CONSUMER-ID:消费者ID
HOST:客户端IP
CLIENT-ID:客户端ID
9.删除消费者组
sh kafka-consumer-groups.sh —bootstrap-server 192.168.72.10:9092 —delete—group console-consumer-36067

golang操作

生产者代码

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. )
  6. func main() {
  7. //初始化配置
  8. config := sarama.NewConfig()
  9. //设置生产者的ACK消息应答机制为all
  10. config.Producer.RequiredAcks = sarama.WaitForAll
  11. //指定生产者消息些人选定partition分区的策略
  12. config.Producer.Partitioner = sarama.NewRandomPartitioner
  13. //成功消费的消息将在success channel返回
  14. config.Producer.Return.Successes = true
  15. //构造一个消息
  16. msg := &sarama.ProducerMessage{}
  17. msg.Topic = "test"
  18. msg.Value = sarama.StringEncoder("this is a test log")
  19. //连接客户端
  20. client, err := sarama.NewSyncProducer([]string{"192.168.72.10:9092"}, config)
  21. if err != nil {
  22. fmt.Println("client connect err", err)
  23. return
  24. }
  25. defer client.Close()
  26. //发送消息
  27. pid, offset, err := client.SendMessage(msg)
  28. if err != nil {
  29. fmt.Println("send message err", err)
  30. return
  31. }
  32. fmt.Printf("pid:%v offset:%v", pid, offset)
  33. }

消费者代码

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. )
  6. func main() {
  7. consumer, err := sarama.NewConsumer([]string{"192.168.72.10:9092"}, nil)
  8. if err != nil {
  9. fmt.Println("create consumer err", err)
  10. return
  11. }
  12. //根据topic获取分区
  13. partitions, err := consumer.Partitions("aa")
  14. if err != nil {
  15. fmt.Println("get topic partitions err", err)
  16. return
  17. }
  18. fmt.Println(partitions)
  19. for partition := range partitions {
  20. // 针对每个分区创建一个对应的分区消费者
  21. pc, err := consumer.ConsumePartition("aa", int32(partition), sarama.OffsetNewest)
  22. defer pc.AsyncClose()
  23. if err != nil {
  24. fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
  25. return
  26. }
  27. // 异步从每个分区消费信息
  28. go func(sarama.PartitionConsumer) {
  29. for msg := range pc.Messages() {
  30. fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
  31. }
  32. }(pc)
  33. }
  34. //阻塞
  35. select {
  36. }
  37. }

消费组代码

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/Shopify/sarama"
  6. "sync"
  7. "time"
  8. )
  9. type Consumer struct {
  10. ready chan bool
  11. consumerName string
  12. }
  13. func main() {
  14. consumer := &Consumer{
  15. ready: make(chan bool),
  16. consumerName: "consumer1",
  17. }
  18. //consumer2 := &Consumer{
  19. // ready: make(chan bool),
  20. // consumerName: "consumer2",
  21. //}
  22. ctx, consumerCancel := context.WithCancel(context.Background())
  23. wg := &sync.WaitGroup{}
  24. wg.Add(2)
  25. go startConsumer(ctx, []string{"test"}, consumer)
  26. //go startConsumer(ctx, []string{"test"}, consumer2)
  27. wg.Wait()
  28. consumerCancel()
  29. }
  30. func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
  31. // Mark the consumer as ready
  32. close(consumer.ready)
  33. return nil
  34. }
  35. func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
  36. func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  37. for message := range claim.Messages() {
  38. fmt.Printf("ConsumerName:%s Message claimed: value = %s, topic = %s,partition=%d", consumer.consumerName, string(message.Value), message.Topic, message.Partition)
  39. session.MarkMessage(message, "")
  40. }
  41. return nil
  42. }
  43. func startConsumer(ctx context.Context, topics []string, consumer *Consumer) {
  44. config := sarama.NewConfig()
  45. //config.Consumer.Return.Errors = true
  46. //config.Version = sarama.V0_11_0_2
  47. //config.Net.SASL.Enable = true
  48. //config.Net.SASL.User = "admin"
  49. //config.Net.SASL.Password = "admin"
  50. // consumer
  51. consumerGroup, err := sarama.NewConsumerGroup([]string{"192.168.72.10:9092"}, "test-group", config)
  52. if err != nil {
  53. fmt.Printf("NewConsumerGroup create consumer error %s\n", err.Error())
  54. return
  55. }
  56. wg := &sync.WaitGroup{}
  57. wg.Add(1)
  58. go func() {
  59. defer wg.Done()
  60. //time.Sleep(1*time.Nanosecond)
  61. time.Sleep(10*time.Minute)
  62. for {
  63. if err := consumerGroup.Consume(context.Background(), topics, consumer); err != nil {
  64. fmt.Printf("Error from consumer: %v", err)
  65. }
  66. consumer.ready = make(chan bool)
  67. }
  68. }()
  69. <-consumer.ready
  70. select {
  71. case <-ctx.Done():
  72. fmt.Printf("kafka terminating: context cancelled")
  73. }
  74. wg.Wait()
  75. }