pom文件

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>ons-client</artifactId>
  4. <version>1.8.4.Final</version>
  5. </dependency>

配置文件

  1. 配置一些基础的配置信息
  1. #启动测试之前请替换如下 XXX 为您的配置
  2. rocketmq.accessKey=XXX
  3. rocketmq.secretKey=XX
  4. rocketmq.nameSrvAddr=XXX
  5. rocketmq.topic=XXX
  6. rocketmq.groupId=XXX
  7. rocketmq.tag=*
  8. rocketmq.orderTopic=XXX
  9. rocketmq.orderGroupId=XXX
  10. rocketmq.orderTag=*
  1. 创建一个配置项实体类,用于读取配置信息
  1. @Configuration
  2. @ConfigurationProperties(prefix = "rocketmq")
  3. @Date
  4. public class MqConfig {
  5. private String accessKey;
  6. private String secretKey;
  7. private String nameSrvAddr;
  8. private String topic;
  9. private String groupId;
  10. private String tag;
  11. private String orderTopic;
  12. private String orderGroupId;
  13. private String orderTag;
  14. public Properties getMqPropertie() {
  15. Properties properties = new Properties();
  16. properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
  17. properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
  18. properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
  19. return properties;
  20. }
  21. }

普通类型的生产者和消费者定义

生产者

建立连接

  1. @Configuration
  2. public class ProducerClient {
  3. @Autowired
  4. private MqConfig mqConfig;
  5. @Bean(initMethod = "start", destroyMethod = "shutdown")
  6. public ProducerBean buildProducer() {
  7. ProducerBean producer = new ProducerBean();
  8. producer.setProperties(mqConfig.getMqPropertie());
  9. return producer;
  10. }
  11. }

生产者发送消息

同步

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SyncProducerTest {
  4. //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
  5. @Autowired
  6. private ProducerBean producer;
  7. @Autowired
  8. private MqConfig mqConfig;
  9. @Test
  10. public void testSend() {
  11. //循环发送消息
  12. for (int i = 0; i < 100; i++) {
  13. Message msg = new Message( //
  14. // Message所属的Topic
  15. mqConfig.getTopic(),
  16. // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
  17. mqConfig.getTag(),
  18. // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
  19. // 需要Producer与Consumer协商好一致的序列化和反序列化方式
  20. "Hello MQ".getBytes());
  21. // 设置代表消息的业务关键属性,请尽可能全局唯一
  22. // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
  23. // 注意:不设置也不会影响消息正常收发
  24. msg.setKey("ORDERID_100");
  25. // 发送消息,只要不抛异常就是成功
  26. try {
  27. SendResult sendResult = producer.send(msg);
  28. assert sendResult != null;
  29. System.out.println(sendResult);
  30. } catch (ONSClientException e) {
  31. System.out.println("发送失败");
  32. //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
  33. }
  34. }
  35. }
  36. }

异步

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class AsyncProducerTest {
  4. //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
  5. @Autowired
  6. private ProducerBean producer;
  7. @Autowired
  8. private MqConfig mqConfig;
  9. @Test
  10. public void testSend() {
  11. //对于使用异步接口,建议设置单独的回调处理线程池,拥有更灵活的配置和监控能力。
  12. //如下构造线程的方式请求队列为无界仅用作示例,有OOM的风险。
  13. //更合理的构造方式请参考阿里巴巴Java开发手册:https://github.com/alibaba/p3c
  14. producer.setCallbackExecutor(Executors.newFixedThreadPool(10));
  15. //循环发送消息
  16. for (int i = 0; i < 1; i++) {
  17. Message msg = new Message( //
  18. // Message所属的Topic
  19. mqConfig.getTopic(),
  20. // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
  21. mqConfig.getTag(),
  22. // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
  23. // 需要Producer与Consumer协商好一致的序列化和反序列化方式
  24. "Hello MQ".getBytes());
  25. // 设置代表消息的业务关键属性,请尽可能全局唯一
  26. // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
  27. // 注意:不设置也不会影响消息正常收发
  28. msg.setKey("ORDERID_100");
  29. // 发送消息,只要不抛异常就是成功
  30. try {
  31. producer.sendAsync(msg, new SendCallback() {
  32. @Override
  33. public void onSuccess(final SendResult sendResult) {
  34. assert sendResult != null;
  35. System.out.println(sendResult);
  36. }
  37. @Override
  38. public void onException(final OnExceptionContext context) {
  39. //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
  40. }
  41. });
  42. } catch (ONSClientException e) {
  43. System.out.println("发送失败");
  44. //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
  45. }
  46. }
  47. }
  48. }

