Kafka消费服务孵化


项目开发过程中会有异步任务的场景,而kafka作为一款性能非常高效的队列服务,被广泛使用,因此我们的孵化器增加了Kafka消费服务孵化的功能。

消息如何生产?

我们专门提供了一个MQ消息代理中间件Pan,除kafka外,还支持rabbitmqrocketmqnsq等消息的生产
更多详细介绍请参考Pan的帮助文档

使用hera孵化kafka消费服务

新建一个main包main.go, 以下是一个通用模板,唯一变化的地方就是//注入处理方法

  1. package main
  2. import (
  3. bs "github.com/tal-tech/hera/bootstrap"
  4. "github.com/tal-tech/hera/kafkaconsumer"
  5. "github.com/tal-tech/xesTools/confutil"
  6. _ "github.com/tal-tech/xesTools/expvarutil"
  7. "github.com/tal-tech/xesTools/flagutil"
  8. "os"
  9. "os/signal"
  10. "syscall"
  11. )
  12. var (
  13. exit = make(chan os.Signal, 2)
  14. )
  15. func main() {
  16. //init config
  17. confutil.InitConfig()
  18. // watch exit signal
  19. signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
  20. //生成kafka消费者服务
  21. s := kafkaconsumer.NewServer()
  22. s.AddBeforeServerStartFunc(bs.InitLogger(),
  23. bs.InitTraceLogger("HS-Golang", "0.1"),
  24. bs.InitPerfutil(),
  25. bs.InitPprof(),
  26. )
  27. s.AddAfterServerStopFunc(bs.CloseLogger())
  28. //注入处理方法
  29. s.InjectHandleFuncs(handlerFuncs)
  30. err := s.Start()
  31. if err != nil {
  32. panic(err)
  33. }
  34. // wait for exit signal
  35. <-exit
  36. s.Stop()
  37. }

配置书写

  1. [KafkaServer]
  2. groupName=
  3. consumerCnt=5
  4. ;多个用逗号分隔
  5. topic=test,test1
  6. failTopic=falitopic
  7. ;多个broker用逗号分隔
  8. kafkaHost=localhost:9092

消息处理方法

待注入的参数必须是[]kafkaconsumer.HandlerFunc类型,该类型的定义如下:

  1. type HandlerFunc struct {
  2. //消息标识符
  3. MessageKey string
  4. //标识符所对应的处理方法
  5. //参数[]byte 就是存储在kafka中的消息体
  6. Func func([]byte) error
  7. }

解释:

  • 标识符MessageKeyFunc 一一对应
  • 消息体的协议格式为:MessageKey+” “+实际消息,注意中间有一个空格,所以我们在定义标识符时一定不能包含空格,否则解析消息时会产生错误

创建消费函数 funcs.go

  1. package main
  2. import(
  3. "fmt"
  4. "git.100tal.com/wangxiao_go_lib/hera/kafkaconsumer"
  5. )
  6. var handlerFuncs = []kafkaconsumer.HandlerFunc{
  7. {
  8. MessageKey: "kafka_msg_test1",
  9. Func: daalFunc,
  10. },
  11. }
  12. func daalFunc(v []byte) (err error) {
  13. fmt.Println(string(v))
  14. return
  15. }

消费安全控制级别

At Least One 即至少消费一次