在dubbo的项目基础上整合kafka

项目结构
image.png
目标: 实现 services 与 webs两个项目之间 消息通信

前提条件,

下载安装kafka https://blog.csdn.net/qq_39326472/article/details/98102837
最新版的 kafka 自带了 zooperker bin文件下有启动脚本
先开zp ,在启动 kafka

springboot项目改造

1:提供方改造

1:引入pom

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2: 配置文件,

bootstrap-servers: 写自己的kafka集群

  1. spring:
  2. kafka:
  3. bootstrap-servers: 127.0.0.1:9092
  4. consumer:
  5. auto:
  6. commit:
  7. interval:
  8. ms: 1000
  9. auto-offset-reset: latest
  10. enable-auto-commit: true
  11. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12. properties:
  13. group:
  14. id: defaultConsumerGroup
  15. request:
  16. timeout:
  17. ms: 180000
  18. session:
  19. timeout:
  20. ms: 120000
  21. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  22. listener:
  23. missing-topics-fatal: false
  24. producer:
  25. acks: 1
  26. batch-size: 16384
  27. buffer-memory: 33554432
  28. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  29. properties:
  30. linger:
  31. ms: 0
  32. retries: 0
  33. value-serializer: org.apache.kafka.common.serialization.StringSerializer

3: sercies 来一个简单的demo

  1. @RestController
  2. public class KafkaProducer {
  3. @Autowired
  4. private KafkaTemplate<String, Object> kafkaTemplate;
  5. // 发送消息
  6. @GetMapping("/kafka/normal/{message}")
  7. public void sendMessage1(@PathVariable("message") String normalMessage) {
  8. kafkaTemplate.send("topic1", normalMessage);
  9. }
  10. }

说明1:

1: 如果不存在 topic
kafkaTemplate.send(“topic1”, normalMessage); 这个会自动创建一个 topic1 且分区为0,默认无副本
2:可以自定义topic 及分区,副本信息

其实什么都没做对吧~~~
KafkaTemplate 这个bean框架自动配置了
只需要注入,然后用里面的方法

4:启动项目

2:接受方改造

1:引入pom 同上

2:配置文件,同上

3:写个demo

  1. @Component
  2. public class KafkaConsumer {
  3. // 消费监听
  4. @KafkaListener(topics = {"topic1"})
  5. public void onMessage1(ConsumerRecord<?, ?> record){
  6. // 消费的哪个topic、partition的消息,打印出消息内容
  7. System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
  8. }
  9. }

其实也啥都没做对吧~~
@KafkaListener 开启监听
@Component 交给ioc管理

4: 启动应用,postman调用一下上面暴露的接口

成功
image.png

看 kafka的日志
image.png

demo 哦了~~~

进阶

1:自定义 topic

上面 用 kafkaTemplate.send(“topic1”, normalMessage); 会默认生成一个分区为1 ,topic为topic1,显然不符合业务需求
NewTopic 提供自定义 topic
提供三个构造
String topic名称
int 分区数
short 副本数
image.png
当然也可以直接在客户端操作
作为bean注入

  1. @Configuration
  2. public class Topics {
  3. @Bean
  4. public NewTopic initTopic(){
  5. return new NewTopic("works-services-topic",8,(short) 1);
  6. }
  7. }

可以修改,分区数量只能增加,数据不会丢失

重启一下项目,查看刚刚建立的topic
命令 (linux 下用sh)
kafka-topics.bat —zookeeper 127.0.0.1:2181 —topic works-services-topic —describe
image.png

2:改造一下消费者

加上了 groupId 和id参数
理解为给他一个定位(那个村子的,叫啥)
image.png
查看一下guoup 列表
命令 kafka-consumer-groups.bat —bootstrap-server 127.0.0.1:9292 —list
image.png
查看group 信息
kafka-consumer-groups.bat —bootstrap-server 127.0.0.1:9292 —group —describe

3:命令行太麻烦

客户端工具
https://www.kafkatool.com/download2/offsetexplorer_64bit.exe
或者 CMAK web端查看

image.png

4:给 group里在加一个成员

image.png

调用一下接口
同组内只有一个成员能消费

image.png