kakfa是一个非常流行的消息中间件,因为分布式和高吞吐量而得到广泛应用。kafka官方仅提供了java的客户端,如果您的应用是基于golang等其他语言开发,就需要使用第三方的开源客户端了。本文将探讨golang下访问kafka的常见客户端的使用和比较。


1、常见golang下kafka客户端介绍

  • sarama 应该是目前最流行的kafka go客户端了,它由Shopify基于MIT许可证开源到Github上的。支持kafka v0.8及以上版本。该项目提供了简单的Godoc文档,仅仅支持kafka协议的low level级别API,并没有提供topic创建等管理接口。其另一个缺点是将所有的值通过指针传递,这会导致大量的动态内存分配和高频率的GC。
  • confluent-kafka-go由confluent提供的基于librdkafka而封装的高性能kafka客户端,使用方式也很接近于官方原生java客户端。Confluent是一家在Google Cloud和AWS上提供kafka服务的公司,由从Linkedin公司离职的kafka创建者们组建。虽然他们提供的客户端具有高性能、高可靠及商业支持等特性,但由于对C语言库的依赖,在某些场景下,特别是CI方面也需要额外考虑。
  • segmentio 纯粹的kafka golang客户端。由Segment基于MIT许可证开源。它提供了high level和low level的API来访问kafka。同时这个项目也提供了丰富的example和文档。
  • goka goka是一个比较新的kafka go客户端,它专注于将kafka作为服务间消息传递总线使用。goka内部依赖了sarama,是对sarama的封装。
  • 其它的go语言kafka客户端如siestaoptiopay/kafka等一些较为小众的第三方库,这里不再详细介绍。

2、sarama

producer

sarama提供了两种消息生产者:AsyncProducerSyncProducerAsyncProducer通过一个管道接收消息,然后异步发送。这种生产者也是我们绝大多数情况下所需要的。SyncProducer则会提供阻塞方法直到收到kafka server确认该消息已经发布的消息。SyncProducer效率低下,而且实际效率还依赖于Producer.RequiredAcks的配置。
先来看一看同步生产者

  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "log"
  5. "time"
  6. "fmt"
  7. )
  8. func main() {
  9. config := sarama.NewConfig()
  10. // request.timeout.ms
  11. config.Producer.Timeout = time.Second * 5
  12. // message.max.bytes
  13. config.Producer.MaxMessageBytes = 1024 * 1024
  14. // request.required.acks
  15. config.Producer.RequiredAcks = sarama.WaitForAll
  16. config.Version = sarama.V0_11_0_1
  17. if err := config.Validate(); err != nil {
  18. panic(fmt.Errorf("invalid configuration, error: %v", err))
  19. }
  20. producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  21. if err != nil {
  22. log.Fatalln(err)
  23. }
  24. defer func() {
  25. if err := producer.Close(); err != nil {
  26. log.Fatalln(err)
  27. }
  28. }()
  29. for {
  30. msg := &sarama.ProducerMessage{
  31. Topic: "topic-C",
  32. Value: sarama.StringEncoder("this is a testing message from sarama!!!"),
  33. }
  34. partition, offset, err := producer.SendMessage(msg)
  35. if err != nil {
  36. log.Printf("FAILED to send message: %s\n", err)
  37. } else {
  38. log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
  39. }
  40. time.Sleep(1 * time.Second)
  41. }
  42. }

同步生产者提供了两个发送消息的方法:(*SyncProducer).SendMessage(msg)(int32,int64,error)(*SyncProducer).SendMessages(msgs) error,这里调用的是第一个方法,该方法返回所生产的消息的partitionoffset。第二个消息在全部给定的消息都发布完成后返回,无论消息发布失败或者成功。

这里需要说明的是,config.Producer.RequiredAcks用来目前版本中只有三个有效值:

  • NoResponse RequiredAcks = 0 除了tcp ACK,没有额外回应
  • WaitForLocal RequiredAcks = 1 等待本地commit成功即返回响应
  • WaitForAll RequiredAcks = -1 等待所有的副本commit才响应

