1 sarama

![sarama](52c7fd5dbac3e6ecce443d05738cb8d.png)
sarama是一个go语言中最流行的操作kafka的库

  1. go get github.com/Shopify/sarama

(1) 生产者

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. "time"
  6. )
  7. func main() {
  8. config := sarama.NewConfig()
  9. config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
  10. config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
  11. config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
  12. // 连接kafka
  13. client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  14. if err != nil {
  15. fmt.Println("producer closed, err:", err)
  16. return
  17. }
  18. fmt.Println("连接kafka成功")
  19. defer client.Close()
  20. // 发送消息
  21. // 构造一个消息
  22. for i := 0; i < 100; i++ {
  23. msg := &sarama.ProducerMessage{
  24. Topic: "1_3",
  25. Key: sarama.StringEncoder("AAA"),
  26. Value: sarama.StringEncoder("BBB"),
  27. }
  28. pid, offset, err := client.SendMessage(msg)
  29. if err != nil {
  30. fmt.Println("send msg failed, err:", err)
  31. return
  32. }
  33. fmt.Printf("pid:%v offset:%v\n", pid, offset)
  34. time.Sleep(time.Second)
  35. }
  36. }

(2) 消费者

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. )
  6. // kafka consumer
  7. func main() {
  8. consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
  9. if err != nil {
  10. fmt.Printf("fail to start consumer, err:%v\n", err)
  11. return
  12. }
  13. topic := "1_3"
  14. partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区
  15. if err != nil {
  16. fmt.Printf("fail to get list of partition:err%v\n", err)
  17. return
  18. }
  19. fmt.Println(partitionList)
  20. for partition := range partitionList { // 遍历所有的分区
  21. // 针对每个分区创建一个对应的分区消费者
  22. pc, err := consumer.ConsumePartition(topic, int32(partition), 0) // sarama.OffsetNewest是最新偏移
  23. if err != nil {
  24. fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
  25. return
  26. }
  27. defer pc.AsyncClose()
  28. // 同步从每个分区消费信息
  29. for msg := range pc.Messages() {
  30. fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
  31. }
  32. }
  33. }

2 kafka-go

kafka-go的API简洁易懂, 但有几个缺点:

  1. 生产者用起来, 刚上手可能觉得比较坑
  • *kafka.Conn发送速度快, 但不能自动重连
  • *kafka.Writer可自动重连. 但要1秒才能发给broker

    这里要注意 writer 是为了效率, 默认收集一秒内的消息, 一秒后再批量发给 broker. 但 IM 系统里不允许这么慢, 可以通过配置 BatchTimeout 修改, 个人认为 20ms 左右比较 ok

  1. w := &kafka.Writer{
  2. Addr: kafka.TCP("localhost:9093"),
  3. Topic: "test",
  4. BatchTimeout: time.Millisecond * 20,
  5. }
  1. 官方文档没有明确列出分区策略能配置哪几种, 其实 kafka.Writer 的 Balancer 字段就是分区策略, 内置 RoundRobin、LeastBytes、Hash 等实现
  • 想让消息键相同的消息保存在同一分区, 配置 Balancer: &kafka.Hash{} 即可(按 key 哈希选分区)
    1. go get github.com/segmentio/kafka-go

    (1) 生产者

    ```go package main

import ( “context” “github.com/segmentio/kafka-go” log “github.com/sirupsen/logrus” “time” )

func main() { topic := “my-topic” partition := 0

  1. conn, err := kafka.DialLeader(context.TODO(), "tcp", "101.42.134.18:9093", topic, partition)
  2. if err != nil {
  3. log.Fatal("failed to dial leader:", err)
  4. }
  5. log.Info("connect success")
  6. conn.SetWriteDeadline(time.Now().Add(10*time.Second))
  7. _, err = conn.WriteMessages(
  8. kafka.Message{Value: []byte("one!")},
  9. kafka.Message{Value: []byte("two!")},
  10. kafka.Message{Value: []byte("three!")},
  11. )
  12. if err != nil {
  13. log.Fatal("failed to write messages:", err)
  14. }
  15. if err := conn.Close(); err != nil {
  16. log.Fatal("failed to close writer:", err)
  17. }

}

<a name="KZKFB"></a>
## (2) 消费者
  4. package main
  5. import (
  6. "context"
  7. "fmt"
  8. "github.com/segmentio/kafka-go"
  9. log "github.com/sirupsen/logrus"
  10. )
  11. func main() {
  12. r := kafka.NewReader(kafka.ReaderConfig{
  13. Brokers: []string{"47.95.223.243:9092"},
  14. GroupID: "consumer-group-id",
  15. Topic: "my-topic",
  16. })
  17. for {
  18. m, err := r.ReadMessage(context.Background())
  19. if err != nil {
  20. break
  21. }
  22. fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
  23. }
  24. if err := r.Close(); err != nil {
  25. log.Fatal("failed to close reader:", err)
  26. }
  27. }

(3) 创建topic

其实 broker 开启了 auto.create.topics.enable=true(多数发行版默认开启)时, topic 不存在会自动创建, 不需要这个函数; 若集群关闭了自动创建, 则仍需手动建 topic

  1. func CreateTopic(conn *kafka.Conn, topic string) {
  2. controller, err := conn.Controller()
  3. if err != nil {
  4. panic(err.Error())
  5. }
  6. var controllerConn *kafka.Conn
  7. controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
  8. if err != nil {
  9. panic(err.Error())
  10. }
  11. defer controllerConn.Close()
  12. topicConfigs := []kafka.TopicConfig{
  13. {
  14. Topic: topic,
  15. NumPartitions: 1, // 分区1
  16. ReplicationFactor: 2, // 副本两个, 90929093各一个, 保证高可用
  17. },
  18. }
  19. err = controllerConn.CreateTopics(topicConfigs...)
  20. if err != nil {
  21. panic(err.Error())
  22. }
  23. }