delaymq


本小节主要介绍如何使用pan中的延时队列功能,目前延时队列仅支持kafka。

1、修改pan中kafka和delaymq相关配置

  1. [KafkaProxy]
  2. enable=true//开关
  3. KafkaWaitAll=true
  4. KafkaCompression=true
  5. KafkaPartitioner=round
  6. KafkaProducerTimeout=10
  7. brokers=localhost:9092
  8. sasl=false
  9. user=
  10. password=
  11. valid= //topic白名单,若为空则所有topic均可发送
  12. fallback=false //消息发送失败是否回传(若回传,一直失败,可能会造成CPU短暂飙升)
  13. [DelayMQProxy]
  14. enable=true //开关
  15. window=5 //延时容错s
  16. key=delay_queue //redis中key名称
  17. limit=50 //一次性获取数据限制,建议设置为1000以下

配置文档请点击这里

2、编译

  1. make

3、运行

  1. ./bin/pan -c ../conf/conf.ini

4、业务代码

业务方往pan中发送延时消息,需要引入delaymqutil,具体使用方法如下

使用方法

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/tal-tech/xtools/delaymqutil"
  6. "github.com/spf13/cast"
  7. )
  8. func main() {
  9. t := time.Tick(3 * time.Second)
  10. count := 0
  11. for {
  12. select {
  13. case <-t:
  14. count++
  15. now := time.Now().Format("2006-01-02 15:04:05")
  16. sendTimestamp := time.Now().Unix() + 10
  17. err := delaymqutil.Send2Proxy("kafka", "test", "", []byte("kafka "+cast.ToString(count)+" "+now), sendTimestamp)
  18. if err != nil {
  19. fmt.Println(err)
  20. }
  21. continue
  22. }
  23. }
  24. }

使用配置

  1. [DelayMQProxy]
  2. unix=/home/www/pan.xesv5.com/pan.sock //pan的sock文件地址
  3. host=localhost:9999 //pan的ip:port地址

注意事项

注意go.mod文件中替换包

  1. replace github.com/henrylee2cn/teleport v5.0.0+incompatible => github.com/hhtlxhhxy/github.com_henrylee2cn_teleport v1.0.0
  2. replace github.com/henrylee2cn/teleport v0.0.0 => github.com/hhtlxhhxy/github.com_henrylee2cn_teleport v1.0.0