Kafka消费服务孵化
项目开发过程中会有异步任务的场景,而kafka作为一款性能非常高效的队列服务,被广泛使用,因此我们的孵化器增加了Kafka消费服务孵化的功能。
消息如何生产?
我们专门提供了一个MQ消息代理中间件Pan,除kafka外,还支持rabbitmq、rocketmq、nsq等消息的生产
更多详细介绍请参考Pan的帮助文档
使用hera孵化kafka消费服务
新建一个main包main.go, 以下是一个通用模板,唯一变化的地方就是//注入处理方法
package mainimport (bs "github.com/tal-tech/hera/bootstrap""github.com/tal-tech/hera/kafkaconsumer""github.com/tal-tech/xesTools/confutil"_ "github.com/tal-tech/xesTools/expvarutil""github.com/tal-tech/xesTools/flagutil""os""os/signal""syscall")var (exit = make(chan os.Signal, 2))func main() {//init configconfutil.InitConfig()// watch exit signalsignal.Notify(exit, os.Interrupt, syscall.SIGTERM)//生成kafka消费者服务s := kafkaconsumer.NewServer()s.AddBeforeServerStartFunc(bs.InitLogger(),bs.InitTraceLogger("HS-Golang", "0.1"),bs.InitPerfutil(),bs.InitPprof(),)s.AddAfterServerStopFunc(bs.CloseLogger())//注入处理方法s.InjectHandleFuncs(handlerFuncs)err := s.Start()if err != nil {panic(err)}// wait for exit signal<-exits.Stop()}
配置书写
[KafkaServer]groupName=consumerCnt=5;多个用逗号分隔topic=test,test1failTopic=falitopic;多个broker用逗号分隔kafkaHost=localhost:9092
消息处理方法
待注入的参数必须是[]kafkaconsumer.HandlerFunc类型,该类型的定义如下:
type HandlerFunc struct {//消息标识符MessageKey string//标识符所对应的处理方法//参数[]byte 就是存储在kafka中的消息体Func func([]byte) error}
解释:
- 标识符
MessageKey和Func一一对应 - 消息体的协议格式为:MessageKey+” “+实际消息,注意中间有一个空格,所以我们在定义标识符时一定不能包含空格,否则解析消息时会产生错误
创建消费函数 funcs.go
package mainimport("fmt""git.100tal.com/wangxiao_go_lib/hera/kafkaconsumer")var handlerFuncs = []kafkaconsumer.HandlerFunc{{MessageKey: "kafka_msg_test1",Func: daalFunc,},}func daalFunc(v []byte) (err error) {fmt.Println(string(v))return}
消费安全控制级别
At Least One 即至少消费一次