异步生产者的示例代码如下:

  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "time"
  5. "fmt"
  6. "os/signal"
  7. "os"
  8. "log"
  9. )
  10. func main() {
  11. config := sarama.NewConfig()
  12. config.Version = sarama.V0_11_0_1
  13. config.Producer.Return.Errors = true
  14. config.Producer.Timeout = time.Second
  15. config.Producer.Retry.Max = 5
  16. config.Producer.Retry.Backoff = 5 * time.Second
  17. config.Producer.Flush.Frequency = time.Second
  18. config.Producer.Flush.Messages = 100
  19. config.Producer.Flush.Bytes = 5 * 1024 * 1024
  20. if err := config.Validate(); err != nil {
  21. panic(fmt.Errorf("invalid configuration, error: %v", err))
  22. }
  23. producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
  24. if err != nil {
  25. panic(err)
  26. }
  27. defer func() {
  28. if err := producer.Close(); err != nil {
  29. log.Fatal(err)
  30. }
  31. }()
  32. signals := make(chan os.Signal, 1)
  33. signal.Notify(signals, os.Interrupt)
  34. var successed, failed int32
  35. for {
  36. select {
  37. case producer.Input() <- &sarama.ProducerMessage{
  38. Topic: "topic-C",
  39. Key: nil,
  40. Value: sarama.StringEncoder("this is another message for asyncProducer"),
  41. }:
  42. successed++
  43. case err := <-producer.Errors():
  44. log.Println(err)
  45. failed++
  46. case <-signals:
  47. log.Println("program stopping")
  48. return
  49. }
  50. time.Sleep(time.Second * 2)
  51. }
  52. log.Println("ending")
  53. }

AsyncProducer提供了(*AsyncProducer).Input() chan<- *ProducerMessage方法,通过管道以不阻塞的方式接收消息,并通过管道从方法(*AsyncProducer).Successes() <-chan *ProducerMessage(*AsyncProducer).Errors() <-chan *ProducerError方法接收发送成功或者失败的消息,前提是config.Producer.Return.Successesconfig.Producer.Return.Errors都配置了true。而且需要注意的是,如果这两个配置为true的话,必需从这两个管道中读取数据,否则如果channel满后就会会形成死锁。因此异步生产者比较合理的应用是将它们都放在一个select语句中。

consumer

