kakfa是一个非常流行的消息中间件,因为分布式和高吞吐量而得到广泛应用。kafka官方仅提供了java的客户端,如果您的应用是基于golang等其他语言开发,就需要使用第三方的开源客户端了。本文将探讨golang下访问kafka的常见客户端的使用和比较。
1、常见golang下kafka客户端介绍
- sarama 应该是目前最流行的kafka go客户端了,它由Shopify基于MIT许可证开源到Github上的。支持kafka v0.8及以上版本。该项目提供了简单的Godoc文档,仅仅支持kafka协议的low level级别API,并没有提供topic创建等管理接口。其另一个缺点是将所有的值通过指针传递,这会导致大量的动态内存分配和高频率的GC。
- confluent-kafka-go由confluent提供的基于librdkafka而封装的高性能kafka客户端,使用方式也很接近于官方原生java客户端。Confluent是一家在Google Cloud和AWS上提供kafka服务的公司,由从Linkedin公司离职的kafka创建者们组建。虽然他们提供的客户端具有高性能、高可靠及商业支持等特性,但由于对C语言库的依赖,在某些场景下,特别是CI方面也需要额外考虑。
- segmentio 纯粹的kafka golang客户端。由Segment基于MIT许可证开源。它提供了high level和low level的API来访问kafka。同时这个项目也提供了丰富的example和文档。
- goka goka是一个比较新的kafka go客户端,它专注于将kafka作为服务间消息传递总线使用。goka内部依赖了sarama,是对sarama的封装。
- 其它的go语言kafka客户端如siesta和optiopay/kafka等一些较为小众的第三方库,这里不再详细介绍。
2、sarama
producer
sarama提供了两种消息生产者:AsyncProducer和SyncProducer。AsyncProducer通过一个管道接收消息,然后异步发送。这种生产者也是我们绝大多数情况下所需要的。SyncProducer则会提供阻塞方法直到收到kafka server确认该消息已经发布的消息。SyncProducer效率低下,而且实际效率还依赖于Producer.RequiredAcks的配置。
先来看一看同步生产者
package mainimport ("github.com/Shopify/sarama""log""time""fmt")func main() {config := sarama.NewConfig()// request.timeout.msconfig.Producer.Timeout = time.Second * 5// message.max.bytesconfig.Producer.MaxMessageBytes = 1024 * 1024// request.required.acksconfig.Producer.RequiredAcks = sarama.WaitForAllconfig.Version = sarama.V0_11_0_1if err := config.Validate(); err != nil {panic(fmt.Errorf("invalid configuration, error: %v", err))}producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)if err != nil {log.Fatalln(err)}defer func() {if err := producer.Close(); err != nil {log.Fatalln(err)}}()for {msg := &sarama.ProducerMessage{Topic: "topic-C",Value: sarama.StringEncoder("this is a testing message from sarama!!!"),}partition, offset, err := producer.SendMessage(msg)if err != nil {log.Printf("FAILED to send message: %s\n", err)} else {log.Printf("> message sent to partition %d at offset %d\n", partition, offset)}time.Sleep(1 * time.Second)}}
同步生产者提供了两个发送消息的方法:(*SyncProducer).SendMessage(msg)(int32,int64,error)和(*SyncProducer).SendMessages(msgs) error,这里调用的是第一个方法,该方法返回所生产的消息的partition和offset。第二个消息在全部给定的消息都发布完成后返回,无论消息发布失败或者成功。
这里需要说明的是,
config.Producer.RequiredAcks用来目前版本中只有三个有效值:
NoResponse RequiredAcks = 0除了tcp ACK,没有额外回应WaitForLocal RequiredAcks = 1等待本地commit成功即返回响应WaitForAll RequiredAcks = -1等待所有的副本commit才响应
异步生产者的示例代码如下:
package mainimport ("github.com/Shopify/sarama""time""fmt""os/signal""os""log")func main() {config := sarama.NewConfig()config.Version = sarama.V0_11_0_1config.Producer.Return.Errors = trueconfig.Producer.Timeout = time.Secondconfig.Producer.Retry.Max = 5config.Producer.Retry.Backoff = 5 * time.Secondconfig.Producer.Flush.Frequency = time.Secondconfig.Producer.Flush.Messages = 100config.Producer.Flush.Bytes = 5 * 1024 * 1024if err := config.Validate(); err != nil {panic(fmt.Errorf("invalid configuration, error: %v", err))}producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)if err != nil {panic(err)}defer func() {if err := producer.Close(); err != nil {log.Fatal(err)}}()signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)var successed, failed int32for {select {case producer.Input() <- &sarama.ProducerMessage{Topic: "topic-C",Key: nil,Value: sarama.StringEncoder("this is another message for asyncProducer"),}:successed++case err := <-producer.Errors():log.Println(err)failed++case <-signals:log.Println("program stopping")return}time.Sleep(time.Second * 2)}log.Println("ending")}
AsyncProducer提供了(*AsyncProducer).Input() chan<- *ProducerMessage方法,通过管道以不阻塞的方式接收消息,并通过管道从方法(*AsyncProducer).Successes() <-chan *ProducerMessage和(*AsyncProducer).Errors() <-chan *ProducerError方法接收发送成功或者失败的消息,前提是config.Producer.Return.Successes和config.Producer.Return.Errors都配置了true。而且需要注意的是,如果这两个配置为true的话,必需从这两个管道中读取数据,否则如果channel满后就会会形成死锁。因此异步生产者比较合理的应用是将它们都放在一个select语句中。
consumer
sarama提供了两种消费kafka消息的方式,一种是指定从某个分区消费数据;另一种是指定一个groupId,所有属于同一个groupId的consumer一起消费某个topic上的消息。第二种consumer更符合大多数kafka应用场景。下面是一个consumer的例子:
package mainimport ("github.com/Shopify/sarama""fmt""context""time")func main() {config := sarama.NewConfig()config.Version = sarama.V0_11_0_1// If enabled, any errors that occurred while consuming are returned on// the Errors channel (default disabled).config.Consumer.Return.Errors = true// default 2sconfig.Consumer.Retry.Backoff = time.Second// fetch.min.bytesconfig.Consumer.Fetch.Min = 10 * 1024// etch.message.max.bytesconfig.Consumer.Fetch.Max = 5 * 1024 * 1024// rebalance configurationconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRangeconfig.Consumer.Group.Rebalance.Retry.Max = 10config.Consumer.Group.Rebalance.Timeout = 10 * time.Second// config.Consumer.MaxProcessingTime = 10 * time.Second// fetch.wait.max.msconfig.Consumer.MaxWaitTime = 1 * time.Second// How frequently to commit updated offsets. Defaults to 1s.config.Consumer.Offsets.CommitInterval = 1 * time.Second// The total number of times to retry failing commitconfig.Consumer.Offsets.Retry.Max = 3// Defaults to OffsetNewest.//config.Consumer.Offsets.Initial = 0// Requires Kafka broker version 0.9.0 or later.//config.Consumer.Offsets.Retention = 0client, err := sarama.NewClient([]string{"localhost:9092"}, config)if err != nil {fmt.Println(err)return}defer func() {_ = client.Close()}()group, err:= sarama.NewConsumerGroupFromClient("consumer-group-id", client)if err != nil {fmt.Println(err)return}go func() {for err := range group.Errors() {fmt.Println("ERROR", err)}}()ctx := context.Background()for {topics := []string{"topic-C"}handler := exampleConsumerGroupHandler{}err := group.Consume(ctx, topics, handler)if err != nil {panic(err)}}}type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {return nil}func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {return nil}func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Message on topic:%q[%d]@%d : %v \n",msg.Topic,msg.Partition,msg.Offset,string(msg.Value),)sess.MarkMessage(msg, "")}return nil}
这里需要注意的是,(*ConsumerGroup).Consume(ctx, topics, handler)中,handler承载了消费到的消息的处理逻辑,handler必须继承ConsumerGroupHandler接口。
ClusterAdmin
sarama提供了ClusterAdmin,以方便对集群进行管理,如创建或者删除topic的功能。下面是创建topic的示例,需要注意的是有些接口仅仅支持高版本的kafka,使用之前要注意。
package mainimport ("github.com/Shopify/sarama""fmt")// supported by brokers with version 0.10.1.0 or higher.func main() {config := sarama.NewConfig()config.Version = sarama.V0_11_0_1admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)if err != nil {fmt.Println("open admin conn error is", err)return}detail := sarama.TopicDetail{NumPartitions: 1,ReplicationFactor: 1,}err = admin.CreateTopic("topic-D", &detail, false)//admin.DeleteTopic()if err != nil {fmt.Println("error is", err)}}
3、confluent-kafka-go
安装
confluent-kafka-go和其他的kafka golang客户端不同,它对librdkafka有依赖,我们需要安装librdkafka。安装步骤如下:
下载librdkafka
git clone https://github.com/edenhill/librdkafka.git
安装
./configure --prefix /usrmakesudo make install
配置
export PKG_CONFIG_PATH=/usr/lib/pkgconfig
安装完成librdkafka后方可使用confluent-kafka-go:
go get -u github.com/confluentinc/confluent-kafka-go
producer
下面示例是一个使用kafka high level balanced producer:
package mainimport ("github.com/confluentinc/confluent-kafka-go/kafka""time""fmt")var topic = "topic-C"func main() {p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092",})if err != nil {panic(err)}defer p.Close()// Delivery report handler for produced messagesgo func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v\n", ev.TopicPartition)}}}}()for {p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic,Partition: kafka.PartitionAny},Value: []byte("this is message produced from confluent client"),}, nil)time.Sleep(1 * time.Second)}}
这里使用了producer的异步发送单个消息的方法:(*Producer).Produce(msg *Message, deliveryChan chan Event) error,如果指定了deliveryChan,则消息发布结果则会发布到deliveryChan。如果需要批量发送消息,则可以使用基于channel的producer,代码片段如下:
p.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic,Partition: kafka.PartitionAny},Value: []byte(value),}
consumer
confluent-kafka-go提供了最接近于kafka JVM客户端的体验。下面代码展示了high level balanced consumer
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka")func main() {c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id": "consumer-group-id","auto.offset.reset": "earliest",})if err != nil {panic(err)}c.SubscribeTopics([]string{"topic-C"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))} else {// The client will automatically try to recover from all errors.fmt.Printf("Consumer error: %v (%v)\n", err, msg)}}c.Close()}
其中,(*Consumer).ReadMessage(timeout)方法用来从brokers拉取一条消息。这个方法的使用场景较为狭窄,通常我们需要批量拉取消息并处理,confluent-kafka-go提供了基于channel的consumer方法(*Consumer) Events() chan Event使用方法如下:
err = c.SubscribeTopics(topics, nil)run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsecase ev := <-c.Events():switch e := ev.(type) {case kafka.AssignedPartitions:fmt.Fprintf(os.Stderr, "%% %v\n", e)c.Assign(e.Partitions)case kafka.RevokedPartitions:fmt.Fprintf(os.Stderr, "%% %v\n", e)c.Unassign()case *kafka.Message:fmt.Printf("%% Message on %s:\n%s\n",e.TopicPartition, string(e.Value))case kafka.PartitionEOF:fmt.Printf("%% Reached %v\n", e)case kafka.Error:fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)run = false}}}fmt.Printf("Closing consumer\n")c.Close()
Admin
confluent-kafka-go和sarama一样提供了admin接口,用于管理broker和topic等。下面是一个使用confluent-kafka-go创建topic的示例:
package mainimport ("github.com/confluentinc/confluent-kafka-go/kafka""fmt""os""context""time")func main() {// Create a new AdminClient.// AdminClient can also be instantiated using an existing// Producer or Consumer instance, see NewAdminClientFromProducer and// NewAdminClientFromConsumer.admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})if err != nil {fmt.Printf("Failed to create Admin client: %s\n", err)os.Exit(1)}defer admin.Close()// Contexts are used to abort or limit the amount of time// the Admin call blocks waiting for a result.ctx, cancel := context.WithCancel(context.Background())defer cancel()results, err := admin.CreateTopics(ctx,[]kafka.TopicSpecification{{Topic:"topic-C",NumPartitions:1,ReplicationFactor:1,},},kafka.SetAdminOperationTimeout(10 * time.Second),)if err != nil {fmt.Printf("Failed to create topic: %v\n", err)os.Exit(1)}for _, result := range results {fmt.Printf("%s\n", result)}}
4、segmentio
segmentio/kafka-go是一个纯粹使用golang标准库实现的kafka golang客户端,提供了与kafka交互的high level和low level的API。
low level API
segmentio支持从low level的API使用了Conn的类型与kafka交互,Conn代表了一个当前客户端与一个broker的连接,因此Conn创建方法DialLeader()需要指定一个broker地址和一个分区信息。因此low level的kafka-go客户端使用场景并不是很广泛。下面是官方提供的low level的kafka生产者示例:
// to produce messagestopic := "my-topic"partition := 0conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)conn.SetWriteDeadline(time.Now().Add(10*time.Second))conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)conn.Close()
high level API
segmentio/kafka-go的high level API提供了Reader和Writer的概念。
- Reader
Reader能够从某个kafka topic上消费消息,和JVM的kafka client一样,可以选择将多个消费者作为一个consumer group一起消费该topic的所有分区,也可以指定仅从某个分区消费。Reader能够在于broker断开后自动重连,并自动管理offset。此外Reader还支持使用context异步取消goroutine。使用Reader以consumer group方式消费kafka数据的示例代码如下:
package mainimport ("github.com/segmentio/kafka-go""fmt""context""time")func main() {// make a new reader that consumes from topic-Ar := kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"},GroupID: "consumer-group-id",Topic: "topic-C",//MinBytes: 10e3, // 10KBMaxBytes: 10e3, // 10KBReadLagInterval: time.Second * (-1),CommitInterval: time.Second, // flushes commits to Kafka every second})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n",m.Topic,m.Partition,m.Offset,m.Key,string(m.Value),)}r.Close()}
ReaderConfig结构体承载了Reader的所有配置。这里介绍几个重要配置:
ReaderConfig.Brokerskafka集群的broker列表ReaderConfig.GroupIDconsumer group的唯一ID,如果指定了groupID,就不能再配置PartitionReaderConfig.Topic消费消息所属的topicReaderConfig.Partition指定从某个topic的特定Partition消费消息,配置上和GroupID互斥ReaderConfig.Dialer连接器,用来和kafka cluster建立连接,配置项包括超时时间和TLS等ReaderConfig.QueueCapacityconsumer内部的消息缓冲队列大小,more为100ReaderConfig.MinBytes/MaxBytes每次从kafka拉去消息的最大/小字节数ReaderConfig.GroupBalancers当配置了GroupID时,方可使用该参数指定consumer侧rebalance的协调器ReaderConfig.MaxWait当批量拉取消息时的最大等待时间ReaderConfig.CommitInterval当配置了GroupID时,方可配置该参数以指定自动commit offset的间隔,如果该值为0,表示异步。默认值为1s其它配置可以自行参考其godoc文档。
kafka-go也支持手动commit,手动commit时,需要调用FetchMessage接口而不是示例代码中的ReadMessage():
ctx := context.Background()for {m, err := r.FetchMessage(ctx)if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n",m.Topic,m.Partition,m.Offset,string(m.Key),string(m.Value),)r.CommitMessages(ctx, m)}
Writer
Writer是一个high level的kafka producer,Writer比Conn更适合大多数向kafka发送消息的场景,而且具有一下特性:
- 错误时会自动重试和重连
- 支持同步和异步发送消息
- 可以使用context取消异步发送的消息支持
- 平滑关闭Writer,关闭时可以flush本地缓存中未produce的消息
下面是一个简单的同步producer示例:
package mainimport ("github.com/segmentio/kafka-go""golang.org/x/net/context""time""strconv")func main() {// make a writer that produces to topic-A, using the least-bytes distributionw := kafka.NewWriter(kafka.WriterConfig{Brokers: []string{"localhost:9092"},Topic: "topic-C",Balancer: &kafka.LeastBytes{},})index := 0for {w.WriteMessages(context.Background(),kafka.Message{Key: []byte(strconv.Itoa(index)),Value: []byte("this is a message from segmentio/writer"),},)time.Sleep(time.Second)index++}}
如果需要异步发送消息,需要在配置中添加Async:true的配置即可,其它重要配置包括:
WriterConfig.MaxAttempts发送消息的最大尝试次数WriterConfig.QueueCapacity内部消息队列的大小,默认值为100WriterConfig.BatchSizewriter本地缓冲中的消息数量最多达到BatchSize时,将会发送WriterConfig.BatchTimeout本地缓冲消息向kafka批量发送的最大周期WriterConfig.RequiredAcks是否需要kafka server确认所有副本收到writer发送的消息,默认值为-1,表示需要等到所有副本收到消息WriterConfig.Async用来指定writer是异步发送消息还是同步消息
Admin
segmentio也支持创建或者删除topic的功能,这些功能在low level API接口的Conn上提供:
package mainimport ("golang.org/x/net/context""github.com/segmentio/kafka-go""fmt""os")func main() {conn, err := kafka.DialContext(context.Background(), "tcp", "localhost:9092")if err != nil {panic(err)}err = conn.CreateTopics(kafka.TopicConfig{Topic:"topic-F",NumPartitions:1,ReplicationFactor:1,})if err != nil {fmt.Fprintf(os.Stderr, "create topic fialed, cause:%v", err)}}
总结
除了这里列出代码的三个kafka golang客户端,还有一些包括optiopay/kafka等,这里不再详细介绍。从以上三个开源库看来,三者都具有相对丰富的接口,但是confluent-kafka-go具有最好的效率和支持channel的特性,但是对C lib的依赖,限制了它在某些条件下的使用。sarama虽然文档不足,但经过仔细研究发现它也支持各种使用场景,包括同步异步、使用select等go特性。而segmentio虽然在Github批评前二者不支持context,但是自身也不支持将消息和反馈通过channel接收。目前看来在项目上segmentio和sarama更实用。
