应用场景

应用程序集成了多个kafka集群,需要根据topic把消息发送到对应的集群;发送方式可以抽象为三种通道:kafka通道、公共kafka通道和http通道(http通道仅在灾备时使用)。针对这一场景,可以用策略模式来实现,示例代码如下:

通用接口

  1. package com.yougou.order.kafka.channel;
  2. import com.yougou.order.kafka.model.ProducerModel;
  3. import org.apache.poi.hssf.record.formula.functions.T;
  4. /**
  5. * 生产者模板-抽象接口
  6. *
  7. * @author david
  8. * @site www.belle.net.cn
  9. * @company 百丽国际
  10. * @Date 创建时间:2021-04-13 21:36
  11. */
  12. public interface IProducerChannel {
  13. /**
  14. * 抽象接口,发送数据
  15. *
  16. * @param dto
  17. * @return
  18. * @throws Exception
  19. */
  20. boolean sendData(ProducerModel dto) throws Exception;
  21. }
  1. package com.yougou.order.kafka.channel;
  2. import co.elastic.logging.ComplexLogGenerator;
  3. import com.alibaba.fastjson.JSON;
  4. import com.google.gson.Gson;
  5. import com.yougou.framework.logger.Logger;
  6. import com.yougou.framework.logger.LoggerFactory;
  7. import com.yougou.order.kafka.DsKafkaProducerServer;
  8. import com.yougou.order.kafka.KafkaMesConstant;
  9. import com.yougou.order.kafka.dto.KafkaDto;
  10. import com.yougou.order.kafka.model.ProducerModel;
  11. import com.yougou.tools.common.utils.UUIDGenerator;
  12. import org.apache.commons.lang3.StringUtils;
  13. import org.apache.poi.hssf.record.formula.functions.T;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.beans.factory.annotation.Qualifier;
  16. import org.springframework.kafka.core.KafkaTemplate;
  17. import org.springframework.kafka.support.SendResult;
  18. import org.springframework.stereotype.Service;
  19. import org.springframework.util.concurrent.ListenableFuture;
  20. import java.util.HashMap;
  21. import java.util.Map;
  22. /**
  23. * 公共kafka集群-生产者模板
  24. *
  25. * @author david
  26. * @site www.belle.net.cn
  27. * @company 百丽国际
  28. * @Date 创建时间:2021-04-13 21:36
  29. */
  30. @Service
  31. public class PubKafkaChannel implements IProducerChannel {
  32. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  33. @Autowired
  34. @Qualifier("pubKafkaTemplate")
  35. private KafkaTemplate<String, String> pubKafkaTemplate;
  36. @Override
  37. public boolean sendData(ProducerModel dto) {
  38. try {
  39. if (StringUtils.isBlank(dto.getProducerEnum().getTopic())) {
  40. logger.error(dto.getProducerEnum().getBizName() + "未配置topic");
  41. return false;
  42. }
  43. String msgId = UUIDGenerator.getUUID();
  44. KafkaDto kafkaDto = new KafkaDto();
  45. kafkaDto.setData(dto);
  46. kafkaDto.setMsgId(msgId);
  47. String reqJsonStr = JSON.toJSONString(kafkaDto);
  48. // 调用底层kafka接口
  49. logger.info("发送[pub]" + dto.getProducerEnum().getTopic() + "数据:" + reqJsonStr);
  50. pubKafkaTemplate.send(dto.getProducerEnum().getTopic(), reqJsonStr);
  51. return true;
  52. } catch (Exception e) {
  53. logger.error(dto.getProducerEnum().getBizName() + "发送异常{}", e.getMessage());
  54. return false;
  55. }
  56. }
  57. }
  1. package com.yougou.order.kafka.channel;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.yougou.framework.logger.Logger;
  5. import com.yougou.framework.logger.LoggerFactory;
  6. import com.yougou.order.kafka.dto.KafkaDto;
  7. import com.yougou.order.kafka.model.ProducerModel;
  8. import com.yougou.ordercenter.common.PropertiesUtil;
  9. import com.yougou.ordercenter.utils.HttpClient;
  10. import com.yougou.tools.common.utils.UUIDGenerator;
  11. import org.apache.commons.lang3.StringUtils;
  12. import org.apache.poi.hssf.record.formula.functions.T;
  13. import org.springframework.stereotype.Service;
  14. import java.util.HashMap;
  15. import java.util.Map;
  16. /**
  17. * http-生产者模板
  18. *
  19. * @author david
  20. * @site www.belle.net.cn
  21. * @company 百丽国际
  22. * @Date 创建时间:2021-04-13 21:36
  23. */
  24. @Service
  25. public class HttpChannel implements IProducerChannel {
  26. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  27. @Override
  28. public boolean sendData(ProducerModel dto) {
  29. try {
  30. if (StringUtils.isBlank(dto.getProducerEnum().getUrl())) {
  31. logger.error(dto.getProducerEnum().getBizName() + "未配置url");
  32. return false;
  33. }
  34. String fullUrl = PropertiesUtil.getInstance().getIspHost() + dto.getProducerEnum().getUrl();
  35. String msgId = UUIDGenerator.getUUID();
  36. KafkaDto kafkaDto = new KafkaDto();
  37. kafkaDto.setData(dto);
  38. kafkaDto.setMsgId(msgId);
  39. String reqJsonStr = JSON.toJSONString(kafkaDto);
  40. // 调用底层kafka接口
  41. logger.info("发送[http]" + fullUrl + "数据:" + reqJsonStr);
  42. return sendByHttp(fullUrl, reqJsonStr);
  43. } catch (Exception e) {
  44. logger.error(dto.getProducerEnum().getBizName() + "发送异常{}", e.getMessage());
  45. return false;
  46. }
  47. }
  48. /**
  49. * 发送isp底层http接口
  50. *
  51. * @param url
  52. * @param httpJson
  53. * @return
  54. * @throws Exception
  55. */
  56. public boolean sendByHttp(String url, String httpJson) throws Exception {
  57. // 请求报文组包
  58. Map<String, String> reqParamMap = new HashMap<String, String>();
  59. // 组装发送到isp的报文
  60. reqParamMap.put("appKey", PropertiesUtil.getInstance().getIspKey());
  61. reqParamMap.put("orderDetail", httpJson);
  62. HttpClient httpClient = new HttpClient(url, 30000, 30000, true);
  63. String rev = httpClient.send(reqParamMap);
  64. JSONObject resultObj = JSON.parseObject(rev);
  65. if (resultObj != null && org.apache.commons.lang.StringUtils.equals(resultObj.getString("code"), "200")) {
  66. return true;
  67. } else {
  68. return false;
  69. }
  70. }
  71. }

在具体的调用方中,变量以接口类型声明,实际发送则由该接口的某个实现类来完成;