import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * Thin Spring component that forwards message-sending calls to the injected
 * {@link RocketMQTemplate}.
 *
 * <p>Every method delegates directly to the template; no retry, validation or
 * exception translation is added here.
 */
@Component
public class RocketMqClient {

    /**
     * Timeout passed to the template when sending a delayed message.
     * Presumably milliseconds, per the RocketMQTemplate documentation - confirm.
     */
    private static final int DELAY_SEND_TIMEOUT = 3000;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a message synchronously without exposing the send result.
     *
     * <p>The payload is wrapped in a Spring message and handed to
     * {@code RocketMQTemplate.send}.
     *
     * @param destination target in the form {@code topic:tags}
     * @param msg         payload to send
     */
    public void syncSend(String destination, Object msg) {
        this.rocketMQTemplate.send(
                destination,
                MessageBuilder.withPayload(msg).build());
    }

    /**
     * Sends a message synchronously and returns the send result.
     *
     * @param destination target in the form {@code topic:tags}
     * @param msg         payload to send
     * @return the result reported by {@code RocketMQTemplate.syncSend}
     */
    public SendResult send(String destination, Object msg) {
        return this.rocketMQTemplate.syncSend(destination, msg);
    }

    /**
     * Sends a delayed message synchronously.
     *
     * @param destination target destination
     * @param msg         payload to send
     * @param delayLevel  delay level; the original note lists the
     *                    {@code messageDelayLevel} table as
     *                    {@code 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h}
     *                    (presumably the level selects an entry of that table - confirm)
     * @return the result reported by {@code RocketMQTemplate.syncSend}
     */
    public SendResult syncSendDelay(String destination, Object msg, int delayLevel) {
        return this.rocketMQTemplate.syncSend(
                destination,
                MessageBuilder.withPayload(msg).build(),
                DELAY_SEND_TIMEOUT,
                delayLevel);
    }

    /**
     * Sends an ordered message synchronously.
     *
     * @param destination target destination
     * @param payload     payload to send
     * @param hashKey     key forwarded to {@code RocketMQTemplate.syncSendOrderly}
     *                    (presumably used to choose the queue - confirm)
     * @return the result reported by {@code RocketMQTemplate.syncSendOrderly}
     */
    public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
        return this.rocketMQTemplate.syncSendOrderly(destination, payload, hashKey);
    }

    /**
     * Sends a message once without caring about the outcome.
     *
     * @param destination target destination
     * @param payload     payload to send
     */
    public void sendOneWay(String destination, Object payload) {
        this.rocketMQTemplate.sendOneWay(destination, payload);
    }

    /**
     * Sends a transactional message.
     *
     * <p>The payload is wrapped in a Spring message before being handed to
     * {@code RocketMQTemplate.sendMessageInTransaction}.
     *
     * @param destination target destination
     * @param payload     payload to send
     * @param arg         extra argument forwarded unchanged to the template
     *                    (presumably delivered to the local transaction listener - confirm)
     * @return the result reported by {@code RocketMQTemplate.sendMessageInTransaction}
     */
    public TransactionSendResult sendMessageInTransaction(String destination, Object payload, Object arg) {
        return this.rocketMQTemplate.sendMessageInTransaction(
                destination,
                MessageBuilder.withPayload(payload).build(),
                arg);
    }
}
import javax.annotation.Nullable;
import javax.validation.constraints.NotBlank;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
/**
 * Serializable message transfer object carrying a business key, a user id and
 * an optional data map.
 *
 * <p>Equality and hash code are based on {@code businessKey} and {@code data}
 * only; {@code userId} does not participate. Both methods tolerate a
 * {@code null} business key, which is the state of an instance right after
 * the no-arg constructor runs.
 */
@Getter
@Setter
@Slf4j
public class MessageTO implements Serializable {
    private static final long serialVersionUID = -1923863198201966526L;

    /**
     * Business primary key.
     */
    @NotBlank
    private String businessKey;

    /**
     * User id.
     */
    @NotBlank
    private String userId;

    /**
     * Data; may be {@code null}.
     */
    @Nullable
    private Map<String, Object> data;

    /**
     * Creates an empty instance; all fields start as {@code null}.
     */
    public MessageTO() {
    }

    /**
     * Compares this object with another by {@code businessKey} and {@code data}.
     *
     * @param o the object to compare with
     * @return {@code true} if {@code o} is a {@code MessageTO} whose business key
     *         and data are both equal to this instance's (null-safe)
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MessageTO)) {
            return false;
        }
        MessageTO other = (MessageTO) o;
        // Objects.equals avoids the NullPointerException a direct
        // businessKey.equals(...) call throws on a not-yet-populated instance.
        return Objects.equals(businessKey, other.businessKey)
                && Objects.equals(data, other.data);
    }

    /**
     * Computes a hash code from the same fields used by {@link #equals(Object)}.
     *
     * @return the hash code; a {@code null} field contributes {@code 0}
     */
    @Override
    public int hashCode() {
        // Same 31-multiplier formula as before, so non-null keys hash to the
        // same values; Objects.hashCode makes the null case safe.
        int result = Objects.hashCode(businessKey);
        result = 31 * result + Objects.hashCode(data);
        return result;
    }

    /**
     * Renders the object as {@code MessageTO[businessKey='...', data=..., userId=...]}.
     *
     * @return the string representation
     */
    @Override
    public String toString() {
        return new StringJoiner(", ", MessageTO.class.getSimpleName() + "[", "]")
                .add("businessKey='" + businessKey + "'")
                .add("data=" + data)
                .add("userId=" + userId)
                .toString();
    }
}