pom文件
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
配置文件
- 配置一些基础的配置信息
#启动测试之前请替换如下 XXX 为您的配置
rocketmq.accessKey=XXX
rocketmq.secretKey=XX
rocketmq.nameSrvAddr=XXX
rocketmq.topic=XXX
rocketmq.groupId=XXX
rocketmq.tag=*
rocketmq.orderTopic=XXX
rocketmq.orderGroupId=XXX
rocketmq.orderTag=*
- 创建一个配置项实体类,用于读取配置信息
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
@Date
public class MqConfig {
private String accessKey;
private String secretKey;
private String nameSrvAddr;
private String topic;
private String groupId;
private String tag;
private String orderTopic;
private String orderGroupId;
private String orderTag;
public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
return properties;
}
}
普通类型的生产者和消费者定义
生产者
建立连接
@Configuration
public class ProducerClient {
@Autowired
private MqConfig mqConfig;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(mqConfig.getMqPropertie());
return producer;
}
}
生产者发送消息
同步
@RunWith(SpringRunner.class)
@SpringBootTest
public class SyncProducerTest {
//普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
@Autowired
private ProducerBean producer;
@Autowired
private MqConfig mqConfig;
@Test
public void testSend() {
//循环发送消息
for (int i = 0; i < 100; i++) {
Message msg = new Message( //
// Message所属的Topic
mqConfig.getTopic(),
// Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
mqConfig.getTag(),
// Message Body 可以是任何二进制形式的数据, MQ不做任何干预
// 需要Producer与Consumer协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
// 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_100");
// 发送消息,只要不抛异常就是成功
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.println(sendResult);
} catch (ONSClientException e) {
System.out.println("发送失败");
//出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
}
}
}
}
异步
@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncProducerTest {
//普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
@Autowired
private ProducerBean producer;
@Autowired
private MqConfig mqConfig;
@Test
public void testSend() {
//对于使用异步接口,建议设置单独的回调处理线程池,拥有更灵活的配置和监控能力。
//如下构造线程的方式请求队列为无界仅用作示例,有OOM的风险。
//更合理的构造方式请参考阿里巴巴Java开发手册:https://github.com/alibaba/p3c
producer.setCallbackExecutor(Executors.newFixedThreadPool(10));
//循环发送消息
for (int i = 0; i < 1; i++) {
Message msg = new Message( //
// Message所属的Topic
mqConfig.getTopic(),
// Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
mqConfig.getTag(),
// Message Body 可以是任何二进制形式的数据, MQ不做任何干预
// 需要Producer与Consumer协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
// 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_100");
// 发送消息,只要不抛异常就是成功
try {
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
assert sendResult != null;
System.out.println(sendResult);
}
@Override
public void onException(final OnExceptionContext context) {
//出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
}
});
} catch (ONSClientException e) {
System.out.println("发送失败");
//出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
}
}
}
}
sql 过滤
@RunWith(SpringRunner.class)
@SpringBootTest
public class SqlProducerTest {
//普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
@Autowired
private ProducerBean producer;
@Autowired
private MqConfig mqConfig;
@Test
public void testSend() {
//循环发送消息
for (int i = 0; i < 100; i++) {
String tag;
int div = i % 3;
if (div == 0) {
tag = "TagA";
} else if (div == 1) {
tag = "TagB";
} else {
tag = "TagC";
}
Message msg = new Message( //
// Message所属的Topic
mqConfig.getTopic(),
// Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
tag,
// Message Body 可以是任何二进制形式的数据, MQ不做任何干预
// 需要Producer与Consumer协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
// 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_100");
// 设置自定义属性,该属性可用于做SQL属性过滤
msg.putUserProperties("a", String.valueOf(i));
// 发送消息,只要不抛异常就是成功
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.println(sendResult);
} catch (ONSClientException e) {
System.out.println("发送失败");
//出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
}
}
}
}
消费者
监听
@Component
public class DemoMessageListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
try {
//do something..
return Action.CommitMessage;
} catch (Exception e) {
//消费失败
return Action.ReconsumeLater;
}
}
}
普通消费方式
//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
public class ConsumerClient {
@Autowired
private MqConfig mqConfig;
@Autowired
private DemoMessageListener messageListener;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
//将消费者线程数固定为20个 20为默认值
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
consumerBean.setProperties(properties);
//订阅关系
Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.getTopic());
subscription.setExpression(mqConfig.getTag());
subscriptionTable.put(subscription, messageListener);
//订阅多个topic如上面设置
consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
}
sql 过滤消费
//正式开发时可以加上 @Configuration 注解,这样服务启动时consumer也启动了
//sql92只有mq铂金版才支持
public class SqlConsumerClient {
@Autowired
private MqConfig mqConfig;
@Autowired
private DemoMessageListener messageListener;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildSqlConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
consumerBean.setProperties(properties);
//订阅关系
Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.getTopic());
// 表示需要使用SQL来过滤消息
subscription.setType("SQL92");
//需要消息的tag是'TagA'或'TagB'并且自定义属性a(在发送消息的时候通过putUserProperties方法放入)需要在[0,3]
//SQL过滤同样可以使用消息的tag作为过滤条件(消息的tag在消息的属性中叫做 TAGS)
//SQL过滤同样可以在顺序消费中使用
subscription.setExpression("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)");
subscriptionTable.put(subscription, messageListener);
//订阅多个topic如上面设置
consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
}