一、创建监听者
二、创建消息队列和交换机,并给定路由键——routing-key
@Componentpublic class JSAgentProfitListener {private static Logger logger = LoggerFactory.getLogger(JSAgentProfitListener.class);private static final String ROUTING_KEY_SIGN = "routingKey";@Autowiredprivate JSAgentProfitService jsAgentProfitService;@RabbitListener(//1、将消费队列和交换机进行绑定bindings = {@QueueBinding(//1.1 创建队列value = @Queue(//1.1.1 给队列起一个名字value = "${e6yun3.auto.estimated.read.queueName:mq_e6yun3_business_js_agent_profit}",//1.1.2 是否持久化durable = "${e6yun3.auto.estimated.read.durable:true}",//1.1.3 消费者消费消息之后是否自动删除autoDelete = "${e6yun3.auto.estimated.read.autoDelete:false}",//1.1.4 排他性exclusive = "${e6yun3.auto.estimated.read.exclusive:false}"),//1.2 创建交换机exchange = @Exchange(//1.2.1 给交换机起一个名字value = "${e6yun3.business.exchange:ex_e6yun3_financial}",//1.2.2 起确定交换机的类型type = "${e6yun3.auto.estimated.read.type:topic}"),//1.3 确定路由键key = {"e6yun3.financial.waybillPayFee","e6yun3.financial.waybillReceiveFee"})},containerFactory = "defaultFactory")public void processor(Message message,Channel channel,@Header(name = "amqp_deliveryTag") long deliveryTag,@Header("amqp_redelivered") boolean redelivered){try {//2、处理监听到的消息doProcessor(message);} catch (Exception e) {logger.error("解析MQ数据异常,message = " + MqHandleUtils.getMessageContent(message) + ": ", e);} finally {MqHandleUtils.handleReceiver(channel, deliveryTag);}}private void doProcessor(Message message) {//2.1 获取路由键String routingKey = findRoutingKey(message);//2.2 将监听到的消息转化为json格式String messageStr = MqHandleUtils.getMessageContent(message);BusinessMessageContentVO messageVo = JSONObject.parseObject(messageStr, BusinessMessageContentVO.class);Integer corpId = messageVo.getCorpId();UserContextHelper.setCorpId(corpId);UserContextHelper.setUserId(0);UserContextHelper.saveHeaderInThreadLocal(ConstOfUserContext.CORP_ID, corpId.toString());UserContextHelper.initUser();logger.info("@JSAgentProfitListener:doProcessor");//2.3 通过消息中的内容进行处理业务逻辑E6Wrapper e6Wrapper = jsAgentProfitService.insertAgentProfitData(messageVo,routingKey);}/*** 找RoutingKey** @param message 。* @return 。*/private String findRoutingKey(Message message) {String routingKey = "";//2.1.1 通过消息头获取路由键Map<String, Object> headers = message.getMessageProperties().getHeaders();if (!MapUtils.isEmpty(headers)) {routingKey = headers.get(ROUTING_KEY_SIGN).toString();if (StringUtils.isEmpty(routingKey)) {logger.warn("消息错误,Headers中没有RoutingKey, message = {}", MqHandleUtils.getMessageContent(message));}} else {logger.warn("消息错误,Headers中没有值, message = {}", MqHandleUtils.getMessageContent(message));}return routingKey;}}
三、业务逻辑的处理
@Overridepublic E6Wrapper insertAgentProfitData(BusinessMessageContentVO messageVo, String routingKey) {//1. rpc传参:ProfitableQueryVO调用财务接口,返回:ContractProfitableVOProfitableQueryVO profitableQueryVO = new ProfitableQueryVO();//2.3.1 将消息中的内容进行赋值BeanUtils.copyProperties(messageVo, profitableQueryVO);//2.3.2 通过消息中的内容并通过RPC接口进行调用E6Wrapper<List<ContractProfitableVO>> profitableFeeInfo = contractFeeRpcService.findProfitableFeeInfo(profitableQueryVO);List<ContractProfitableVO> contractProfitableVOList = profitableFeeInfo.getResult();logger.info("@insertAgentProfitData rpc invoke");if (CollectionUtils.isEmpty(contractProfitableVOList)) {return E6WrapperUtil.error(I18nDictUtil.getI18nValue("rpc返回数据为空!"));}//2.处理业务数据,封装AgentProfitInfoPOContractProfitableVO contractProfitableVO = contractProfitableVOList.get(0);AgentProfitInfoPO agentProfitInfoPO = new AgentProfitInfoPO();setAgentProfitInfoPOData(contractProfitableVO, agentProfitInfoPO);agentProfitInfoMapper.save(agentProfitInfoPO);return E6WrapperUtil.ok();}
