package com.yougou.ordercenter.web.listener.kafka.base;
import com.yougou.ordercenter.web.listener.kafka.topic.ds.*;
import com.yougou.ordercenter.web.listener.kafka.topic.retail.*;
/**
 * Lists every consumer topic (retail and e-commerce) and maps each topic to
 * the class that handles its messages.
 *
 * @author david
 * @site www.belle.net.cn
 * @company Belle International (百丽国际)
 * @Date Created: 2021-04-13 21:36
 */
public enum ConsumerEnum {
    // Retail group (interaction with the retail system)
    DISPATCH_INFO(OrderSyncDispatchInfoConsumer.TOPIC, OrderSyncDispatchInfoConsumer.class),
    ISP_CONFIRM_COMMAND(IspConfirmCommandConsumer.TOPIC, IspConfirmCommandConsumer.class),
    ORDER_NOTIFY_RESULT(OrderSyncNotifyResultConsumer.TOPIC, OrderSyncNotifyResultConsumer.class),
    ORDER_SPLIT(OrderSplitOrderResultConsumer.TOPIC, OrderSplitOrderResultConsumer.class),
    DELIVERY_PACK(OrderDeliveryPackConsumer.TOPIC, OrderDeliveryPackConsumer.class),
    ORDER_PACK(OrderPackConsumer.TOPIC, OrderPackConsumer.class),
    ORDER_EXCEPTION_BACK(OrderExceptionBackConsumer.TOPIC, OrderExceptionBackConsumer.class),
    ORDER_INTERCEPT_BACK(InterceptOrderCallBackConsumer.TOPIC, InterceptOrderCallBackConsumer.class),
    ORDER_UPDATE_WPH_BACK(WphOrderArrivalTimeConsumer.TOPIC, WphOrderArrivalTimeConsumer.class),
    ORDER_UPDATE_JITX_AMOUNT_BACK(OrderUpdateJitxAmountConsumer.TOPIC, OrderUpdateJitxAmountConsumer.class),
    // NOTE(review): the constant name is spelled with three S's; it is kept
    // as-is because renaming an enum constant would break existing references.
    ORDER_UPDATE_EXPRESSS(OrderUpdateExpressConsumer.TOPIC, OrderUpdateExpressConsumer.class),
    JIT_PUSH_CALLBACK(JitPushResultCallbackConsumer.TOPIC, JitPushResultCallbackConsumer.class),
    FOR_WAREHOUSE_BACK(ForWarehouseBackConsumer.TOPIC, ForWarehouseBackConsumer.class),

    // E-commerce group (interaction with the e-commerce system)
    ORDER_LOG(OrderLogConsumer.TOPIC, OrderLogConsumer.class),
    ORDER_FLOW_LOG(OrderLogFlowConsumer.TOPIC, OrderLogFlowConsumer.class),
    ORDER_REMARK(OrderRemarkConsumer.TOPIC, OrderRemarkConsumer.class),
    SMS_TASK(SmsTaskConsumer.TOPIC, SmsTaskConsumer.class),
    CREATE_NOTIFY(CreateOrderNotifyConsumer.TOPIC, CreateOrderNotifyConsumer.class),
    INVOICE_CREATE(InvoiceCreateConsumer.TOPIC, InvoiceCreateConsumer.class),
    CREATE_EXCEPTION(CreateExceptionConsumer.TOPIC, CreateExceptionConsumer.class),
    UPDATE_JIT_AMOUNT(UpdateJitAmountConsumer.TOPIC, UpdateJitAmountConsumer.class),
    SALE_ABNORMAL_TASK(SaleAbnormalTaskConsumer.TOPIC, SaleAbnormalTaskConsumer.class),
    ORDER_PREEMPTION(OrderPreemptionConsumer.TOPIC, OrderPreemptionConsumer.class),
    FMS_REFUND_NOTIFY(FmsRefundNotifyConsumer.TOPIC, FmsRefundNotifyConsumer.class),
    FMS_INVOICE_DELIVERED(FmsInvoiceDeliveredConsumer.TOPIC, FmsInvoiceDeliveredConsumer.class),
    WMS_QUALITY(WmsQualityConsumer.TOPIC, WmsQualityConsumer.class),
    PRODUCT_PREEMPTION(ProductPreemptionConsumer.TOPIC, ProductPreemptionConsumer.class),
    PREEMPTION_SPLIT_ORDER(PreemptionSplitOrderConsumer.TOPIC, PreemptionSplitOrderConsumer.class),
    FMS_CONFIRM_COMMAND(FmsConfirmCommandConsumer.TOPIC, FmsConfirmCommandConsumer.class),
    FOR_PUSH_AMOUNT(ForPushAmountConsumer.TOPIC, ForPushAmountConsumer.class),
    OCCUPIED_LIS(OccupiedLisConsumer.TOPIC, OccupiedLisConsumer.class),
    ORDER_TRACK_INFO_SHENCE(OrderTrackInfoShenceConsumer.TOPIC, OrderTrackInfoShenceConsumer.class),
    LMP_STOCK_INCREASE(LmpStockIncreaseConsumer.TOPIC, LmpStockIncreaseConsumer.class),
    MERGED_ORDER(MergedOrderConsumer.TOPIC, MergedOrderConsumer.class);

