package com.yougou.ordercenter.web.listener.kafka.base;import com.yougou.ordercenter.web.listener.kafka.topic.ds.*;import com.yougou.ordercenter.web.listener.kafka.topic.retail.*;/*** 把零售的,电商的所有消费者topic都列出来,配置好映射关系** @author david* @site www.belle.net.cn* @company 百丽国际* @Date 创建时间:2021-04-13 21:36*/public enum ConsumerEnum {// 零售组(与零售系统交互)DISPATCH_INFO(OrderSyncDispatchInfoConsumer.TOPIC, OrderSyncDispatchInfoConsumer.class),ISP_CONFIRM_COMMAND(IspConfirmCommandConsumer.TOPIC, IspConfirmCommandConsumer.class),ORDER_NOTIFY_RESULT(OrderSyncNotifyResultConsumer.TOPIC, OrderSyncNotifyResultConsumer.class),ORDER_SPLIT(OrderSplitOrderResultConsumer.TOPIC, OrderSplitOrderResultConsumer.class),DELIVERY_PACK(OrderDeliveryPackConsumer.TOPIC, OrderDeliveryPackConsumer.class),ORDER_PACK(OrderPackConsumer.TOPIC, OrderPackConsumer.class),ORDER_EXCEPTION_BACK(OrderExceptionBackConsumer.TOPIC, OrderExceptionBackConsumer.class),ORDER_INTERCEPT_BACK(InterceptOrderCallBackConsumer.TOPIC, InterceptOrderCallBackConsumer.class),ORDER_UPDATE_WPH_BACK(WphOrderArrivalTimeConsumer.TOPIC, WphOrderArrivalTimeConsumer.class),ORDER_UPDATE_JITX_AMOUNT_BACK(OrderUpdateJitxAmountConsumer.TOPIC, OrderUpdateJitxAmountConsumer.class),ORDER_UPDATE_EXPRESSS(OrderUpdateExpressConsumer.TOPIC, OrderUpdateExpressConsumer.class),JIT_PUSH_CALLBACK(JitPushResultCallbackConsumer.TOPIC, JitPushResultCallbackConsumer.class),FOR_WAREHOUSE_BACK(ForWarehouseBackConsumer.TOPIC, ForWarehouseBackConsumer.class),// 电商组(与电商系统交互)ORDER_LOG(OrderLogConsumer.TOPIC, OrderLogConsumer.class),ORDER_FLOW_LOG(OrderLogFlowConsumer.TOPIC, OrderLogFlowConsumer.class),ORDER_REMARK(OrderRemarkConsumer.TOPIC, OrderRemarkConsumer.class),SMS_TASK(SmsTaskConsumer.TOPIC, SmsTaskConsumer.class),CREATE_NOTIFY(CreateOrderNotifyConsumer.TOPIC, CreateOrderNotifyConsumer.class),INVOICE_CREATE(InvoiceCreateConsumer.TOPIC, InvoiceCreateConsumer.class),CREATE_EXCEPTION(CreateExceptionConsumer.TOPIC, 
CreateExceptionConsumer.class),UPDATE_JIT_AMOUNT(UpdateJitAmountConsumer.TOPIC, UpdateJitAmountConsumer.class),SALE_ABNORMAL_TASK(SaleAbnormalTaskConsumer.TOPIC, SaleAbnormalTaskConsumer.class),ORDER_PREEMPTION(OrderPreemptionConsumer.TOPIC, OrderPreemptionConsumer.class),FMS_REFUND_NOTIFY(FmsRefundNotifyConsumer.TOPIC, FmsRefundNotifyConsumer.class),FMS_INVOICE_DELIVERED(FmsInvoiceDeliveredConsumer.TOPIC, FmsInvoiceDeliveredConsumer.class),WMS_QUALITY(WmsQualityConsumer.TOPIC, WmsQualityConsumer.class),PRODUCT_PREEMPTION(ProductPreemptionConsumer.TOPIC, ProductPreemptionConsumer.class),PREEMPTION_SPLIT_ORDER(PreemptionSplitOrderConsumer.TOPIC, PreemptionSplitOrderConsumer.class),FMS_CONFIRM_COMMAND(FmsConfirmCommandConsumer.TOPIC, FmsConfirmCommandConsumer.class),FOR_PUSH_AMOUNT(ForPushAmountConsumer.TOPIC, ForPushAmountConsumer.class),OCCUPIED_LIS(OccupiedLisConsumer.TOPIC, OccupiedLisConsumer.class),ORDER_TRACK_INFO_SHENCE(OrderTrackInfoShenceConsumer.TOPIC, OrderTrackInfoShenceConsumer.class),LMP_STOCK_INCREASE(LmpStockIncreaseConsumer.TOPIC, LmpStockIncreaseConsumer.class),MERGED_ORDER(MergedOrderConsumer.TOPIC, MergedOrderConsumer.class);/*** kafka集群的topic*/private final String topic;/*** 处理类bean*/private final Class dealBean;ConsumerEnum(String topic, Class dealBean) {this.topic = topic;this.dealBean = dealBean;}public String getTopic() {return topic;}public Class getDealBean() {return dealBean;}}
topic 与具体处理类通过枚举绑定映射关系,并在类加载时保存到 HashMap;消费时先根据 topic 从 HashMap 取出对应的处理类,再通过 SpringContextHolder 获取该处理类的 bean。
package com.yougou.ordercenter.web.listener.kafka.base;

