一、创建监听者
二、创建消息队列和交换机,并给定路由键——routing-key
/**
 * RabbitMQ listener that receives messages from the agent-profit queue and
 * forwards their content to {@link JSAgentProfitService#insertAgentProfitData}.
 *
 * <p>The queue, exchange and routing keys are declared on {@link #processor};
 * most attributes are read from configuration placeholders with inline defaults.
 */
@Component
public class JSAgentProfitListener {

    private static final Logger logger = LoggerFactory.getLogger(JSAgentProfitListener.class);

    /** Name of the message header from which the routing key is read. */
    private static final String ROUTING_KEY_SIGN = "routingKey";

    @Autowired
    private JSAgentProfitService jsAgentProfitService;

    /**
     * Listener entry point: processes one delivery and then hands the channel and
     * delivery tag to {@code MqHandleUtils.handleReceiver}.
     *
     * <p>Any exception raised while processing is logged together with the message
     * content and is NOT rethrown; {@code handleReceiver} runs in {@code finally},
     * i.e. on both success and failure.
     * NOTE(review): presumably {@code handleReceiver} acknowledges the delivery, which
     * would mean failed messages are dropped rather than redelivered — confirm.
     *
     * @param message     the received AMQP message
     * @param channel     the channel the message was delivered on
     * @param deliveryTag value of the {@code amqp_deliveryTag} header
     * @param redelivered value of the {@code amqp_redelivered} header (not used by this method)
     */
    @RabbitListener(
            // 1. Bind the consumed queue to the exchange
            bindings = {@QueueBinding(
                    // 1.1 Declare the queue
                    value = @Queue(
                            // 1.1.1 Queue name
                            value = "${e6yun3.auto.estimated.read.queueName:mq_e6yun3_business_js_agent_profit}",
                            // 1.1.2 Whether the queue is durable
                            durable = "${e6yun3.auto.estimated.read.durable:true}",
                            // 1.1.3 Whether the queue is auto-deleted
                            autoDelete = "${e6yun3.auto.estimated.read.autoDelete:false}",
                            // 1.1.4 Whether the queue is exclusive
                            exclusive = "${e6yun3.auto.estimated.read.exclusive:false}"
                    ),
                    // 1.2 Declare the exchange
                    exchange = @Exchange(
                            // 1.2.1 Exchange name
                            value = "${e6yun3.business.exchange:ex_e6yun3_financial}",
                            // 1.2.2 Exchange type
                            type = "${e6yun3.auto.estimated.read.type:topic}"
                    ),
                    // 1.3 Routing keys used for the binding
                    key = {"e6yun3.financial.waybillPayFee", "e6yun3.financial.waybillReceiveFee"}
            )},
            containerFactory = "defaultFactory"
    )
    public void processor(Message message,
                          Channel channel,
                          @Header(name = "amqp_deliveryTag") long deliveryTag,
                          @Header("amqp_redelivered") boolean redelivered) {
        try {
            // 2. Handle the received message
            doProcessor(message);
        } catch (Exception e) {
            logger.error("解析MQ数据异常,message = " + MqHandleUtils.getMessageContent(message) + ": ", e);
        } finally {
            MqHandleUtils.handleReceiver(channel, deliveryTag);
        }
    }

    /**
     * Parses the message body, initialises the user context from its corp id and
     * delegates to the profit service.
     *
     * @param message the received AMQP message
     * @throws IllegalArgumentException if the body does not parse to a value object or
     *                                  carries no corp id; the caller logs it with the message content
     */
    private void doProcessor(Message message) {
        // 2.1 Resolve the routing key from the message headers
        String routingKey = findRoutingKey(message);

        // 2.2 Deserialize the JSON message body
        String messageStr = MqHandleUtils.getMessageContent(message);
        BusinessMessageContentVO messageVo = JSONObject.parseObject(messageStr, BusinessMessageContentVO.class);
        if (messageVo == null) {
            // Fail with a descriptive error instead of an anonymous NullPointerException below.
            throw new IllegalArgumentException("MQ message body could not be parsed into BusinessMessageContentVO");
        }
        Integer corpId = messageVo.getCorpId();
        if (corpId == null) {
            // Check before touching UserContextHelper so a null corp id is never stored in the context.
            throw new IllegalArgumentException("MQ message does not contain a corpId");
        }

        UserContextHelper.setCorpId(corpId);
        UserContextHelper.setUserId(0);
        UserContextHelper.saveHeaderInThreadLocal(ConstOfUserContext.CORP_ID, corpId.toString());
        UserContextHelper.initUser();
        logger.info("@JSAgentProfitListener:doProcessor");

        // 2.3 Run the business logic; the returned wrapper is not inspected here.
        jsAgentProfitService.insertAgentProfitData(messageVo, routingKey);
    }

    /**
     * Reads the routing key from the {@code routingKey} message header.
     *
     * <p>A warning is logged when the message has no headers, or when the header is
     * missing or empty.
     *
     * @param message the received AMQP message
     * @return the header value as a string, or an empty string when it is not available
     */
    private String findRoutingKey(Message message) {
        // 2.1.1 The routing key is carried in the message headers
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (MapUtils.isEmpty(headers)) {
            logger.warn("消息错误,Headers中没有值, message = {}", MqHandleUtils.getMessageContent(message));
            return "";
        }

        // Null-safe lookup: a missing header must reach the warning below instead of throwing.
        Object headerValue = headers.get(ROUTING_KEY_SIGN);
        String routingKey = headerValue == null ? "" : headerValue.toString();
        if (StringUtils.isEmpty(routingKey)) {
            logger.warn("消息错误,Headers中没有RoutingKey, message = {}", MqHandleUtils.getMessageContent(message));
        }
        return routingKey;
    }
}
三、业务逻辑的处理
/**
 * Queries profitable-fee data over RPC for the given message and saves one
 * agent-profit record built from the first returned entry.
 *
 * @param messageVo  message content; its properties are copied onto the RPC query object
 * @param routingKey routing key of the originating message (not used by this method)
 * @return an error wrapper when the RPC yields no data, otherwise an OK wrapper
 */
@Override
public E6Wrapper insertAgentProfitData(BusinessMessageContentVO messageVo, String routingKey) {
    // 1. Build the RPC query from the message content and call the fee service.
    ProfitableQueryVO profitableQueryVO = new ProfitableQueryVO();
    BeanUtils.copyProperties(messageVo, profitableQueryVO);
    E6Wrapper<List<ContractProfitableVO>> profitableFeeInfo = contractFeeRpcService.findProfitableFeeInfo(profitableQueryVO);

    // A null wrapper is treated the same as an empty result instead of causing a NullPointerException.
    List<ContractProfitableVO> contractProfitableVOList =
            profitableFeeInfo == null ? null : profitableFeeInfo.getResult();
    logger.info("@insertAgentProfitData rpc invoke");
    if (CollectionUtils.isEmpty(contractProfitableVOList)) {
        return E6WrapperUtil.error(I18nDictUtil.getI18nValue("rpc返回数据为空!"));
    }

    // 2. Populate the persistence object from the first RPC entry and save it.
    //    Only element 0 is used; any further entries are ignored.
    ContractProfitableVO contractProfitableVO = contractProfitableVOList.get(0);
    AgentProfitInfoPO agentProfitInfoPO = new AgentProfitInfoPO();
    setAgentProfitInfoPOData(contractProfitableVO, agentProfitInfoPO);
    agentProfitInfoMapper.save(agentProfitInfoPO);
    return E6WrapperUtil.ok();
}