    /**
     * Topic on the Kafka cluster.
     */
    private final String topic;

    /**
     * Class of the bean that handles messages for the topic.
     * Declared as {@code Class<?>} rather than the raw {@code Class} so that
     * callers do not pick up raw-type warnings.
     */
    private final Class<?> dealBean;

    /**
     * Binds a topic to its handler class.
     *
     * @param topic    topic on the Kafka cluster
     * @param dealBean class of the handler bean for that topic
     */
    ConsumerEnum(String topic, Class<?> dealBean) {
        this.topic = topic;
        this.dealBean = dealBean;
    }

    /**
     * @return the Kafka topic bound to this constant
     */
    public String getTopic() {
        return topic;
    }

    /**
     * @return the handler class bound to this constant
     */
    public Class<?> getDealBean() {
        return dealBean;
    }
}
topic 与具体处理类通过枚举绑定映射关系,并保存到 HashMap 中;使用时先根据 topic 从 HashMap 中取出对应的处理类,再通过 SpringContextHolder 获取该类的 bean。
package com.yougou.ordercenter.web.listener.kafka.base;
import com.yougou.framework.logger.Logger;
import com.yougou.framework.logger.LoggerFactory;
import com.yougou.tools.common.utils.SpringContextHolder;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
 * Static factory that resolves the consumer bean responsible for a topic.
 *
 * <p>The topic-to-handler-class mapping is copied once from
 * {@link ConsumerEnum} when this class is loaded; the bean itself is looked
 * up through {@code SpringContextHolder} on every call.
 *
 * @author david
 * @site www.belle.net.cn
 * @company Belle International (百丽国际)
 * @Date Created: 2021-04-13 21:36
 */
@Service
public class TopicConsumerFactory {
    private static final Logger logger = LoggerFactory.getLogger(TopicConsumerFactory.class);

    // Maps each topic to the class that handles it.
    private static final Map<String, Class<?>> dealMap = new HashMap<>(ConsumerEnum.values().length);

    static {
        try {
            for (ConsumerEnum type : ConsumerEnum.values()) {
                dealMap.put(type.getTopic(), type.getDealBean());
            }
        } catch (Exception e) {
            logger.error("实例化ConsumerFactory工厂类异常" + e.getMessage(), e);
        }
    }