sql 过滤

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SqlProducerTest {
  4. //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
  5. @Autowired
  6. private ProducerBean producer;
  7. @Autowired
  8. private MqConfig mqConfig;
  9. @Test
  10. public void testSend() {
  11. //循环发送消息
  12. for (int i = 0; i < 100; i++) {
  13. String tag;
  14. int div = i % 3;
  15. if (div == 0) {
  16. tag = "TagA";
  17. } else if (div == 1) {
  18. tag = "TagB";
  19. } else {
  20. tag = "TagC";
  21. }
  22. Message msg = new Message( //
  23. // Message所属的Topic
  24. mqConfig.getTopic(),
  25. // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
  26. tag,
  27. // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
  28. // 需要Producer与Consumer协商好一致的序列化和反序列化方式
  29. "Hello MQ".getBytes());
  30. // 设置代表消息的业务关键属性,请尽可能全局唯一
  31. // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
  32. // 注意:不设置也不会影响消息正常收发
  33. msg.setKey("ORDERID_100");
  34. // 设置自定义属性,该属性可用于做SQL属性过滤
  35. msg.putUserProperties("a", String.valueOf(i));
  36. // 发送消息,只要不抛异常就是成功
  37. try {
  38. SendResult sendResult = producer.send(msg);
  39. assert sendResult != null;
  40. System.out.println(sendResult);
  41. } catch (ONSClientException e) {
  42. System.out.println("发送失败");
  43. //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
  44. }
  45. }
  46. }
  47. }

消费者

监听

  1. @Component
  2. public class DemoMessageListener implements MessageListener {
  3. @Override
  4. public Action consume(Message message, ConsumeContext context) {
  5. System.out.println("Receive: " + message);
  6. try {
  7. //do something..
  8. return Action.CommitMessage;
  9. } catch (Exception e) {
  10. //消费失败
  11. return Action.ReconsumeLater;
  12. }
  13. }
  14. }

普通消费方式

  1. //项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
  2. public class ConsumerClient {
  3. @Autowired
  4. private MqConfig mqConfig;
  5. @Autowired
  6. private DemoMessageListener messageListener;
  7. @Bean(initMethod = "start", destroyMethod = "shutdown")
  8. public ConsumerBean buildConsumer() {
  9. ConsumerBean consumerBean = new ConsumerBean();
  10. //配置文件
  11. Properties properties = mqConfig.getMqPropertie();
  12. properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
  13. //将消费者线程数固定为20个 20为默认值
  14. properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
  15. consumerBean.setProperties(properties);
  16. //订阅关系
  17. Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
  18. Subscription subscription = new Subscription();
  19. subscription.setTopic(mqConfig.getTopic());
  20. subscription.setExpression(mqConfig.getTag());
  21. subscriptionTable.put(subscription, messageListener);
  22. //订阅多个topic如上面设置
  23. consumerBean.setSubscriptionTable(subscriptionTable);
  24. return consumerBean;
  25. }
  26. }

sql 过滤消费

  1. //正式开发时可以加上 @Configuration 注解,这样服务启动时consumer也启动了
  2. //sql92只有mq铂金版才支持
  3. public class SqlConsumerClient {
  4. @Autowired
  5. private MqConfig mqConfig;
  6. @Autowired
  7. private DemoMessageListener messageListener;
  8. @Bean(initMethod = "start", destroyMethod = "shutdown")
  9. public ConsumerBean buildSqlConsumer() {
  10. ConsumerBean consumerBean = new ConsumerBean();
  11. //配置文件
  12. Properties properties = mqConfig.getMqPropertie();
  13. properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
  14. consumerBean.setProperties(properties);
  15. //订阅关系
  16. Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
  17. Subscription subscription = new Subscription();
  18. subscription.setTopic(mqConfig.getTopic());
  19. // 表示需要使用SQL来过滤消息
  20. subscription.setType("SQL92");
  21. //需要消息的tag是'TagA'或'TagB'并且自定义属性a(在发送消息的时候通过putUserProperties方法放入)需要在[0,3]
  22. //SQL过滤同样可以使用消息的tag作为过滤条件(消息的tag在消息的属性中叫做 TAGS)
  23. //SQL过滤同样可以在顺序消费中使用
  24. subscription.setExpression("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)");
  25. subscriptionTable.put(subscription, messageListener);
  26. //订阅多个topic如上面设置
  27. consumerBean.setSubscriptionTable(subscriptionTable);
  28. return consumerBean;
  29. }
  30. }