一、创建监听者
    二、创建消息队列和交换机,并给定路由键——routing-key

    1. @Component
    2. public class JSAgentProfitListener {
    3. private static Logger logger = LoggerFactory.getLogger(JSAgentProfitListener.class);
    4. private static final String ROUTING_KEY_SIGN = "routingKey";
    5. @Autowired
    6. private JSAgentProfitService jsAgentProfitService;
    7. @RabbitListener(
    8. //1、将消费队列和交换机进行绑定
    9. bindings = {@QueueBinding(
    10. //1.1 创建队列
    11. value = @Queue(
    12. //1.1.1 给队列起一个名字
    13. value = "${e6yun3.auto.estimated.read.queueName:mq_e6yun3_business_js_agent_profit}",
    14. //1.1.2 是否持久化
    15. durable = "${e6yun3.auto.estimated.read.durable:true}",
    16. //1.1.3 消费者消费消息之后是否自动删除
    17. autoDelete = "${e6yun3.auto.estimated.read.autoDelete:false}",
    18. //1.1.4 排他性
    19. exclusive = "${e6yun3.auto.estimated.read.exclusive:false}"
    20. ),
    21. //1.2 创建交换机
    22. exchange = @Exchange(
    23. //1.2.1 给交换机起一个名字
    24. value = "${e6yun3.business.exchange:ex_e6yun3_financial}",
    25. //1.2.2 起确定交换机的类型
    26. type = "${e6yun3.auto.estimated.read.type:topic}"
    27. ),
    28. //1.3 确定路由键
    29. key = {"e6yun3.financial.waybillPayFee","e6yun3.financial.waybillReceiveFee"}
    30. )},
    31. containerFactory = "defaultFactory"
    32. )
    33. public void processor(Message message,
    34. Channel channel,
    35. @Header(name = "amqp_deliveryTag") long deliveryTag,
    36. @Header("amqp_redelivered") boolean redelivered){
    37. try {
    38. //2、处理监听到的消息
    39. doProcessor(message);
    40. } catch (Exception e) {
    41. logger.error("解析MQ数据异常,message = " + MqHandleUtils.getMessageContent(message) + ": ", e);
    42. } finally {
    43. MqHandleUtils.handleReceiver(channel, deliveryTag);
    44. }
    45. }
    46. private void doProcessor(Message message) {
    47. //2.1 获取路由键
    48. String routingKey = findRoutingKey(message);
    49. //2.2 将监听到的消息转化为json格式
    50. String messageStr = MqHandleUtils.getMessageContent(message);
    51. BusinessMessageContentVO messageVo = JSONObject.parseObject(messageStr, BusinessMessageContentVO.class);
    52. Integer corpId = messageVo.getCorpId();
    53. UserContextHelper.setCorpId(corpId);
    54. UserContextHelper.setUserId(0);
    55. UserContextHelper.saveHeaderInThreadLocal(ConstOfUserContext.CORP_ID, corpId.toString());
    56. UserContextHelper.initUser();
    57. logger.info("@JSAgentProfitListener:doProcessor");
    58. //2.3 通过消息中的内容进行处理业务逻辑
    59. E6Wrapper e6Wrapper = jsAgentProfitService.insertAgentProfitData(messageVo,routingKey);
    60. }
    61. /**
    62. * 找RoutingKey
    63. *
    64. * @param message 。
    65. * @return 。
    66. */
    67. private String findRoutingKey(Message message) {
    68. String routingKey = "";
    69. //2.1.1 通过消息头获取路由键
    70. Map<String, Object> headers = message.getMessageProperties().getHeaders();
    71. if (!MapUtils.isEmpty(headers)) {
    72. routingKey = headers.get(ROUTING_KEY_SIGN).toString();
    73. if (StringUtils.isEmpty(routingKey)) {
    74. logger.warn("消息错误,Headers中没有RoutingKey, message = {}", MqHandleUtils.getMessageContent(message));
    75. }
    76. } else {
    77. logger.warn("消息错误,Headers中没有值, message = {}", MqHandleUtils.getMessageContent(message));
    78. }
    79. return routingKey;
    80. }
    81. }

    三、业务逻辑的处理

    1. @Override
    2. public E6Wrapper insertAgentProfitData(BusinessMessageContentVO messageVo, String routingKey) {
    3. //1. rpc传参:ProfitableQueryVO调用财务接口,返回:ContractProfitableVO
    4. ProfitableQueryVO profitableQueryVO = new ProfitableQueryVO();
    5. //2.3.1 将消息中的内容进行赋值
    6. BeanUtils.copyProperties(messageVo, profitableQueryVO);
    7. //2.3.2 通过消息中的内容并通过RPC接口进行调用
    8. E6Wrapper<List<ContractProfitableVO>> profitableFeeInfo = contractFeeRpcService.findProfitableFeeInfo(profitableQueryVO);
    9. List<ContractProfitableVO> contractProfitableVOList = profitableFeeInfo.getResult();
    10. logger.info("@insertAgentProfitData rpc invoke");
    11. if (CollectionUtils.isEmpty(contractProfitableVOList)) {
    12. return E6WrapperUtil.error(I18nDictUtil.getI18nValue("rpc返回数据为空!"));
    13. }
    14. //2.处理业务数据,封装AgentProfitInfoPO
    15. ContractProfitableVO contractProfitableVO = contractProfitableVOList.get(0);
    16. AgentProfitInfoPO agentProfitInfoPO = new AgentProfitInfoPO();
    17. setAgentProfitInfoPOData(contractProfitableVO, agentProfitInfoPO);
    18. agentProfitInfoMapper.save(agentProfitInfoPO);
    19. return E6WrapperUtil.ok();
    20. }