1 sarama

sarama 是 Go 语言中最流行的 Kafka 客户端库。
go get github.com/Shopify/sarama
(1) 生产者
// Example sarama producer: sends 100 messages to topic "1_3", one per second,
// through a synchronous producer and prints the partition and offset of each.
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Wait until the leader and all in-sync followers have acknowledged the write.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Report delivered messages on the Successes channel; a SyncProducer requires this.
	config.Producer.Return.Successes = true

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		// The producer was never created here, so report a creation failure
		// rather than a "closed" producer.
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer func() {
		// Close flushes and shuts the producer down; surface its error instead of dropping it.
		if err := client.Close(); err != nil {
			fmt.Println("close producer failed, err:", err)
		}
	}()
	fmt.Println("连接kafka成功")

	// Build and send one message per second.
	for i := 0; i < 100; i++ {
		msg := &sarama.ProducerMessage{
			Topic: "1_3",
			Key:   sarama.StringEncoder("AAA"),
			Value: sarama.StringEncoder("BBB"),
		}
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send msg failed, err:", err)
			return
		}
		fmt.Printf("pid:%v offset:%v\n", pid, offset)
		time.Sleep(time.Second)
	}
}
(2) 消费者
// Example sarama consumer: reads every partition of topic "1_3" concurrently,
// starting from the oldest available offset, until the process is interrupted.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Printf("fail to close consumer, err:%v\n", err)
		}
	}()

	topic := "1_3"
	// Look up all partitions of the topic.
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)

	// Register for Ctrl-C before starting any work so the signal cannot be missed.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)

	var wg sync.WaitGroup
	pcs := make([]sarama.PartitionConsumer, 0, len(partitionList))
	// Registered after the consumer's deferred Close, so it runs first: partition
	// consumers must be shut down before the parent consumer is closed.
	defer func() {
		for _, pc := range pcs {
			pc.AsyncClose()
		}
		wg.Wait()
	}()

	// Range over the partition IDs themselves, not the slice indexes.
	for _, partition := range partitionList {
		// OffsetOldest starts at the earliest retained message; a literal offset 0
		// fails once retention has deleted the beginning of the log.
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		pcs = append(pcs, pc)

		// One goroutine per partition: draining a partition inline would block
		// forever and the remaining partitions would never be consumed.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				// Key and Value are byte slices; %s prints them as text.
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}

	// Block until interrupted; the deferred functions then shut everything down.
	<-sigs
}
2 kafka-go
kafka-go的API简洁易懂, 但有几个缺点:
- 生产者用起来, 刚上手可能觉得比较坑
- *kafka.Conn发送速度快, 但不能自动重连
- *kafka.Writer可自动重连. 但要1秒才能发给broker
这里要注意: writer 为了提高效率, 默认会先收集一秒内的消息, 一秒后再批量发给 broker, 但 IM 系统不允许这么慢。可以通过配置 BatchTimeout 修改这个间隔, 个人认为 20ms 左右比较合适。
// Writer for topic "test" that flushes a batch after 20ms instead of waiting
// for the default batch timeout.
w := &kafka.Writer{
	Addr:         kafka.TCP("localhost:9093"),
	Topic:        "test",
	BatchTimeout: 20 * time.Millisecond,
}
- 官方文档没有说明分区策略可以配置为哪几种
import ( “context” “github.com/segmentio/kafka-go” log “github.com/sirupsen/logrus” “time” )
func main() { topic := “my-topic” partition := 0
conn, err := kafka.DialLeader(context.TODO(), "tcp", "101.42.134.18:9093", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}log.Info("connect success")conn.SetWriteDeadline(time.Now().Add(10*time.Second))_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}
<a name="KZKFB"></a>## (2) 消费者```gopackage mainimport ("context""fmt""github.com/segmentio/kafka-go"log "github.com/sirupsen/logrus")func main() {r := kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"47.95.223.243:9092"},GroupID: "consumer-group-id",Topic: "my-topic",})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}if err := r.Close(); err != nil {log.Fatal("failed to close reader:", err)}}
(3) 创建topic
其实 topic 不存在时会自动创建, 一般不需要这个函数
func CreateTopic(conn *kafka.Conn, topic string) {controller, err := conn.Controller()if err != nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer controllerConn.Close()topicConfigs := []kafka.TopicConfig{{Topic: topic,NumPartitions: 1, // 分区1ReplicationFactor: 2, // 副本两个, 9092和9093各一个, 保证高可用},}err = controllerConn.CreateTopics(topicConfigs...)if err != nil {panic(err.Error())}}
