1. import org.apache.rocketmq.client.producer.SendResult;
    2. import org.apache.rocketmq.client.producer.TransactionSendResult;
    3. import org.apache.rocketmq.spring.core.RocketMQTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.messaging.support.MessageBuilder;
    6. import org.springframework.stereotype.Component;
    7. @Component
    8. public class RocketMqClient {
    9. @Autowired
    10. private RocketMQTemplate rocketMQTemplate;
    11. /**
    12. * 同步发送消息
    13. * @param destination topic:tags
    14. * @param msg
    15. */
    16. public void syncSend(String destination, Object msg) {
    17. rocketMQTemplate.send(destination, MessageBuilder.withPayload(msg).build());
    18. }
    19. /**
    20. * 同步发送消息
    21. * @param destination topic:tags
    22. * @param msg
    23. * @Returen SendResult 返回结果
    24. */
    25. public SendResult send(String destination, Object msg) {
    26. return rocketMQTemplate.syncSend(destination, msg);
    27. }
    28. /**
    29. * 发送延迟短信
    30. * @param destination
    31. * @param msg
    32. * @param delayLevel messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    33. * @return
    34. */
    35. public SendResult syncSendDelay(String destination, Object msg, int delayLevel) {
    36. return rocketMQTemplate.syncSend(destination,MessageBuilder.withPayload(msg).build(),3000,delayLevel);
    37. }
    38. /**
    39. * 发送顺序消息
    40. * @param destination
    41. * @param payload
    42. * @param hashKey
    43. * @return
    44. */
    45. public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
    46. return rocketMQTemplate.syncSendOrderly(destination,payload,hashKey);
    47. }
    48. /**
    49. * 只发送一次不关心结果
    50. * @param destination
    51. * @param payload
    52. */
    53. public void sendOneWay(String destination, Object payload) {
    54. rocketMQTemplate.sendOneWay(destination, payload);
    55. }
    56. /**
    57. * 发送事务消息
    58. * @param destination
    59. * @param payload
    60. * @param arg
    61. * @return
    62. */
    63. public TransactionSendResult sendMessageInTransaction(String destination, Object payload, Object arg) {
    64. return rocketMQTemplate.sendMessageInTransaction(destination,MessageBuilder.withPayload(payload).build(),arg);
    65. }
    66. }
    1. import javax.annotation.Nullable;
    2. import javax.validation.constraints.NotBlank;
    3. import java.io.Serializable;
    4. import java.util.Map;
    5. import java.util.Objects;
    6. import java.util.StringJoiner;
    7. @Getter
    8. @Setter
    9. @Slf4j
    10. public class MessageTO implements Serializable {
    11. private static final long serialVersionUID = -1923863198201966526L;
    12. /**
    13. * 业务主键
    14. */
    15. @NotBlank
    16. private String businessKey;
    17. /**
    18. * 用户Id
    19. */
    20. @NotBlank
    21. private String userId;
    22. /**
    23. * 数据
    24. */
    25. @Nullable
    26. private Map<String, Object> data;
    27. public MessageTO() {
    28. }
    29. @Override
    30. public boolean equals(Object o) {
    31. if (this == o) return true;
    32. if (!(o instanceof MessageTO)) return false;
    33. MessageTO messageTO = (MessageTO) o;
    34. if (!businessKey.equals(messageTO.businessKey)) return false;
    35. return Objects.equals(data, messageTO.data);
    36. }
    37. @Override
    38. public int hashCode() {
    39. int result = businessKey.hashCode();
    40. result = 31 * result + (data != null ? data.hashCode() : 0);
    41. return result;
    42. }
    43. @Override
    44. public String toString() {
    45. return new StringJoiner(", ", MessageTO.class.getSimpleName() + "[", "]")
    46. .add("businessKey='" + businessKey + "'")
    47. .add("data=" + data)
    48. .add("userId=" + userId)
    49. .toString();
    50. }
    51. }