应用场景
应用程序集成了多个kafka集群,需根据topic来发送具体的集群;可以抽象为kafka通道,公共kafka通道,http通道(因http是灾备才用);面对此场景,通过策略模式实现,示例代码如下:
通用接口
package com.yougou.order.kafka.channel;
import com.yougou.order.kafka.model.ProducerModel;
import org.apache.poi.hssf.record.formula.functions.T;
/**
* 生产者模板-抽象接口
*
* @author david
* @site www.belle.net.cn
* @company 百丽国际
* @Date 创建时间:2021-04-13 21:36
*/
public interface IProducerChannel {
/**
* 抽象接口,发送数据
*
* @param dto
* @return
* @throws Exception
*/
boolean sendData(ProducerModel dto) throws Exception;
}
package com.yougou.order.kafka.channel;
import co.elastic.logging.ComplexLogGenerator;
import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import com.yougou.framework.logger.Logger;
import com.yougou.framework.logger.LoggerFactory;
import com.yougou.order.kafka.DsKafkaProducerServer;
import com.yougou.order.kafka.KafkaMesConstant;
import com.yougou.order.kafka.dto.KafkaDto;
import com.yougou.order.kafka.model.ProducerModel;
import com.yougou.tools.common.utils.UUIDGenerator;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.hssf.record.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
/**
* 公共kafka集群-生产者模板
*
* @author david
* @site www.belle.net.cn
* @company 百丽国际
* @Date 创建时间:2021-04-13 21:36
*/
@Service
public class PubKafkaChannel implements IProducerChannel {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
@Qualifier("pubKafkaTemplate")
private KafkaTemplate<String, String> pubKafkaTemplate;
@Override
public boolean sendData(ProducerModel dto) {
try {
if (StringUtils.isBlank(dto.getProducerEnum().getTopic())) {
logger.error(dto.getProducerEnum().getBizName() + "未配置topic");
return false;
}
String msgId = UUIDGenerator.getUUID();
KafkaDto kafkaDto = new KafkaDto();
kafkaDto.setData(dto);
kafkaDto.setMsgId(msgId);
String reqJsonStr = JSON.toJSONString(kafkaDto);
// 调用底层kafka接口
logger.info("发送[pub]" + dto.getProducerEnum().getTopic() + "数据:" + reqJsonStr);
pubKafkaTemplate.send(dto.getProducerEnum().getTopic(), reqJsonStr);
return true;
} catch (Exception e) {
logger.error(dto.getProducerEnum().getBizName() + "发送异常{}", e.getMessage());
return false;
}
}
}
package com.yougou.order.kafka.channel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yougou.framework.logger.Logger;
import com.yougou.framework.logger.LoggerFactory;
import com.yougou.order.kafka.dto.KafkaDto;
import com.yougou.order.kafka.model.ProducerModel;
import com.yougou.ordercenter.common.PropertiesUtil;
import com.yougou.ordercenter.utils.HttpClient;
import com.yougou.tools.common.utils.UUIDGenerator;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.hssf.record.formula.functions.T;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
* http-生产者模板
*
* @author david
* @site www.belle.net.cn
* @company 百丽国际
* @Date 创建时间:2021-04-13 21:36
*/
@Service
public class HttpChannel implements IProducerChannel {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean sendData(ProducerModel dto) {
try {
if (StringUtils.isBlank(dto.getProducerEnum().getUrl())) {
logger.error(dto.getProducerEnum().getBizName() + "未配置url");
return false;
}
String fullUrl = PropertiesUtil.getInstance().getIspHost() + dto.getProducerEnum().getUrl();
String msgId = UUIDGenerator.getUUID();
KafkaDto kafkaDto = new KafkaDto();
kafkaDto.setData(dto);
kafkaDto.setMsgId(msgId);
String reqJsonStr = JSON.toJSONString(kafkaDto);
// 调用底层kafka接口
logger.info("发送[http]" + fullUrl + "数据:" + reqJsonStr);
return sendByHttp(fullUrl, reqJsonStr);
} catch (Exception e) {
logger.error(dto.getProducerEnum().getBizName() + "发送异常{}", e.getMessage());
return false;
}
}
/**
* 发送isp底层http接口
*
* @param url
* @param httpJson
* @return
* @throws Exception
*/
public boolean sendByHttp(String url, String httpJson) throws Exception {
// 请求报文组包
Map<String, String> reqParamMap = new HashMap<String, String>();
// 组装发送到isp的报文
reqParamMap.put("appKey", PropertiesUtil.getInstance().getIspKey());
reqParamMap.put("orderDetail", httpJson);
HttpClient httpClient = new HttpClient(url, 30000, 30000, true);
String rev = httpClient.send(reqParamMap);
JSONObject resultObj = JSON.parseObject(rev);
if (resultObj != null && org.apache.commons.lang.StringUtils.equals(resultObj.getString("code"), "200")) {
return true;
} else {
return false;
}
}
}
具体调用方,声明用接口,然后用接口的子类来实现;