import com.yougou.framework.logger.Logger;
import com.yougou.framework.logger.LoggerFactory;
import com.yougou.tools.common.utils.SpringContextHolder;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Static-factory style lookup from a Kafka topic name to the bean that consumes it.
 *
 * <p>The topic-to-class registry is built once, when this class is initialized,
 * from every constant of {@link ConsumerEnum}. It is never modified afterwards.
 *
 * @author david
 * @site www.belle.net.cn
 * @company Belle International (百丽国际)
 * @Date created 2021-04-13 21:36
 */
@Service
public class TopicConsumerFactory {

    private static final Logger logger = LoggerFactory.getLogger(TopicConsumerFactory.class);

    /** Read-only registry: topic name -> class of the bean that handles that topic. */
    private static final Map<String, Class<?>> DEAL_MAP = buildDealMap();

    /**
     * Builds the topic-to-handler registry from {@link ConsumerEnum}.
     *
     * <p>If two constants declare the same topic, the later constant replaces the
     * earlier one, as with any {@link Map#put}.
     *
     * @return an unmodifiable map holding one entry per distinct topic
     */
    private static Map<String, Class<?>> buildDealMap() {
        ConsumerEnum[] types = ConsumerEnum.values();
        // Size for the default 0.75 load factor so filling the map does not trigger a rehash.
        Map<String, Class<?>> mapping = new HashMap<>((int) (types.length / 0.75f) + 1);
        for (ConsumerEnum type : types) {
            mapping.put(type.getTopic(), type.getDealBean());
        }
        return Collections.unmodifiableMap(mapping);
    }

    /**
     * Resolves the consumer bean that handles the given topic.
     *
     * @param topic Kafka topic name, expected to match the topic of a {@link ConsumerEnum} constant
     * @return the bean obtained from {@code SpringContextHolder} for the class bound to {@code topic}
     * @throws IllegalArgumentException if {@code topic} is {@code null} or has no registered handler class
     * @throws ClassCastException       if the resolved bean is not an {@code AbstractConsumerBase}
     */
    public AbstractConsumerBase getConsumer(String topic) {
        Class<?> beanType = DEAL_MAP.get(topic);
        if (beanType == null) {
            // Fail here with the offending topic instead of handing a null type to the bean lookup.
            throw new IllegalArgumentException(
                    "No consumer is registered in ConsumerEnum for topic: " + topic);
        }
        return (AbstractConsumerBase) SpringContextHolder.getBean(beanType);
    }
}
通过枚举,把 topic 相关的业务配置聚合到一起。
package com.yougou.ordercenter.enums.mq;import com.yougou.ordercenter.common.OrderOptTypeEnum;import com.yougou.ordercenter.enums.FullBizCodeEnum;/*** 生产者枚举类* 列出当前系统,所有生产端的topic列表,包括电商集群和公共集群** @author david* @site www.belle.net.cn* @company 百丽国际* @Date 创建时间:2021-04-13 21:36*/public enum ProducerEnum {// 零售组(与零售交互场景)ORDER_TO_ISP_NOTIFY_OUT_STORE(ChannelEnum.PUB, "order_to_isp_notify_out_store", "通知发货消息", OrderOptTypeEnum.DELIVERY, "/ios/order/online/pushOrder"),// 零售组(与零售握手场景)ORDER_TO_IOS_SYNC_DISPATCH_INFO_ACK(ChannelEnum.PUB, "order_to_ios_sync_dispatch_info_ack", "派单日志回传-握手", null, null),ORDER_TO_IOS_EXPRESS_UPDATE_ACK(ChannelEnum.PUB, "order_to_ios_express_update_ack", "更新物流公司-握手", null, null),ORDER_TO_IOS_EXCEPTION_BACK_ACK(ChannelEnum.PUB, "order_to_ios_exception_back_ack", "异常取消订单回写接口(缺货打回)-握手", null, null),ORDER_TO_IOS_FOR_WAREHOUSE_BACK_ACK(ChannelEnum.PUB, "order_to_ios_for_warehouse_back_ack", "jitx寻仓结果回传-握手", null, null),ORDER_TO_IOS_SPLIT_ORDER_RESULT_ACK(ChannelEnum.PUB, "order_to_ios_split_order_result_ack", "isp派单拆单接口(支持拆行)-握手", null, null),// 电商内部组(财务系统)DS_ORDER_TO_FMS_CONFIRM_COMMAND(ChannelEnum.DS, "ds_order_to_fms_confirm_command", FullBizCodeEnum.MH1004.getBizName(), null, null);/*** 集群渠道*/private final ChannelEnum channelEnum;/*** topic名称*/private final String topic;/*** topic对应的业务说明*/private final String bizName;/*** 生产者消息表的biz_type*/private final OrderOptTypeEnum bizType;/*** 灾备http接口*/private final String url;ProducerEnum(ChannelEnum channelEnum, String topic, String bizName, OrderOptTypeEnum bizType, String url) {this.channelEnum = channelEnum;this.topic = topic;this.bizName = bizName;this.bizType = bizType;this.url = url;}public ChannelEnum getChannelEnum() {return channelEnum;}public String getTopic() {return topic;}public String getBizName() {return bizName;}public OrderOptTypeEnum getBizType() {return bizType;}public String getUrl() {return url;}}