sarama提供了两种消费kafka消息的方式,一种是指定从某个分区消费数据;另一种是指定一个groupId,所有属于同一个groupId的consumer一起消费某个topic上的消息。第二种consumer更符合大多数kafka应用场景。下面是一个consumer的例子:

  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "fmt"
  5. "context"
  6. "time"
  7. )
  8. func main() {
  9. config := sarama.NewConfig()
  10. config.Version = sarama.V0_11_0_1
  11. // If enabled, any errors that occurred while consuming are returned on
  12. // the Errors channel (default disabled).
  13. config.Consumer.Return.Errors = true
  14. // default 2s
  15. config.Consumer.Retry.Backoff = time.Second
  16. // fetch.min.bytes
  17. config.Consumer.Fetch.Min = 10 * 1024
  18. // etch.message.max.bytes
  19. config.Consumer.Fetch.Max = 5 * 1024 * 1024
  20. // rebalance configuration
  21. config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
  22. config.Consumer.Group.Rebalance.Retry.Max = 10
  23. config.Consumer.Group.Rebalance.Timeout = 10 * time.Second
  24. // config.Consumer.MaxProcessingTime = 10 * time.Second
  25. // fetch.wait.max.ms
  26. config.Consumer.MaxWaitTime = 1 * time.Second
  27. // How frequently to commit updated offsets. Defaults to 1s.
  28. config.Consumer.Offsets.CommitInterval = 1 * time.Second
  29. // The total number of times to retry failing commit
  30. config.Consumer.Offsets.Retry.Max = 3
  31. // Defaults to OffsetNewest.
  32. //config.Consumer.Offsets.Initial = 0
  33. // Requires Kafka broker version 0.9.0 or later.
  34. //config.Consumer.Offsets.Retention = 0
  35. client, err := sarama.NewClient([]string{"localhost:9092"}, config)
  36. if err != nil {
  37. fmt.Println(err)
  38. return
  39. }
  40. defer func() {
  41. _ = client.Close()
  42. }()
  43. group, err:= sarama.NewConsumerGroupFromClient("consumer-group-id", client)
  44. if err != nil {
  45. fmt.Println(err)
  46. return
  47. }
  48. go func() {
  49. for err := range group.Errors() {
  50. fmt.Println("ERROR", err)
  51. }
  52. }()
  53. ctx := context.Background()
  54. for {
  55. topics := []string{"topic-C"}
  56. handler := exampleConsumerGroupHandler{}
  57. err := group.Consume(ctx, topics, handler)
  58. if err != nil {
  59. panic(err)
  60. }
  61. }
  62. }
  63. type exampleConsumerGroupHandler struct{}
  64. func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
  65. return nil
  66. }
  67. func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
  68. return nil
  69. }
  70. func (h exampleConsumerGroupHandler) ConsumeClaim(
  71. sess sarama.ConsumerGroupSession,
  72. claim sarama.ConsumerGroupClaim) error {
  73. for msg := range claim.Messages() {
  74. fmt.Printf("Message on topic:%q[%d]@%d : %v \n",
  75. msg.Topic,
  76. msg.Partition,
  77. msg.Offset,
  78. string(msg.Value),
  79. )
  80. sess.MarkMessage(msg, "")
  81. }
  82. return nil
  83. }

这里需要注意的是,(*ConsumerGroup).Consume(ctx, topics, handler)中,handler承载了消费到的消息的处理逻辑,handler必须继承ConsumerGroupHandler接口。

ClusterAdmin

sarama提供了ClusterAdmin,以方便对集群进行管理,如创建或者删除topic的功能。下面是创建topic的示例,需要注意的是有些接口仅仅支持高版本的kafka,使用之前要注意。

  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "fmt"
  5. )
  6. // supported by brokers with version 0.10.1.0 or higher.
  7. func main() {
  8. config := sarama.NewConfig()
  9. config.Version = sarama.V0_11_0_1
  10. admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)
  11. if err != nil {
  12. fmt.Println("open admin conn error is", err)
  13. return
  14. }
  15. detail := sarama.TopicDetail{
  16. NumPartitions: 1,
  17. ReplicationFactor: 1,
  18. }
  19. err = admin.CreateTopic("topic-D", &detail, false)
  20. //admin.DeleteTopic()
  21. if err != nil {
  22. fmt.Println("error is", err)
  23. }
  24. }

3、confluent-kafka-go

安装

confluent-kafka-go和其他的kafka golang客户端不同,它对librdkafka有依赖,我们需要安装librdkafka。安装步骤如下:

  • 下载librdkafka

    1. git clone https://github.com/edenhill/librdkafka.git
  • 安装

    1. ./configure --prefix /usr
    2. make
    3. sudo make install
  • 配置

    1. export PKG_CONFIG_PATH=/usr/lib/pkgconfig

    安装完成librdkafka后方可使用confluent-kafka-go:

    1. go get -u github.com/confluentinc/confluent-kafka-go

producer

下面示例是一个使用kafka high level balanced producer:

  1. package main
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/kafka"
  4. "time"
  5. "fmt"
  6. )
  7. var topic = "topic-C"
  8. func main() {
  9. p, err := kafka.NewProducer(&kafka.ConfigMap{
  10. "bootstrap.servers": "localhost:9092",
  11. })
  12. if err != nil {
  13. panic(err)
  14. }
  15. defer p.Close()
  16. // Delivery report handler for produced messages
  17. go func() {
  18. for e := range p.Events() {
  19. switch ev := e.(type) {
  20. case *kafka.Message:
  21. if ev.TopicPartition.Error != nil {
  22. fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
  23. } else {
  24. fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
  25. }
  26. }
  27. }
  28. }()
  29. for {
  30. p.Produce(&kafka.Message{
  31. TopicPartition: kafka.TopicPartition{
  32. Topic: &topic,
  33. Partition: kafka.PartitionAny},
  34. Value: []byte("this is message produced from confluent client"),
  35. }, nil)
  36. time.Sleep(1 * time.Second)
  37. }
  38. }

