1. package com.yougou.ordercenter.web.listener.kafka.base;
    2. import com.yougou.ordercenter.web.listener.kafka.topic.ds.*;
    3. import com.yougou.ordercenter.web.listener.kafka.topic.retail.*;
    4. /**
    5. * 把零售的,电商的所有消费者topic都列出来,配置好映射关系
    6. *
    7. * @author david
    8. * @site www.belle.net.cn
    9. * @company 百丽国际
    10. * @Date 创建时间:2021-04-13 21:36
    11. */
    12. public enum ConsumerEnum {
    13. // 零售组(与零售系统交互)
    14. DISPATCH_INFO(OrderSyncDispatchInfoConsumer.TOPIC, OrderSyncDispatchInfoConsumer.class),
    15. ISP_CONFIRM_COMMAND(IspConfirmCommandConsumer.TOPIC, IspConfirmCommandConsumer.class),
    16. ORDER_NOTIFY_RESULT(OrderSyncNotifyResultConsumer.TOPIC, OrderSyncNotifyResultConsumer.class),
    17. ORDER_SPLIT(OrderSplitOrderResultConsumer.TOPIC, OrderSplitOrderResultConsumer.class),
    18. DELIVERY_PACK(OrderDeliveryPackConsumer.TOPIC, OrderDeliveryPackConsumer.class),
    19. ORDER_PACK(OrderPackConsumer.TOPIC, OrderPackConsumer.class),
    20. ORDER_EXCEPTION_BACK(OrderExceptionBackConsumer.TOPIC, OrderExceptionBackConsumer.class),
    21. ORDER_INTERCEPT_BACK(InterceptOrderCallBackConsumer.TOPIC, InterceptOrderCallBackConsumer.class),
    22. ORDER_UPDATE_WPH_BACK(WphOrderArrivalTimeConsumer.TOPIC, WphOrderArrivalTimeConsumer.class),
    23. ORDER_UPDATE_JITX_AMOUNT_BACK(OrderUpdateJitxAmountConsumer.TOPIC, OrderUpdateJitxAmountConsumer.class),
    24. ORDER_UPDATE_EXPRESSS(OrderUpdateExpressConsumer.TOPIC, OrderUpdateExpressConsumer.class),
    25. JIT_PUSH_CALLBACK(JitPushResultCallbackConsumer.TOPIC, JitPushResultCallbackConsumer.class),
    26. FOR_WAREHOUSE_BACK(ForWarehouseBackConsumer.TOPIC, ForWarehouseBackConsumer.class),
    27. // 电商组(与电商系统交互)
    28. ORDER_LOG(OrderLogConsumer.TOPIC, OrderLogConsumer.class),
    29. ORDER_FLOW_LOG(OrderLogFlowConsumer.TOPIC, OrderLogFlowConsumer.class),
    30. ORDER_REMARK(OrderRemarkConsumer.TOPIC, OrderRemarkConsumer.class),
    31. SMS_TASK(SmsTaskConsumer.TOPIC, SmsTaskConsumer.class),
    32. CREATE_NOTIFY(CreateOrderNotifyConsumer.TOPIC, CreateOrderNotifyConsumer.class),
    33. INVOICE_CREATE(InvoiceCreateConsumer.TOPIC, InvoiceCreateConsumer.class),
    34. CREATE_EXCEPTION(CreateExceptionConsumer.TOPIC, CreateExceptionConsumer.class),
    35. UPDATE_JIT_AMOUNT(UpdateJitAmountConsumer.TOPIC, UpdateJitAmountConsumer.class),
    36. SALE_ABNORMAL_TASK(SaleAbnormalTaskConsumer.TOPIC, SaleAbnormalTaskConsumer.class),
    37. ORDER_PREEMPTION(OrderPreemptionConsumer.TOPIC, OrderPreemptionConsumer.class),
    38. FMS_REFUND_NOTIFY(FmsRefundNotifyConsumer.TOPIC, FmsRefundNotifyConsumer.class),
    39. FMS_INVOICE_DELIVERED(FmsInvoiceDeliveredConsumer.TOPIC, FmsInvoiceDeliveredConsumer.class),
    40. WMS_QUALITY(WmsQualityConsumer.TOPIC, WmsQualityConsumer.class),
    41. PRODUCT_PREEMPTION(ProductPreemptionConsumer.TOPIC, ProductPreemptionConsumer.class),
    42. PREEMPTION_SPLIT_ORDER(PreemptionSplitOrderConsumer.TOPIC, PreemptionSplitOrderConsumer.class),
    43. FMS_CONFIRM_COMMAND(FmsConfirmCommandConsumer.TOPIC, FmsConfirmCommandConsumer.class),
    44. FOR_PUSH_AMOUNT(ForPushAmountConsumer.TOPIC, ForPushAmountConsumer.class),
    45. OCCUPIED_LIS(OccupiedLisConsumer.TOPIC, OccupiedLisConsumer.class),
    46. ORDER_TRACK_INFO_SHENCE(OrderTrackInfoShenceConsumer.TOPIC, OrderTrackInfoShenceConsumer.class),
    47. LMP_STOCK_INCREASE(LmpStockIncreaseConsumer.TOPIC, LmpStockIncreaseConsumer.class),
    48. MERGED_ORDER(MergedOrderConsumer.TOPIC, MergedOrderConsumer.class);
    49. /**
    50. * kafka集群的topic
    51. */
    52. private final String topic;
    53. /**
    54. * 处理类bean
    55. */
    56. private final Class dealBean;
    57. ConsumerEnum(String topic, Class dealBean) {
    58. this.topic = topic;
    59. this.dealBean = dealBean;
    60. }
    61. public String getTopic() {
    62. return topic;
    63. }
    64. public Class getDealBean() {
    65. return dealBean;
    66. }
    67. }

    topic与具体处理类通过枚举绑定关系,并把映射保存到HashMap;使用时先按topic从HashMap取出对应的处理类,再通过SpringContextHolder获取该类的bean;

    1. package com.yougou.ordercenter.web.listener.kafka.base;
    2. import com.yougou.framework.logger.Logger;
    3. import com.yougou.framework.logger.LoggerFactory;
    4. import com.yougou.tools.common.utils.SpringContextHolder;
    5. import org.springframework.stereotype.Service;
    6. import java.util.HashMap;
    7. import java.util.Map;
    8. /**
    9. * 静态工厂模式
    10. *
    11. * @author david
    12. * @site www.belle.net.cn
    13. * @company 百丽国际
    14. * @Date 创建时间:2021-04-13 21:36
    15. */
    16. @Service
    17. public class TopicConsumerFactory {
    18. private static final Logger logger = LoggerFactory.getLogger(TopicConsumerFactory.class);
    19. // 把topic对应的具体处理类,关系映射
    20. private static final Map<String, Class> dealMap = new HashMap<>(ConsumerEnum.values().length);
    21. static {
    22. try {
    23. for (ConsumerEnum type : ConsumerEnum.values()) {
    24. dealMap.put(type.getTopic(), type.getDealBean());
    25. }
    26. } catch (Exception e) {
    27. logger.error("实例化ConsumerFactory工厂类异常" + e.getMessage(), e);
    28. }
    29. }
    30. /**
    31. * 通过topic获取具体实现类
    32. *
    33. * @param topic
    34. * @return
    35. */
    36. public AbstractConsumerBase getConsumer(String topic) {
    37. return (AbstractConsumerBase) SpringContextHolder.getBean(dealMap.get(topic));
    38. }
    39. }

    通过枚举,把topic相关的业务配置聚合到一起:

    1. package com.yougou.ordercenter.enums.mq;
    2. import com.yougou.ordercenter.common.OrderOptTypeEnum;
    3. import com.yougou.ordercenter.enums.FullBizCodeEnum;
    4. /**
    5. * 生产者枚举类
    6. * 列出当前系统,所有生产端的topic列表,包括电商集群和公共集群
    7. *
    8. * @author david
    9. * @site www.belle.net.cn
    10. * @company 百丽国际
    11. * @Date 创建时间:2021-04-13 21:36
    12. */
    13. public enum ProducerEnum {
    14. // 零售组(与零售交互场景)
    15. ORDER_TO_ISP_NOTIFY_OUT_STORE(ChannelEnum.PUB, "order_to_isp_notify_out_store", "通知发货消息", OrderOptTypeEnum.DELIVERY, "/ios/order/online/pushOrder"),
    16. // 零售组(与零售握手场景)
    17. ORDER_TO_IOS_SYNC_DISPATCH_INFO_ACK(ChannelEnum.PUB, "order_to_ios_sync_dispatch_info_ack", "派单日志回传-握手", null, null),
    18. ORDER_TO_IOS_EXPRESS_UPDATE_ACK(ChannelEnum.PUB, "order_to_ios_express_update_ack", "更新物流公司-握手", null, null),
    19. ORDER_TO_IOS_EXCEPTION_BACK_ACK(ChannelEnum.PUB, "order_to_ios_exception_back_ack", "异常取消订单回写接口(缺货打回)-握手", null, null),
    20. ORDER_TO_IOS_FOR_WAREHOUSE_BACK_ACK(ChannelEnum.PUB, "order_to_ios_for_warehouse_back_ack", "jitx寻仓结果回传-握手", null, null),
    21. ORDER_TO_IOS_SPLIT_ORDER_RESULT_ACK(ChannelEnum.PUB, "order_to_ios_split_order_result_ack", "isp派单拆单接口(支持拆行)-握手", null, null),
    22. // 电商内部组(财务系统)
    23. DS_ORDER_TO_FMS_CONFIRM_COMMAND(ChannelEnum.DS, "ds_order_to_fms_confirm_command", FullBizCodeEnum.MH1004.getBizName(), null, null);
    24. /**
    25. * 集群渠道
    26. */
    27. private final ChannelEnum channelEnum;
    28. /**
    29. * topic名称
    30. */
    31. private final String topic;
    32. /**
    33. * topic对应的业务说明
    34. */
    35. private final String bizName;
    36. /**
    37. * 生产者消息表的biz_type
    38. */
    39. private final OrderOptTypeEnum bizType;
    40. /**
    41. * 灾备http接口
    42. */
    43. private final String url;
    44. ProducerEnum(ChannelEnum channelEnum, String topic, String bizName, OrderOptTypeEnum bizType, String url) {
    45. this.channelEnum = channelEnum;
    46. this.topic = topic;
    47. this.bizName = bizName;
    48. this.bizType = bizType;
    49. this.url = url;
    50. }
    51. public ChannelEnum getChannelEnum() {
    52. return channelEnum;
    53. }
    54. public String getTopic() {
    55. return topic;
    56. }
    57. public String getBizName() {
    58. return bizName;
    59. }
    60. public OrderOptTypeEnum getBizType() {
    61. return bizType;
    62. }
    63. public String getUrl() {
    64. return url;
    65. }
    66. }