介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用Go语言发送和接收kafka消息。
工作流程
安装
1.下载
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz
2.解压
tar -zxvf kafka_2.13-3.1.0.tgz
mv kafka_2.13-3.1.0 kafka
cd kafka
3.启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
4.启动kafka
bin/kafka-server-start.sh config/server.properties
5.修改配置,如不修改默认寻找主机名,需要配置hosts文件
vim config/server.properties
listeners=PLAINTEXT://192.168.72.10:9092
命令总结
1.手动消费topic(从头开始消费)
./kafka-console-consumer.sh --bootstrap-server="192.168.72.10:9092" --topic="test" --from-beginning
#指定消费者组和消费者id
sh kafka-console-consumer.sh --bootstrap-server 192.168.72.10:9092 --consumer-property group.id=test-group --consumer-property client.id=new-consumer-cl --topic test
2.查看所有的topic
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --list
3.创建topic
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --create --topic test
4.删除topic
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --delete --topic test
5.生产消息
./kafka-console-producer.sh --bootstrap-server="192.168.72.10:9092" --topic="test"
6.查看topic详细情况
sh kafka-topics.sh --bootstrap-server 192.168.72.10:9092 --topic test --describe
7.查看所有消费者组
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --list
8.查看消费者组的消费情况(消息堆积情况)
3.1.0版本执行以下命令
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --describe --all-groups
2.x可能执行以下命令
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --describe-all
指定group查看
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --describe --group console-consumer-36067
参数说明:
GROUP:消费者组
TOPIC:主题
PARTITION:分区
CURRENT-OFFSET:当前已消费偏移量
LOG-END-OFFSET:当前生产消息偏移量
LAG:消息堆积数量=(当前生产消息偏移量-当前已消费偏移量)
CONSUMER-ID:消费者ID
HOST:客户端IP
CLIENT-ID:客户端ID
9.删除消费者组
sh kafka-consumer-groups.sh --bootstrap-server 192.168.72.10:9092 --delete --group console-consumer-36067
golang操作
生产者代码
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// main publishes one test message to the "test" topic through a synchronous
// sarama producer and prints the partition and offset reported for it.
func main() {
	brokers := []string{"192.168.72.10:9092"}

	// Producer settings.
	cfg := sarama.NewConfig()
	// Acknowledgement mode: WaitForAll.
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	// Partition selection strategy: random partitioner.
	cfg.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported back on the success channel.
	cfg.Producer.Return.Successes = true

	// Connect to the broker.
	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		fmt.Println("client connect err", err)
		return
	}
	defer producer.Close()

	// Build the message inline and send it.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("this is a test log"),
	})
	if err != nil {
		fmt.Println("send message err", err)
		return
	}
	fmt.Printf("pid:%v offset:%v", partition, offset)
}
消费者代码
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// main starts one partition consumer for every partition of the topic and
// prints each message it receives. It blocks until the process is killed,
// or returns early if the consumer or any partition consumer cannot be
// created.
func main() {
	// Same topic the producer example publishes to.
	const topic = "test"

	consumer, err := sarama.NewConsumer([]string{"192.168.72.10:9092"}, nil)
	if err != nil {
		fmt.Println("create consumer err", err)
		return
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Println("close consumer err", err)
		}
	}()

	// Look up the partition IDs of the topic.
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Println("get topic partitions err", err)
		return
	}
	fmt.Println(partitions)

	// Range over the values: the slice holds partition IDs, which are not
	// guaranteed to equal their slice index.
	for _, partition := range partitions {
		// Create a dedicated consumer for this partition.
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		// Deferred only after the error check so pc is known to be non-nil.
		// The defer is intentionally function-scoped: every partition
		// consumer stays open until main returns.
		defer pc.AsyncClose()

		// Consume each partition concurrently.
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				// Key and Value are byte slices; %s prints them as text.
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}

	// Block forever; the consuming goroutines do the work.
	select {}
}
消费组代码
package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"sync"
"time"
)
// Consumer is the handler handed to the sarama consumer group; its methods
// Setup, Cleanup and ConsumeClaim are invoked by the group session.
type Consumer struct {
	// consumerName labels this consumer in the printed output.
	consumerName string
	// ready is closed by Setup to signal that a session has been established.
	ready chan bool
}
// main runs the consumer-group example: it starts one startConsumer
// goroutine per entry in consumers and waits until all of them have returned.
func main() {
	// Add further entries here to run several members of the same group
	// inside one process.
	consumers := []*Consumer{
		{ready: make(chan bool), consumerName: "consumer1"},
		// {ready: make(chan bool), consumerName: "consumer2"},
	}

	ctx, consumerCancel := context.WithCancel(context.Background())
	defer consumerCancel()

	wg := &sync.WaitGroup{}
	for _, c := range consumers {
		// Register exactly one waiter per goroutine actually started, and
		// release it when that goroutine ends, so Wait returns as soon as
		// every consumer has stopped (including on a startup failure).
		wg.Add(1)
		go func(c *Consumer) {
			defer wg.Done()
			startConsumer(ctx, []string{"test"}, c)
		}(c)
	}
	wg.Wait()
}
// Setup marks the consumer as ready by closing its ready channel, which
// releases every goroutine blocked receiving from it. It always returns nil.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}
// Cleanup has nothing to release for this handler and always returns nil.
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim prints every message delivered on the claim and marks it as
// consumed. It returns nil when the claim's message channel is closed or when
// the session context is done, whichever happens first.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Channel closed: the claim has ended.
				return nil
			}
			fmt.Printf("ConsumerName:%s Message claimed: value = %s, topic = %s,partition=%d\n", consumer.consumerName, string(message.Value), message.Topic, message.Partition)
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Leave promptly once the session is over (e.g. on a rebalance)
			// instead of waiting for the message channel to be closed.
			return nil
		}
	}
}
// startConsumer joins the "test-group" consumer group and consumes topics
// with the given handler until ctx is cancelled.
//
// It returns immediately (after printing the error) if the consumer group
// cannot be created. Otherwise it blocks until ctx is done, waits for the
// consume loop to finish and closes the consumer group.
func startConsumer(ctx context.Context, topics []string, consumer *Consumer) {
	// Pause between attempts after a failed Consume call, so a persistent
	// error does not turn the loop into a busy spin.
	const retryDelay = time.Second

	config := sarama.NewConfig()
	//config.Consumer.Return.Errors = true
	//config.Version = sarama.V0_11_0_2
	//config.Net.SASL.Enable = true
	//config.Net.SASL.User = "admin"
	//config.Net.SASL.Password = "admin"

	consumerGroup, err := sarama.NewConsumerGroup([]string{"192.168.72.10:9092"}, "test-group", config)
	if err != nil {
		fmt.Printf("NewConsumerGroup create consumer error %s\n", err.Error())
		return
	}
	defer func() {
		if err := consumerGroup.Close(); err != nil {
			fmt.Printf("Error closing consumer group: %v\n", err)
		}
	}()

	// Capture the channel before the loop starts: the loop may replace
	// consumer.ready later, and this goroutine must not read the field
	// concurrently with that write.
	ready := consumer.ready

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume returns whenever a session ends, so it has to be
			// called again in a loop to rejoin the group.
			err := consumerGroup.Consume(ctx, topics, consumer)
			if ctx.Err() != nil {
				// Cancelled by the caller: stop for good.
				return
			}
			if err != nil {
				fmt.Printf("Error from consumer: %v\n", err)
				select {
				case <-ctx.Done():
					return
				case <-time.After(retryDelay):
				}
			}
			// Setup closes consumer.ready once per session. Replace the
			// channel only if it was actually closed; if Setup never ran,
			// keep the open channel so the waiter below is still released
			// by a later session.
			select {
			case <-consumer.ready:
				consumer.ready = make(chan bool)
			default:
			}
		}
	}()

	// Wait for the first session to be set up, unless cancelled first.
	select {
	case <-ready:
	case <-ctx.Done():
	}

	<-ctx.Done()
	fmt.Printf("kafka terminating: context cancelled")
	wg.Wait()
}