应用场景
应用程序集成了多个 Kafka 集群,需要根据 topic 将消息发送到对应的集群。发送方式可以抽象为三种通道:Kafka 通道、公共 Kafka 通道和 HTTP 通道(HTTP 仅在灾备时使用)。针对这一场景,可以通过策略模式实现,示例代码如下:
通用接口
package com.yougou.order.kafka.channel;

import com.yougou.order.kafka.model.ProducerModel;
import org.apache.poi.hssf.record.formula.functions.T;

/**
 * Producer template: the common abstraction that every delivery channel implements.
 *
 * <p>Callers depend on this interface only; the concrete channel decides how the
 * message is actually transported.
 *
 * @author david
 * @site www.belle.net.cn
 * @company Belle International
 * @Date created 2021-04-13 21:36
 */
public interface IProducerChannel {

    /**
     * Sends one message through this channel.
     *
     * @param dto the message to deliver, including the producer settings the channel reads
     * @return a success flag; the exact meaning of {@code true} is defined by each
     *         implementation
     * @throws Exception if the implementation chooses to propagate a delivery failure
     */
    boolean sendData(ProducerModel dto) throws Exception;
}
package com.yougou.order.kafka.channel;import co.elastic.logging.ComplexLogGenerator;import com.alibaba.fastjson.JSON;import com.google.gson.Gson;import com.yougou.framework.logger.Logger;import com.yougou.framework.logger.LoggerFactory;import com.yougou.order.kafka.DsKafkaProducerServer;import com.yougou.order.kafka.KafkaMesConstant;import com.yougou.order.kafka.dto.KafkaDto;import com.yougou.order.kafka.model.ProducerModel;import com.yougou.tools.common.utils.UUIDGenerator;import org.apache.commons.lang3.StringUtils;import org.apache.poi.hssf.record.formula.functions.T;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture;import java.util.HashMap;import java.util.Map;/*** 公共kafka集群-生产者模板** @author david* @site www.belle.net.cn* @company 百丽国际* @Date 创建时间:2021-04-13 21:36*/@Servicepublic class PubKafkaChannel implements IProducerChannel {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowired@Qualifier("pubKafkaTemplate")private KafkaTemplate<String, String> pubKafkaTemplate;@Overridepublic boolean sendData(ProducerModel dto) {try {if (StringUtils.isBlank(dto.getProducerEnum().getTopic())) {logger.error(dto.getProducerEnum().getBizName() + "未配置topic");return false;}String msgId = UUIDGenerator.getUUID();KafkaDto kafkaDto = new KafkaDto();kafkaDto.setData(dto);kafkaDto.setMsgId(msgId);String reqJsonStr = JSON.toJSONString(kafkaDto);// 调用底层kafka接口logger.info("发送[pub]" + dto.getProducerEnum().getTopic() + "数据:" + reqJsonStr);pubKafkaTemplate.send(dto.getProducerEnum().getTopic(), reqJsonStr);return true;} catch (Exception e) {logger.error(dto.getProducerEnum().getBizName() + "发送异常{}", e.getMessage());return false;}}}
package com.yougou.order.kafka.channel;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.yougou.framework.logger.Logger;import com.yougou.framework.logger.LoggerFactory;import com.yougou.order.kafka.dto.KafkaDto;import com.yougou.order.kafka.model.ProducerModel;import com.yougou.ordercenter.common.PropertiesUtil;import com.yougou.ordercenter.utils.HttpClient;import com.yougou.tools.common.utils.UUIDGenerator;import org.apache.commons.lang3.StringUtils;import org.apache.poi.hssf.record.formula.functions.T;import org.springframework.stereotype.Service;import java.util.HashMap;import java.util.Map;/*** http-生产者模板** @author david* @site www.belle.net.cn* @company 百丽国际* @Date 创建时间:2021-04-13 21:36*/@Servicepublic class HttpChannel implements IProducerChannel {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean sendData(ProducerModel dto) {try {if (StringUtils.isBlank(dto.getProducerEnum().getUrl())) {logger.error(dto.getProducerEnum().getBizName() + "未配置url");return false;}String fullUrl = PropertiesUtil.getInstance().getIspHost() + dto.getProducerEnum().getUrl();String msgId = UUIDGenerator.getUUID();KafkaDto kafkaDto = new KafkaDto();kafkaDto.setData(dto);kafkaDto.setMsgId(msgId);String reqJsonStr = JSON.toJSONString(kafkaDto);// 调用底层kafka接口logger.info("发送[http]" + fullUrl + "数据:" + reqJsonStr);return sendByHttp(fullUrl, reqJsonStr);} catch (Exception e) {logger.error(dto.getProducerEnum().getBizName() + "发送异常{}", e.getMessage());return false;}}/*** 发送isp底层http接口** @param url* @param httpJson* @return* @throws Exception*/public boolean sendByHttp(String url, String httpJson) throws Exception {// 请求报文组包Map<String, String> reqParamMap = new HashMap<String, String>();// 组装发送到isp的报文reqParamMap.put("appKey", PropertiesUtil.getInstance().getIspKey());reqParamMap.put("orderDetail", httpJson);HttpClient httpClient = new HttpClient(url, 30000, 30000, true);String rev = 
httpClient.send(reqParamMap);JSONObject resultObj = JSON.parseObject(rev);if (resultObj != null && org.apache.commons.lang.StringUtils.equals(resultObj.getString("code"), "200")) {return true;} else {return false;}}}
具体调用方:变量声明为接口类型 IProducerChannel,再根据场景选用该接口的具体实现类(如 PubKafkaChannel、HttpChannel)来发送;