这里使用了producer的异步发送单个消息的方法:(*Producer).Produce(msg *Message, deliveryChan chan Event) error,如果指定了deliveryChan,则消息发布结果则会发布到deliveryChan。如果需要批量发送消息,则可以使用基于channel的producer,代码片段如下:

  1. p.ProduceChannel() <- &kafka.Message{
  2. TopicPartition: kafka.TopicPartition{
  3. Topic: &topic,
  4. Partition: kafka.PartitionAny},
  5. Value: []byte(value),
  6. }

consumer

confluent-kafka-go提供了最接近于kafka JVM客户端的体验。下面代码展示了high level balanced consumer

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/confluentinc/confluent-kafka-go/kafka"
  5. )
  6. func main() {
  7. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  8. "bootstrap.servers": "localhost:9092",
  9. "group.id": "consumer-group-id",
  10. "auto.offset.reset": "earliest",
  11. })
  12. if err != nil {
  13. panic(err)
  14. }
  15. c.SubscribeTopics([]string{"topic-C"}, nil)
  16. for {
  17. msg, err := c.ReadMessage(-1)
  18. if err == nil {
  19. fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
  20. } else {
  21. // The client will automatically try to recover from all errors.
  22. fmt.Printf("Consumer error: %v (%v)\n", err, msg)
  23. }
  24. }
  25. c.Close()
  26. }

其中,(*Consumer).ReadMessage(timeout)方法用来从brokers拉取一条消息。这个方法的使用场景较为狭窄,通常我们需要批量拉取消息并处理,confluent-kafka-go提供了基于channel的consumer方法(*Consumer) Events() chan Event使用方法如下:

  1. err = c.SubscribeTopics(topics, nil)
  2. run := true
  3. for run == true {
  4. select {
  5. case sig := <-sigchan:
  6. fmt.Printf("Caught signal %v: terminating\n", sig)
  7. run = false
  8. case ev := <-c.Events():
  9. switch e := ev.(type) {
  10. case kafka.AssignedPartitions:
  11. fmt.Fprintf(os.Stderr, "%% %v\n", e)
  12. c.Assign(e.Partitions)
  13. case kafka.RevokedPartitions:
  14. fmt.Fprintf(os.Stderr, "%% %v\n", e)
  15. c.Unassign()
  16. case *kafka.Message:
  17. fmt.Printf("%% Message on %s:\n%s\n",
  18. e.TopicPartition, string(e.Value))
  19. case kafka.PartitionEOF:
  20. fmt.Printf("%% Reached %v\n", e)
  21. case kafka.Error:
  22. fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
  23. run = false
  24. }
  25. }
  26. }
  27. fmt.Printf("Closing consumer\n")
  28. c.Close()

Admin

confluent-kafka-go和sarama一样提供了admin接口,用于管理broker和topic等。下面是一个使用confluent-kafka-go创建topic的示例:

  1. package main
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/kafka"
  4. "fmt"
  5. "os"
  6. "context"
  7. "time"
  8. )
  9. func main() {
  10. // Create a new AdminClient.
  11. // AdminClient can also be instantiated using an existing
  12. // Producer or Consumer instance, see NewAdminClientFromProducer and
  13. // NewAdminClientFromConsumer.
  14. admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  15. if err != nil {
  16. fmt.Printf("Failed to create Admin client: %s\n", err)
  17. os.Exit(1)
  18. }
  19. defer admin.Close()
  20. // Contexts are used to abort or limit the amount of time
  21. // the Admin call blocks waiting for a result.
  22. ctx, cancel := context.WithCancel(context.Background())
  23. defer cancel()
  24. results, err := admin.CreateTopics(
  25. ctx,
  26. []kafka.TopicSpecification{
  27. {
  28. Topic:"topic-C",
  29. NumPartitions:1,
  30. ReplicationFactor:1,
  31. },
  32. },
  33. kafka.SetAdminOperationTimeout(10 * time.Second),
  34. )
  35. if err != nil {
  36. fmt.Printf("Failed to create topic: %v\n", err)
  37. os.Exit(1)
  38. }
  39. for _, result := range results {
  40. fmt.Printf("%s\n", result)
  41. }
  42. }

