kafka
本小节主要介绍如何使用pan快速完成kafka消息的生产,在此不再对kafka做相关的介绍,不太了解kafka的可以去kafka官网阅读相关文档。
1、启动zookeeper
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
2、启动kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties
3、创建topic
./bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
4、修改pan中kafka相关配置
[KafkaProxy]enable=true//开关KafkaWaitAll=trueKafkaCompression=trueKafkaPartitioner=roundKafkaProducerTimeout=10brokers=localhost:9092sasl=falseuser=password=valid= //topic白名单,若为空则所有topic均可发送failMode=retry/save/discard(无限次重试、保存到redis、丢弃)
配置文档请点击这里
5、编译
make
6、运行
./bin/pan -c ../conf/conf.ini
7、业务代码
业务方往pan中发消息,需要引入kafkautil,具体使用方法如下
使用方法
package mainimport ("fmt""time""github.com/tal-tech/xtools/kafkautil""github.com/spf13/cast")func main() {t := time.Tick(5 * time.Second)count := 0for {select {case <-t:count++err := kafkautil.Send2Proxy("test", []byte("kafka "+cast.ToString(count)))if err != nil {fmt.Println(err)}continue}}}
使用配置
[KafkaProxy]unix=/home/www/pan.xesv5.com/pan.sock //pan的sock文件地址host=localhost:9999 //pan的ip:port地址
注意事项
注意go.mod文件中替换包
replace github.com/henrylee2cn/teleport v5.0.0+incompatible => github.com/hhtlxhhxy/github.com_henrylee2cn_teleport v1.0.0或replace github.com/henrylee2cn/teleport v0.0.0 => github.com/hhtlxhhxy/github.com_henrylee2cn_teleport v1.0.0
