导读
由于项目需要,这里整合了 Spring Boot 和 Kafka,并简单地介绍其使用方法。
使用
初始
依赖POM
在springboot项目中引入kafka的依赖。
```xml
<!-- kafka: 这里没有加版本号,使用的是springboot默认的,不然容易出现版本不一致导致错误 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.71</version>
</dependency>
```
配置文件
application.properties
```properties
spring.application.name=kafka-ts
server.port=8080

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=localhost:9092

#=============== producer =======================
spring.kafka.producer.retries=3
# 每个批次的大小上限(单位:字节,不是消息条数)
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=ts-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
application.yml
```yaml
server:
  port: 8080
spring:
  application:
    name: kafka-ts
  kafka:
    # 指定kafka 代理地址,可以多个
    bootstrap-servers: localhost:9092
    producer: #生产者
      retries: 3 #重试次数
      batch-size: 16384 #每个批次的大小上限(单位:字节,不是消息条数)
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定消息key和消息体的编解码方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: #消费者
      group-id: ts-log-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
<a name="ynKSE"></a>
### 实体类
```java
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;

import java.util.Date;

@Data
public class Ts {
    private String id;
    private String name;
    private Integer age;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date cTime;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date insertime;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date updateTime;
}
```
生产者
import com.alibaba.fastjson.JSON;
import com.ifs.Ts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Kafka producer that publishes {@link Ts} objects as JSON strings.
 *
 * @author heioky
 * @date 2020/10/27
 */
@Component
public class TsProducer {

    private static final Logger logger = LoggerFactory.getLogger(TsProducer.class);

    /** Topic that {@link #send(int)} publishes to. */
    private static final String TOPIC = "kafka-ts";

    // Parameterized instead of raw: the payload handed to send() is always a JSON String.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Builds a sample {@link Ts}, serializes it to JSON and sends it to the
     * {@code kafka-ts} topic without a record key, then logs how long the call took.
     *
     * <p>The logged duration covers building the payload and the {@code send} call itself.
     * NOTE(review): {@code KafkaTemplate.send} returns a future, so the figure presumably
     * does not include broker acknowledgement - confirm if delivery latency is wanted.
     *
     * @param i sequence number; its decimal string form becomes the {@code id} of the payload
     */
    public void send(int i) {
        long beginTime = System.currentTimeMillis();

        Ts ts = new Ts();
        ts.setId(String.valueOf(i));
        ts.setUpdateTime(new Date());
        ts.setAge(18);
        ts.setName("不知火");

        kafkaTemplate.send(TOPIC, JSON.toJSONString(ts));

        long endTime = System.currentTimeMillis();
        logger.info("生产者发送消息,消耗时间: {}ms", endTime - beginTime);
    }
}
消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;import java.util.Optional;/*** kafka的消费者** @author heioky* @date 2020/10/27*/@Componentpublic class TsConsumer {Logger logger = LoggerFactory.getLogger(TsConsumer.class);/*** @Description: 消费者消费数据,这里面topic可以是一个或者多个* @Param: [consumerRecord]* @return: void* @Author: heioky* @Date:2020/10/27 14:09*/@KafkaListener(topics = {"kafka-ts"})public void getMsgInfo(ConsumerRecord<?, ?> consumerRecord) {//判断是否为nullOptional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());if (kafkaMessage.isPresent()) {//得到Optional实例中的值Object message = kafkaMessage.get();logger.info("获取消费消息:{}", message);}}}
启动类
import com.ifs.kafka.TsProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 启动** @author heioky* @date 2020/10/27*/@SpringBootApplicationpublic class Sinfo implements CommandLineRunner {@Autowiredprivate TsProducer tsProducer;public static void main(String[] args) {SpringApplication.run(Sinfo.class, args);}@Overridepublic void run(String... args) throws Exception {for (int i = 1; i <= 10; i++) {tsProducer.send(i);}}}
控制台打印
- 生产者

- 消费者
问题
```
Caused by: java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_202]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_202]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_202]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_202]
	... 25 common frames omitted
```
原因:spring-kafka 的版本与 Spring Boot 的版本不一致。去掉 spring-kafka 依赖中的版本号,使用 Spring Boot 默认管理的版本即可。
接收多个topic
消费者使用 @KafkaListeners 注解来接收多个 topic 的消息。
/*** @Description: 消费者消费数据,这里面topic可以是一个或者多个* @Param: [consumerRecord]* @return: void* @Author: heioky* @Date:2020/10/27 14:09*/@KafkaListeners({@KafkaListener(topics = {"kafka-ts"}, groupId = "ts-log-group"),@KafkaListener(topics = {"kafka-two"}, groupId = "ts-log-group")})public void getMsgInfo(String record) {//判断是否为nullOptional<?> kafkaMessage = Optional.ofNullable(record);if (kafkaMessage.isPresent()) {//得到Optional实例中的值Object message = kafkaMessage.get();logger.info("获取消费消息:{}", message);}}
生产者指定对应的topic
/**
 * Builds a sample {@code Ts} carrying the given id, serializes it to JSON,
 * sends it to the {@code kafka-two} topic and logs how long the call took.
 *
 * @param unid value stored as the {@code id} of the payload
 * @author heioky
 * @date 2020/10/27 13:48
 */
public void sendTow(String unid) {
    final long startedAt = System.currentTimeMillis();

    Ts payload = new Ts();
    payload.setId(unid);
    payload.setUpdateTime(new Date());
    payload.setAge(23);
    payload.setName("妖刀姬");

    String json = JSON.toJSONString(payload);
    kafkaTemplate.send("kafka-two", json);

    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    logger.info("生产者sendTow发送消息,消耗时间: {}ms", elapsedMillis);
}
控制台打印
END
搞定~