    /**
     * Returns the consumer implementation registered for the given topic.
     *
     * @param topic topic name to look up
     * @return the consumer bean obtained from {@code SpringContextHolder} for
     *         the handler class mapped to {@code topic}
     * @throws IllegalArgumentException if no handler class is mapped to
     *                                  {@code topic} (including a {@code null} topic)
     */
    public AbstractConsumerBase getConsumer(String topic) {
        Class<?> dealBean = dealMap.get(topic);
        if (dealBean == null) {
            // Fail here with the offending topic in the message instead of
            // handing a null class to the bean lookup.
            throw new IllegalArgumentException("No consumer registered for topic: " + topic);
        }
        return (AbstractConsumerBase) SpringContextHolder.getBean(dealBean);
    }
}
通过枚举,把 topic 相关的业务配置聚合到一起:
package com.yougou.ordercenter.enums.mq;
import com.yougou.ordercenter.common.OrderOptTypeEnum;
import com.yougou.ordercenter.enums.FullBizCodeEnum;
/**
 * Producer enum.
 * Enumerates every topic this system produces to, covering both the
 * e-commerce cluster and the public cluster.
 *
 * @author david
 * @site www.belle.net.cn
 * @company Belle International (百丽国际)
 * @Date Created: 2021-04-13 21:36
 */
public enum ProducerEnum {
    // Retail group (interaction scenarios with retail)
    ORDER_TO_ISP_NOTIFY_OUT_STORE(ChannelEnum.PUB, "order_to_isp_notify_out_store", "通知发货消息", OrderOptTypeEnum.DELIVERY, "/ios/order/online/pushOrder"),

    // Retail group (handshake scenarios with retail)
    ORDER_TO_IOS_SYNC_DISPATCH_INFO_ACK(ChannelEnum.PUB, "order_to_ios_sync_dispatch_info_ack", "派单日志回传-握手", null, null),
    ORDER_TO_IOS_EXPRESS_UPDATE_ACK(ChannelEnum.PUB, "order_to_ios_express_update_ack", "更新物流公司-握手", null, null),
    ORDER_TO_IOS_EXCEPTION_BACK_ACK(ChannelEnum.PUB, "order_to_ios_exception_back_ack", "异常取消订单回写接口(缺货打回)-握手", null, null),
    ORDER_TO_IOS_FOR_WAREHOUSE_BACK_ACK(ChannelEnum.PUB, "order_to_ios_for_warehouse_back_ack", "jitx寻仓结果回传-握手", null, null),
    ORDER_TO_IOS_SPLIT_ORDER_RESULT_ACK(ChannelEnum.PUB, "order_to_ios_split_order_result_ack", "isp派单拆单接口(支持拆行)-握手", null, null),

    // E-commerce internal group (finance system)
    DS_ORDER_TO_FMS_CONFIRM_COMMAND(ChannelEnum.DS, "ds_order_to_fms_confirm_command", FullBizCodeEnum.MH1004.getBizName(), null, null);

    /** Cluster channel the topic belongs to. */
    private final ChannelEnum channelEnum;
    /** Topic name. */
    private final String topic;
    /** Business description of the topic. */
    private final String bizName;
    /** The {@code biz_type} of the producer message table; null for some constants. */
    private final OrderOptTypeEnum bizType;
    /** Disaster-recovery HTTP endpoint; null for some constants. */
    private final String url;

    /**
     * Creates a producer topic definition.
     *
     * @param channel     cluster channel
     * @param topicName   topic name
     * @param description business description of the topic
     * @param optType     biz_type of the producer message table
     * @param fallbackUrl disaster-recovery HTTP endpoint
     */
    ProducerEnum(ChannelEnum channel, String topicName, String description, OrderOptTypeEnum optType, String fallbackUrl) {
        channelEnum = channel;
        topic = topicName;
        bizName = description;
        bizType = optType;
        url = fallbackUrl;
    }

    /**
     * @return the cluster channel of this topic
     */
    public ChannelEnum getChannelEnum() {
        return this.channelEnum;
    }

    /**
     * @return the topic name
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * @return the business description of the topic
     */
    public String getBizName() {
        return this.bizName;
    }

    /**
     * @return the biz_type of the producer message table
     */
    public OrderOptTypeEnum getBizType() {
        return this.bizType;
    }

    /**
     * @return the disaster-recovery HTTP endpoint
     */
    public String getUrl() {
        return this.url;
    }
}