在dubbo的项目基础上整合kafka
项目结构
目标: 实现 services 与 webs两个项目之间 消息通信
前提条件,
下载安装kafka https://blog.csdn.net/qq_39326472/article/details/98102837
最新版的 kafka 自带了 zooperker bin文件下有启动脚本
先开zp ,在启动 kafka
springboot项目改造
1:提供方改造
1:引入pom
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2: 配置文件,
bootstrap-servers: 写自己的kafka集群
spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:auto:commit:interval:ms: 1000auto-offset-reset: latestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:group:id: defaultConsumerGrouprequest:timeout:ms: 180000session:timeout:ms: 120000value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:missing-topics-fatal: falseproducer:acks: 1batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 0retries: 0value-serializer: org.apache.kafka.common.serialization.StringSerializer
3: sercies 来一个简单的demo
@RestControllerpublic class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;// 发送消息@GetMapping("/kafka/normal/{message}")public void sendMessage1(@PathVariable("message") String normalMessage) {kafkaTemplate.send("topic1", normalMessage);}}
说明1:
1: 如果不存在 topic
kafkaTemplate.send(“topic1”, normalMessage); 这个会自动创建一个 topic1 且分区为0,默认无副本
2:可以自定义topic 及分区,副本信息
其实什么都没做对吧~~~
KafkaTemplate 这个bean框架自动配置了
只需要注入,然后用里面的方法
4:启动项目
2:接受方改造
1:引入pom 同上
2:配置文件,同上
3:写个demo
@Componentpublic class KafkaConsumer {// 消费监听@KafkaListener(topics = {"topic1"})public void onMessage1(ConsumerRecord<?, ?> record){// 消费的哪个topic、partition的消息,打印出消息内容System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());}}
其实也啥都没做对吧~~
@KafkaListener 开启监听
@Component 交给ioc管理
4: 启动应用,postman调用一下上面暴露的接口
成功
看 kafka的日志
demo 哦了~~~
进阶
1:自定义 topic
上面 用 kafkaTemplate.send(“topic1”, normalMessage); 会默认生成一个分区为1 ,topic为topic1,显然不符合业务需求
NewTopic 提供自定义 topic
提供三个构造
String topic名称
int 分区数
short 副本数
当然也可以直接在客户端操作
作为bean注入
@Configurationpublic class Topics {@Beanpublic NewTopic initTopic(){return new NewTopic("works-services-topic",8,(short) 1);}}
可以修改,分区数量只能增加,数据不会丢失
重启一下项目,查看刚刚建立的topic
命令 (linux 下用sh)
kafka-topics.bat —zookeeper 127.0.0.1:2181 —topic works-services-topic —describe
2:改造一下消费者
加上了 groupId 和id参数
理解为给他一个定位(那个村子的,叫啥)
查看一下guoup 列表
命令 kafka-consumer-groups.bat —bootstrap-server 127.0.0.1:9292 —list
查看group 信息
kafka-consumer-groups.bat —bootstrap-server 127.0.0.1:9292 —group —describe
3:命令行太麻烦
客户端工具
https://www.kafkatool.com/download2/offsetexplorer_64bit.exe
或者 CMAK web端查看
4:给 group里在加一个成员

调用一下接口
同组内只有一个成员能消费

