在dubbo的项目基础上整合kafka
项目结构
目标: 实现 services 与 webs两个项目之间 消息通信
前提条件,
下载安装kafka https://blog.csdn.net/qq_39326472/article/details/98102837
最新版的 kafka 自带了 zooperker bin文件下有启动脚本
先开zp ,在启动 kafka
springboot项目改造
1:提供方改造
1:引入pom
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2: 配置文件,
bootstrap-servers: 写自己的kafka集群
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
auto:
commit:
interval:
ms: 1000
auto-offset-reset: latest
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
group:
id: defaultConsumerGroup
request:
timeout:
ms: 180000
session:
timeout:
ms: 120000
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
missing-topics-fatal: false
producer:
acks: 1
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger:
ms: 0
retries: 0
value-serializer: org.apache.kafka.common.serialization.StringSerializer
3: sercies 来一个简单的demo
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/kafka/normal/{message}")
public void sendMessage1(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("topic1", normalMessage);
}
}
说明1:
1: 如果不存在 topic
kafkaTemplate.send(“topic1”, normalMessage); 这个会自动创建一个 topic1 且分区为0,默认无副本
2:可以自定义topic 及分区,副本信息
其实什么都没做对吧~~~
KafkaTemplate 这个bean框架自动配置了
只需要注入,然后用里面的方法
4:启动项目
2:接受方改造
1:引入pom 同上
2:配置文件,同上
3:写个demo
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
其实也啥都没做对吧~~
@KafkaListener 开启监听
@Component 交给ioc管理
4: 启动应用,postman调用一下上面暴露的接口
成功
看 kafka的日志
demo 哦了~~~
进阶
1:自定义 topic
上面 用 kafkaTemplate.send(“topic1”, normalMessage); 会默认生成一个分区为1 ,topic为topic1,显然不符合业务需求
NewTopic 提供自定义 topic
提供三个构造
String topic名称
int 分区数
short 副本数
当然也可以直接在客户端操作
作为bean注入
@Configuration
public class Topics {
@Bean
public NewTopic initTopic(){
return new NewTopic("works-services-topic",8,(short) 1);
}
}
可以修改,分区数量只能增加,数据不会丢失
重启一下项目,查看刚刚建立的topic
命令 (linux 下用sh)
kafka-topics.bat —zookeeper 127.0.0.1:2181 —topic works-services-topic —describe
2:改造一下消费者
加上了 groupId 和id参数
理解为给他一个定位(那个村子的,叫啥)
查看一下guoup 列表
命令 kafka-consumer-groups.bat —bootstrap-server 127.0.0.1:9292 —list
查看group 信息
kafka-consumer-groups.bat —bootstrap-server 127.0.0.1:9292 —group —describe
3:命令行太麻烦
客户端工具
https://www.kafkatool.com/download2/offsetexplorer_64bit.exe
或者 CMAK web端查看
4:给 group里在加一个成员
调用一下接口
同组内只有一个成员能消费