1 sarama
sarama is the most popular Kafka client library for Go. Install it with:
go get github.com/Shopify/sarama
(1) Producer
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait until the leader and all in-sync replicas have acknowledged
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // report successfully delivered messages back (required by SyncProducer)
	// connect to Kafka
	client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	fmt.Println("connected to Kafka")
	defer client.Close()
	// send messages
	for i := 0; i < 100; i++ {
		// build a message
		msg := &sarama.ProducerMessage{
			Topic: "1_3",
			Key:   sarama.StringEncoder("AAA"),
			Value: sarama.StringEncoder("BBB"),
		}
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send msg failed, err:", err)
			return
		}
		fmt.Printf("pid:%v offset:%v\n", pid, offset)
		time.Sleep(time.Second)
	}
}
(2) Consumer
package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

// kafka consumer
func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	topic := "1_3"
	partitionList, err := consumer.Partitions(topic) // get all partition IDs of the topic
	if err != nil {
		fmt.Printf("fail to get list of partitions, err:%v\n", err)
		return
	}
	fmt.Println(partitionList)
	var wg sync.WaitGroup
	for _, partition := range partitionList { // iterate over all partitions
		// create a partition consumer for each partition, starting at the newest offset
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// consume messages from each partition in its own goroutine
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}
2 kafka-go
kafka-go's API is concise and easy to understand, but it has a few drawbacks:
- The producer has some pitfalls when you first start using it:
- *kafka.Conn sends quickly, but it cannot reconnect automatically
- *kafka.Writer reconnects automatically, but by default it can take up to a second before messages reach the broker
Note that the Writer batches for throughput: by default it collects messages for one second and then sends them to the broker in a single batch. That is too slow for an IM system, but it can be changed via configuration; around 20ms works well in my experience (see the first sketch after this list):
w := &kafka.Writer{
	Addr:         kafka.TCP("localhost:9093"),
	Topic:        "test",
	BatchTimeout: time.Millisecond * 20,
}
- The official documentation does not say which partition-balancing strategies can be configured (see the second sketch after this list)
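To make the batching behaviour concrete, here is a minimal *kafka.Writer producer sketch using the 20ms BatchTimeout; the broker address localhost:9093 and the topic test are just the placeholder values from the snippet above.

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// the Writer batches writes; BatchTimeout caps how long a batch waits before being flushed
	w := &kafka.Writer{
		Addr:         kafka.TCP("localhost:9093"), // placeholder broker address from the snippet above
		Topic:        "test",
		BatchTimeout: time.Millisecond * 20, // flush at most every 20ms instead of the 1s default
	}
	defer w.Close()

	// WriteMessages blocks until the batch containing these messages has been flushed
	err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("k1"), Value: []byte("hello")},
		kafka.Message{Key: []byte("k2"), Value: []byte("world")},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}
}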
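On the partition-strategy point: the Writer's Balancer field selects how messages are spread across partitions. A minimal sketch, assuming a recent kafka-go version; the balancers listed in the comments are the ones I know of, check the package docs for the full set.

// Some balancers shipped with kafka-go:
//   &kafka.RoundRobin{}     - spread messages evenly across partitions (the default when Balancer is nil)
//   &kafka.LeastBytes{}     - send to the partition that has received the fewest bytes
//   &kafka.Hash{}           - hash the message key, so the same key always lands on the same partition
//   kafka.CRC32Balancer{}   - CRC32 of the key (compatible with librdkafka's default partitioner)
//   kafka.Murmur2Balancer{} - Murmur2 of the key (compatible with the Java client's default partitioner)
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9093"),
	Topic:    "test",
	Balancer: &kafka.Hash{}, // messages with the same Key go to the same partition
}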
(1) Producer
package main

import (
	"context"
	"time"

	"github.com/segmentio/kafka-go"
	log "github.com/sirupsen/logrus"
)

func main() {
	topic := "my-topic"
	partition := 0

	// DialLeader connects a *kafka.Conn directly to the leader of the given topic/partition
	conn, err := kafka.DialLeader(context.TODO(), "tcp", "101.42.134.18:9093", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	log.Info("connect success")

	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}
(2) Consumer
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
	log "github.com/sirupsen/logrus"
)

func main() {
	// a Reader with a GroupID joins a consumer group and commits offsets automatically
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"47.95.223.243:9092"},
		GroupID: "consumer-group-id",
		Topic:   "my-topic",
	})

	for {
		// ReadMessage blocks until a message arrives or the context is cancelled
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}

	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}
(3) Create a topic
If the topic does not exist, Kafka creates it automatically (when the broker allows auto-creation), so this function is not strictly necessary.
// CreateTopic creates a topic on the cluster controller.
// Requires the imports "net", "strconv" and "github.com/segmentio/kafka-go".
func CreateTopic(conn *kafka.Conn, topic string) {
	// topics can only be created through the controller broker, so look it up first
	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     1, // a single partition
			ReplicationFactor: 2, // two replicas, one on 9092 and one on 9093, for high availability
		},
	}
	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}
}
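For completeness, a minimal sketch of how the helper above might be called; the broker address localhost:9092 and the topic name my-topic are assumptions.

package main

import (
	"github.com/segmentio/kafka-go"
)

func main() {
	// dial any broker; the CreateTopic helper above looks up the controller itself
	conn, err := kafka.Dial("tcp", "localhost:9092") // assumed broker address
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	CreateTopic(conn, "my-topic") // example topic name
}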