RocketMQ原生API使用
1:项目搭建
**maven
**
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
其中源码中 example项目有详细的测试代码
生产者消费者需要依赖的是NameServer
多个nameServer分号分隔
2、RocketMQ的编程模型
生产者
- 创建生产者 producer,制定生产者组名
- 制定nameServer地址
- producer启动
- 创建消息对象,制定topic, tag ,消息体
- 发送消息
-
消费者
创建consumer,制定消费组名
- 制定nameServer地址
- 订阅topic, tag
- 设置回调函数,处理消息
- 启动消费者
3、RocketMQ的消息样例
基本样例
发送方式
1:同步发送
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.232.128:9876");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 20; i++) {
try {
Message msg = new Message("lite_pull_consumer_test",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步传递消息,消息会发给集群中的一个Broker节点。
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
//返回结果,执行下面的code
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
2:异步发送
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
//由于是异步发送,这里引入一个countDownLatch,
//保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
// 成功的回调
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
//异常的回调
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
3:单向发送
没有结果,没有回调,只管发送出去
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
消费模式:
1:主动拉模式
//已经删除了,不推荐了
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Jodie_topic_1023");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
2:等待broker推模式push
实际是pull模式封装的
//也已经过期了,替换的类是DefaultLitePullConsumerImpl
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("lite_pull_consumer_test", "*");
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
//consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
消息类型
1:顺序消息
只保证局部有序,不能保证圈住有序
发送者默认会round robin轮询把消息发送到不同的MessageQueue(分区队列)
消费者也会从messageQueue上拉取,只有一组有序的消息发在同一个MessageQueue才能利用MessageQueue
先进先出保证有序
brokerz中一个队列内的消息是保证有序的
消费者:会从多个消息队列上取消息,所以多个队列上的消息是无序的。
消费者要有序,需要一个队列一个队列的取数据。
消费者注入MessageListenerOrderly 会通过锁队列,一个一个的取消息
MessageListenerConcurrently 不会锁队列,只会随机取一批无法保证顺序
try {
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
2:广播消息
默认是集群状态MessageModel.CLUSTERING: 一个消息只会被消费组中的一个实例消费
广播模式MessageModel.BROADCASTING:消费组里的所有实例都会消费这个消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
3:延迟消息
是等一会
message.setDelayTimeLevel(3); 3:是隔离级别
分别对应:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
开源版本一共18个隔离级别,底层实现是18个队列实现的
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
4:批量消息
把多个消息一次发送出去,减少IO
如果批量消息大于1M就不要一个批次了,可以拆分
实际情况是不要大于4M,需要是同一个topic的消息,等
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
5:过滤消息
tag过滤
生产者:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消费者:
RocketMQ最佳实践 ,一个应用可以用一个Topic,不同的业务用tag区分
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//订阅 tagA tagC的消息
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
SQL过滤
s生产者,没啥区别的
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消费者
使用MessageSelector.bySql
按照SQL92标准,
SQL可以使用的tage和生产者加入的属性
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
SQL92语法
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
6:事务消息
这是事务消息被分成了两个事务:发送时,消费时
只能保证发送消息与本地事务的两个操作的原子性
类: TransactionMQProducer,TransactionListener
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TestTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
需要有个监听的实现
class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
//half消息发送成功后回调此方法,执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
//do 本地事务,本地数据库的操作
boolean isSuccess = true;
//本地事务crud成功,就提交消息
if(isSuccess){
//会立刻被消费者消费
return LocalTransactionState.COMMIT_MESSAGE;
}else if(!isSuccess){
//如果本地事务失败,就回滚mq消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}else{
return LocalTransactionState.UNKNOW;
}
}
//COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//回调,查询本地事务的状态吧,算是对上面的一种补充
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
不支持延迟消息,不支持批量消息
单个消息检查的次数是15次,可以在broker tansactionCheckMax配置
默认超过max会丢弃,也是重写AbstractTransactionCheckListener改变行为
实际检查的次数会在messge保持的用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES
配置文件中transactionMsgTimeout 指定特定时间后被检查
可以在用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 修改时间限制
BrokerConfig.transactionTimeOut参数配置,默认6秒,可在broker.conf配置
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”);
设置未为10s
事务消息可能不止一次被检查消费
建议使用双重写入机制
事务机制的顺序图:
在发送half办消息时:其实是吧消息存入 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic
对消费者不可见,最终提交才会转存到对应的topic
4:ACL权限控制
文档说明: docs/cn/acl/user_guide.md
主要在broker.conf配置,
打开acl标志:aclEnable=true
plain_acl.yml进行权限配置,热加载不需要重启
全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.
- 192.168.0.
accounts:
#第一个账户
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY #默认Topic访问策略是拒绝
defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
topicPerms:
- topicA=DENY #topicA拒绝
- topicB=PUB|SUB #topicB允许发布和订阅消息
- topicC=SUB #topicC只允许订阅
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
#第二个账户,只要是来自192.168.1.的IP,就可以访问所有资源
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.
# if it is admin, it could access all resources
admin: true
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.shutdown();
//设置用户和密码
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}