消息中间件
消息中间件概念
消息队列中间件是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
消息中间件作用
解耦、冗余(存储数据)、扩展性、削峰、可恢复性、顺序保证、缓冲、异步通信。
RabbitMQ入门
生产者和消费者
RabbitMQ整体结构图如下,由生产者产生消息,通过绑定的交换机发送到对应的队列种,再由消费者消费队列种的消息。
- Producer: 生产者,投递消息的一方;
- Consumer: 消费者,接受消息的一方;
Broker: 整个中间部分是broker,是一个服务节点,
队列
Queue: 队列,存储消息的内部对象。
- 队列消费的两种方式:
- 平均分摊(Round-Robin):每个消费者轮流消费一个消息
- 公平分发(Fair Dispatch):消费者消费手动发送确认之后,即可获取下一条消息进行消费。
注意:队列层面不支持广播消费,一个消息只能被一个消费者消费。
交换器类型
交换机会根据不同规则,将消息广播到所有符合规则的队列中。
常用交换器类型四种,如下:fanout:会把消息路由到所有与该交换器绑定的队列中。
- direct:会把消息路由到所有BindingKey与RoutingKey完全匹配的队列中。
- topic:会把消息路由到所有BindingKey与RoutingKey符合匹配规则的队列中。
headers:根据发送消息中的headers(一个键值对)属性进行匹配;在绑定队列和交换器时定制一组键值对,如果消息中的键值对与定制键值对相同,则会路由到该队列。
SpringBoot中集成MQ
```java // 绑定mq的交换器和队列 @Configuration public class MqMatchConfiguration { @Bean public Queue startCapitalMatchYEQueue() {
return new Queue(CapitalBizConst.CAPITAL_MATCH_YE_QUEUE);
}
@Bean public TopicExchange startCapitalMatchExchange() {
return new TopicExchange(CapitalBizConst.CAPITAL_MATCH_EXCHANGE, true, false, false);
}
@Bean public Binding capitalMatchBinding() {
return BindingBuilder
.bind(startCapitalMatchYEQueue())
.to(startCapitalMatchExchange())
.with(CapitalBizConst.CAPITAL_MATCH_YE_QUEUE_ROUTINGKEY);
}
}
```java
// 消费者消费对应队列的消息
@Component
public class CapitalMatchMessageListener extends BaseService {
@Autowired
private BalancePayMatchService balancePayMatchService;
@RabbitListener(queues = CapitalBizConst.CAPITAL_MATCH_YE_QUEUE)
public void onMessage(Message message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
CapitalMatchReq capitalMatchReq = JSONObject.parseObject(msg, CapitalMatchReq.class);
log.info("接收到mq消息:待勾兑订单{}", JSON.toJSON(capitalMatchReq.getAppsheetserialno()));
String appsheetserialno = capitalMatchReq.getAppsheetserialno();
balancePayMatchService.match(appsheetserialno);
}
}