安装启动zookeeper
docker run -d —name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
安裝启动kafka
docker run -d —name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.186.130:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.186.130:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
查看zookeeper和kafka是否安装成功
创建topic test
进入kafka容器,
docker exec -it kafka /bin/sh
cd /opt/kafka_2.13-2.7.0/bin (进入自己的版本)
./kafka-console-producer.sh —broker-list localhost:9092 —topic test
SpringBoot集成Kafka
添加依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--https://search.maven.org/artifact/org.springframework.kafka/spring-kafka/2.6.10/jar--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.10</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
配置kafka
application.yml
server:port: 8080servlet:context-path: /demospring:kafka:bootstrap-servers: 192.168.186.130:9092 #kafka地址producer:retries: 0batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: javabb-demo-mq-kafka# 手动提交enable-auto-commit: falseauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 60000listener:log-container-config: falseconcurrency: 5# 手动提交ack-mode: manual_immediate
定义常量:kafkaConsts
public class KafkaConsts {/*** 默认分区大小*/public final static Integer DEFAULT_PARTITION_NUM = 3;/*** Topic 名称*/public final static String TOPIC_TEST = "test";}
自定义kafka配置:KafkaConfig
@Configuration@EnableConfigurationProperties({KafkaProperties.class})@EnableKafka@AllArgsConstructorpublic class KafkaConfig {private final KafkaProperties kafkaProperties;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);factory.setBatchListener(true);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());}@Bean("ackContainerFactory")public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);return factory;}}
创建消息消费者
@Slf4j@Componentpublic class MessageHandle {@KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {try {String message = (String) record.value();log.info("收到消息: {}", message);} catch (Exception e) {log.error(e.getMessage(), e);} finally {// 手动提交 offsetacknowledgment.acknowledge();}}}
Springboot启动类
@SpringBootApplicationpublic class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}}
启动KafkaApplication
创建测试类,发送消息测试
@RunWith(SpringRunner.class)@SpringBootTest(classes = KafkaApplication.class)public class kafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 测试发送消息*/@Testpublic void testSend() {kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");}}

在服务器中也可以发送消息测试
./kafka-console-producer.sh —broker-list localhost:9092 —topic test
输入消息。