4、segmentio

segmentio/kafka-go是一个纯粹使用golang标准库实现的kafka golang客户端,提供了与kafka交互的high level和low level的API。

low level API

segmentio支持从low level的API使用了Conn的类型与kafka交互,Conn代表了一个当前客户端与一个broker的连接,因此Conn创建方法DialLeader()需要指定一个broker地址和一个分区信息。因此low level的kafka-go客户端使用场景并不是很广泛。下面是官方提供的low level的kafka生产者示例:

  1. // to produce messages
  2. topic := "my-topic"
  3. partition := 0
  4. conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
  5. conn.SetWriteDeadline(time.Now().Add(10*time.Second))
  6. conn.WriteMessages(
  7. kafka.Message{Value: []byte("one!")},
  8. kafka.Message{Value: []byte("two!")},
  9. kafka.Message{Value: []byte("three!")},
  10. )
  11. conn.Close()

high level API

segmentio/kafka-go的high level API提供了Reader和Writer的概念。

  • Reader

Reader能够从某个kafka topic上消费消息,和JVM的kafka client一样,可以选择将多个消费者作为一个consumer group一起消费该topic的所有分区,也可以指定仅从某个分区消费。Reader能够在于broker断开后自动重连,并自动管理offset。此外Reader还支持使用context异步取消goroutine。使用Reader以consumer group方式消费kafka数据的示例代码如下:

  1. package main
  2. import (
  3. "github.com/segmentio/kafka-go"
  4. "fmt"
  5. "context"
  6. "time"
  7. )
  8. func main() {
  9. // make a new reader that consumes from topic-A
  10. r := kafka.NewReader(kafka.ReaderConfig{
  11. Brokers: []string{"localhost:9092"},
  12. GroupID: "consumer-group-id",
  13. Topic: "topic-C",
  14. //MinBytes: 10e3, // 10KB
  15. MaxBytes: 10e3, // 10KB
  16. ReadLagInterval: time.Second * (-1),
  17. CommitInterval: time.Second, // flushes commits to Kafka every second
  18. })
  19. for {
  20. m, err := r.ReadMessage(context.Background())
  21. if err != nil {
  22. break
  23. }
  24. fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n",
  25. m.Topic,
  26. m.Partition,
  27. m.Offset,
  28. m.Key,
  29. string(m.Value),
  30. )
  31. }
  32. r.Close()
  33. }

ReaderConfig结构体承载了Reader的所有配置。这里介绍几个重要配置:

  • ReaderConfig.Brokers kafka集群的broker列表
  • ReaderConfig.GroupID consumer group的唯一ID,如果指定了groupID,就不能再配置Partition
  • ReaderConfig.Topic 消费消息所属的topic
  • ReaderConfig.Partition 指定从某个topic的特定Partition消费消息,配置上和GroupID互斥
  • ReaderConfig.Dialer 连接器,用来和kafka cluster建立连接,配置项包括超时时间和TLS等
  • ReaderConfig.QueueCapacity consumer内部的消息缓冲队列大小,more为100
  • ReaderConfig.MinBytes/MaxBytes 每次从kafka拉去消息的最大/小字节数
  • ReaderConfig.GroupBalancers 当配置了GroupID时,方可使用该参数指定consumer侧rebalance的协调器
  • ReaderConfig.MaxWait 当批量拉取消息时的最大等待时间
  • ReaderConfig.CommitInterval 当配置了GroupID时,方可配置该参数以指定自动commit offset的间隔,如果该值为0,表示异步。默认值为1s

