pom配置
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven module descriptor for the demo-stream-rabbit-provider module.
  No dependency declares a version here, so versions are expected to be
  managed by the springcloud-demo parent POM.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-demo</artifactId>
        <groupId>demo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>demo-stream-rabbit-provider</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- Sentinel -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>
        <!-- Health monitoring -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <!-- MyBatis integration with Spring Boot -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>
        <!-- JDBC -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Test-only library: scoped to "test" so it is not put on the compile/runtime classpath. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
yaml配置
# Application configuration for the demo-stream-rabbit-provider service.
server:
  port: 8003

spring:
  application:
    name: demo-stream-rabbit-provider
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.2.20:30569/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: root
    password: root
  cloud:
    nacos:
      server-addr: 192.168.2.20:30010
      username: nacos
      password: nacos
    sentinel:
      transport:
        dashboard: 192.168.2.20:31596 # address where the Sentinel dashboard is installed
        # port: 8719 # port used for the dedicated connection to Sentinel
        # client-ip: 192.168.2.6 # IP of this machine; must be set if Sentinel runs in a virtual machine
        # port: 30195 # port used for the dedicated connection to Sentinel
        # client-ip: 192.168.2.20 # IP of this machine; must be set if Sentinel runs in a virtual machine
    stream:
      binders: # configure the RabbitMQ service(s) to bind to here
        defaultRabbit: # binder name, referenced by the bindings below
          type: rabbit # message middleware type
#          environment: # RabbitMQ environment settings for this binder
#            spring:
#              rabbitmq:
#                host: 192.168.2.20
#                port: 31672
#                username: user
#                password: ze2tgb2WGC
      bindings: # channel binding settings
        output: # this key is the name of a channel
          destination: studyExchange # name of the exchange to use
          content-type: application/json # message content type; JSON here, use "text/plain" for plain text
          binder: defaultRabbit # binder (message service) this binding uses
          group: default-group
        input: # this key is the name of a channel
          destination: studyExchange # name of the exchange to use
          binder: defaultRabbit # binder (message service) this binding uses
          group: default-group
  rabbitmq:
    host: 192.168.2.20
    port: 31672
    username: user
    password: ze2tgb2WGC

# Expose application information
management:
  endpoints:
    web:
      exposure:
        include: '*'
示例:controller
package com.test.rabbit.consumer.controller;

import com.test.rabbit.consumer.service.MessageProviderService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller, mapped under {@code /rabbit}, that exposes an endpoint for
 * triggering a message send through {@link MessageProviderService}.
 *
 * <p>The constructor taking the {@code final} service field is generated by
 * Lombok's {@code @RequiredArgsConstructor}.
 *
 * @author jia
 * @since 2022-04-05 11:39
 */
@RestController
@RequestMapping("/rabbit")
@RequiredArgsConstructor
public class SendMessageController {

    /** Service that performs the actual send; supplied through the generated constructor. */
    protected final MessageProviderService messageProviderService;

    /**
     * Handles {@code GET /rabbit/sendMessage} by delegating to
     * {@link MessageProviderService#send()}.
     *
     * @return whatever the service's {@code send()} returns, used as the response body
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        String result = messageProviderService.send();
        return result;
    }
}
示例:service
package com.test.rabbit.consumer.serviceimpl;

import com.test.rabbit.consumer.service.MessageProviderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * {@link MessageProviderService} implementation that publishes a random UUID
 * string as the message payload on the injected {@link MessageChannel}.
 *
 * <p>The class is bound to the {@link Source} binding interface through
 * {@code @EnableBinding}.
 *
 * @author jia
 * @since 2022-04-05 11:42
 */
@EnableBinding(Source.class)
@Slf4j
public class MessageProviderServiceImpl implements MessageProviderService {

    // NOTE(review): the field name is kept as "output" because @Resource may resolve the
    // bean by field name (presumably the Source "output" channel) - confirm before renaming.
    @Resource
    MessageChannel output;

    /**
     * Generates a random UUID, sends it as the payload of a new message and logs it.
     *
     * @return the serial (UUID string) that was handed to the channel; earlier versions
     *         always returned {@code null}, which left callers with no way to see what was sent
     */
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();

        // MessageChannel#send reports failure through its boolean result rather than
        // (necessarily) an exception, so the result must not be dropped silently.
        boolean accepted = output.send(MessageBuilder.withPayload(serial).build());
        log.info("***** serial: {}", serial);
        if (!accepted) {
            log.warn("Message with serial {} was not accepted by the output channel", serial);
        }
        return serial;
    }
}
其它
使用原生方式(Kafka 事务型生产者 API 示例)
// Sends 100 records to "my-topic" inside a single Kafka transaction using the
// plain (non-Spring) producer API.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");

// try-with-resources closes the producer exactly once on every path, including when
// initTransactions(), abortTransaction() or an unexpected non-Kafka exception throws.
try (Producer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
    producer.initTransactions();
    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++) {
            // Key and value are both the decimal string of the loop index.
            String text = Integer.toString(i);
            producer.send(new ProducerRecord<>("my-topic", text, text));
        }
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the
        // producer and exit; the enclosing try-with-resources performs the close.
    } catch (KafkaException e) {
        // For all other Kafka exceptions, abort the transaction. No retry is performed
        // here; a caller that wants one has to run the transaction again.
        producer.abortTransaction();
    }
}
参考
https://www.orchome.com/303