Kafka消费服务孵化
项目开发过程中会有异步任务的场景,而kafka作为一款性能非常高效的队列服务,被广泛使用,因此我们的孵化器增加了Kafka消费服务孵化的功能。
消息如何生产?
我们专门提供了一个MQ消息代理中间件Pan
,除kafka
外,还支持rabbitmq
、rocketmq
、nsq
等消息的生产
更多详细介绍请参考Pan的帮助文档
使用hera孵化kafka消费服务
新建一个main包main.go
, 以下是一个通用模板,唯一变化的地方就是//注入处理方法
package main
import (
bs "github.com/tal-tech/hera/bootstrap"
"github.com/tal-tech/hera/kafkaconsumer"
"github.com/tal-tech/xesTools/confutil"
_ "github.com/tal-tech/xesTools/expvarutil"
"github.com/tal-tech/xesTools/flagutil"
"os"
"os/signal"
"syscall"
)
var (
exit = make(chan os.Signal, 2)
)
func main() {
//init config
confutil.InitConfig()
// watch exit signal
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
//生成kafka消费者服务
s := kafkaconsumer.NewServer()
s.AddBeforeServerStartFunc(bs.InitLogger(),
bs.InitTraceLogger("HS-Golang", "0.1"),
bs.InitPerfutil(),
bs.InitPprof(),
)
s.AddAfterServerStopFunc(bs.CloseLogger())
//注入处理方法
s.InjectHandleFuncs(handlerFuncs)
err := s.Start()
if err != nil {
panic(err)
}
// wait for exit signal
<-exit
s.Stop()
}
配置书写
[KafkaServer]
groupName=
consumerCnt=5
;多个用逗号分隔
topic=test,test1
failTopic=falitopic
;多个broker用逗号分隔
kafkaHost=localhost:9092
消息处理方法
待注入的参数必须是[]kafkaconsumer.HandlerFunc
类型,该类型的定义如下:
type HandlerFunc struct {
//消息标识符
MessageKey string
//标识符所对应的处理方法
//参数[]byte 就是存储在kafka中的消息体
Func func([]byte) error
}
解释:
- 标识符
MessageKey
和Func
一一对应 - 消息体的协议格式为:MessageKey+” “+实际消息,注意中间有一个空格,所以我们在定义标识符时一定不能包含空格,否则解析消息时会产生错误
创建消费函数 funcs.go
package main
import(
"fmt"
"git.100tal.com/wangxiao_go_lib/hera/kafkaconsumer"
)
var handlerFuncs = []kafkaconsumer.HandlerFunc{
{
MessageKey: "kafka_msg_test1",
Func: daalFunc,
},
}
func daalFunc(v []byte) (err error) {
fmt.Println(string(v))
return
}
消费安全控制级别
At Least One
即至少消费一次