其它配置可以自行参考其godoc文档。

kafka-go也支持手动commit,手动commit时,需要调用FetchMessage接口而不是示例代码中的ReadMessage():

  1. ctx := context.Background()
  2. for {
  3. m, err := r.FetchMessage(ctx)
  4. if err != nil {
  5. break
  6. }
  7. fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n",
  8. m.Topic,
  9. m.Partition,
  10. m.Offset,
  11. string(m.Key),
  12. string(m.Value),
  13. )
  14. r.CommitMessages(ctx, m)
  15. }

Writer

Writer是一个high level的kafka producer,Writer比Conn更适合大多数向kafka发送消息的场景,而且具有一下特性:

  • 错误时会自动重试和重连
  • 支持同步和异步发送消息
  • 可以使用context取消异步发送的消息支持
  • 平滑关闭Writer,关闭时可以flush本地缓存中未produce的消息

下面是一个简单的同步producer示例:

  1. package main
  2. import (
  3. "github.com/segmentio/kafka-go"
  4. "golang.org/x/net/context"
  5. "time"
  6. "strconv"
  7. )
  8. func main() {
  9. // make a writer that produces to topic-A, using the least-bytes distribution
  10. w := kafka.NewWriter(kafka.WriterConfig{
  11. Brokers: []string{"localhost:9092"},
  12. Topic: "topic-C",
  13. Balancer: &kafka.LeastBytes{},
  14. })
  15. index := 0
  16. for {
  17. w.WriteMessages(context.Background(),
  18. kafka.Message{
  19. Key: []byte(strconv.Itoa(index)),
  20. Value: []byte("this is a message from segmentio/writer"),
  21. },
  22. )
  23. time.Sleep(time.Second)
  24. index++
  25. }
  26. }

如果需要异步发送消息,需要在配置中添加Async:true的配置即可,其它重要配置包括:

  • WriterConfig.MaxAttempts 发送消息的最大尝试次数
  • WriterConfig.QueueCapacity 内部消息队列的大小,默认值为100
  • WriterConfig.BatchSize writer本地缓冲中的消息数量最多达到BatchSize时,将会发送
  • WriterConfig.BatchTimeout 本地缓冲消息向kafka批量发送的最大周期
  • WriterConfig.RequiredAcks 是否需要kafka server确认所有副本收到writer发送的消息,默认值为-1,表示需要等到所有副本收到消息
  • WriterConfig.Async 用来指定writer是异步发送消息还是同步消息

Admin

segmentio也支持创建或者删除topic的功能,这些功能在low level API接口的Conn上提供:

  1. package main
  2. import (
  3. "golang.org/x/net/context"
  4. "github.com/segmentio/kafka-go"
  5. "fmt"
  6. "os"
  7. )
  8. func main() {
  9. conn, err := kafka.DialContext(context.Background(), "tcp", "localhost:9092")
  10. if err != nil {
  11. panic(err)
  12. }
  13. err = conn.CreateTopics(kafka.TopicConfig{
  14. Topic:"topic-F",
  15. NumPartitions:1,
  16. ReplicationFactor:1,
  17. })
  18. if err != nil {
  19. fmt.Fprintf(os.Stderr, "create topic fialed, cause:%v", err)
  20. }
  21. }

总结

除了这里列出代码的三个kafka golang客户端,还有一些包括optiopay/kafka等,这里不再详细介绍。从以上三个开源库看来,三者都具有相对丰富的接口,但是confluent-kafka-go具有最好的效率和支持channel的特性,但是对C lib的依赖,限制了它在某些条件下的使用。sarama虽然文档不足,但经过仔细研究发现它也支持各种使用场景,包括同步异步、使用select等go特性。而segmentio虽然在Github批评前二者不支持context,但是自身也不支持将消息和反馈通过channel接收。目前看来在项目上segmentio和sarama更实用。