delaymq
本小节主要介绍如何使用pan中的延时队列功能,目前延时队列仅支持kafka。
1、修改pan中kafka和delaymq相关配置
[KafkaProxy]enable=true//开关KafkaWaitAll=trueKafkaCompression=trueKafkaPartitioner=roundKafkaProducerTimeout=10brokers=localhost:9092sasl=falseuser=password=valid= //topic白名单,若为空则所有topic均可发送fallback=false //消息发送失败是否回传(若回传,一直失败,可能会造成CPU短暂飙升)[DelayMQProxy]enable=true //开关window=5 //延时容错skey=delay_queue //redis中key名称limit=50 //一次性获取数据限制,建议设置为1000以下
配置文档请点击这里
2、编译
make
3、运行
./bin/pan -c ../conf/conf.ini
4、业务代码
业务方往pan中发送延时消息,需要引入delaymqutil,具体使用方法如下
使用方法
package mainimport ("fmt""time""github.com/tal-tech/xtools/delaymqutil""github.com/spf13/cast")func main() {t := time.Tick(3 * time.Second)count := 0for {select {case <-t:count++now := time.Now().Format("2006-01-02 15:04:05")sendTimestamp := time.Now().Unix() + 10err := delaymqutil.Send2Proxy("kafka", "test", "", []byte("kafka "+cast.ToString(count)+" "+now), sendTimestamp)if err != nil {fmt.Println(err)}continue}}}
使用配置
[DelayMQProxy]unix=/home/www/pan.xesv5.com/pan.sock //pan的sock文件地址host=localhost:9999 //pan的ip:port地址
注意事项
注意go.mod文件中替换包
replace github.com/henrylee2cn/teleport v5.0.0+incompatible => github.com/hhtlxhhxy/github.com_henrylee2cn_teleport v1.0.0或replace github.com/henrylee2cn/teleport v0.0.0 => github.com/hhtlxhhxy/github.com_henrylee2cn_teleport v1.0.0
