RocketMQ环境做准备
- 安装Linux 系统 ,配置ip为 192.168.1.100, ip对应
- 安装Docker环境
拉取 rocketmq 镜像版本
docker pull rocketmqinc/rocketmq:4.4.0
注意: Rocketmq 的服务端版本与客户端版本要一致! 不一致可能会出现异常的错误(难以发现)!
docker 部署 rocketmq namesrv 服务
docker run -itd --name rocketmq_namesrv --restart=always \
  -p 9876:9876 \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/rocketmq/namesrv:/home/rocketmq \
  rocketmqinc/rocketmq:4.4.0 sh mqnamesrv autoCreateTopicEnable=true
docker 部署 broker 服务(注意配置的路径修改)
docker run -d -p 10911:10911 -p 10909:10909 --restart=always \
  -v /data/rocketmq/broker:/home/rocketmq:rw \
  -v /deploy/etc/rocketmq-broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
  -v /etc/localtime:/etc/localtime:ro \
  --name rocketmq_broker -e "NAMESRV_ADDR=192.168.1.100:9876" \
  -e "MAX_POSSIBLE_HEAP=200000000" \
  rocketmqinc/rocketmq:4.4.0 sh mqbroker autoCreateTopicEnable=true -c /opt/rocketmq-4.4.0/conf/broker.conf
rocketmq-broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 192.168.1.100
autoCreateTopicEnable=true
docker 部署 rocketmq console 控制台(注意namesrv的地址)
docker run -itd --name rocketmq_console -m 500m --restart=always \
  -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.1.100:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
  -p 8080:8080 pangliang/rocketmq-console-ng
RocketMQ 常规消息
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
注意: 要到控制台创建 Topic 队列名称
同步发送消息(Send Messages Synchronously)
public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置 name server 服务地址, 这里是是设置本机 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动实例 producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }异常发送消息(Send Messages Asynchronously)
public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置 name server 服务地址, 这里是是设置本机 producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }Send Messages in One-way Mode
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Wait for sending to complete Thread.sleep(5000); producer.shutdown(); } }ClientConfig 配置初始化
private static ClientConfig initClientConfig() { ClientConfig mClientConfig = new ClientConfig(); // 客户端本机 IP 地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 // Name Server 地址列表,多个 NameServer 地址用分号 隔开 mClientConfig.setNamesrvAddr("127.0.0.1:9876"); // 客户端实例名称,客户端创建的多个 Producer、 Consumer 实际是共用一个内部实例(这个实例包含 // 网络连接、线程资源等),默认值:DEFAULT mClientConfig.setInstanceName("DEFAULT"); // 通信层异步回调线程数,默认值4 mClientConfig.setClientCallbackExecutorThreads(10); // 轮询 Name Server 间隔时间,单位毫秒,默认:30000 // mClientConfig.setPollNameServerInterval(30000); // 向 Broker 发送心跳间隔时间,单位毫秒,默认:30000 mClientConfig.setHeartbeatBrokerInterval(30000); // 持久化 Consumer 消费进度间隔时间,单位毫秒,默认:5000 mClientConfig.setPersistConsumerOffsetInterval(5000); return ClientConfig; }Producer 初始化
/**
 * Creates, configures and starts the shared producer.
 * Any failure is logged rather than propagated.
 */
private static void initProducer() {
    try {
        // Assign the shared mProducer field that sendMessage() uses; a local
        // variable here would leave the started producer unreachable.
        mProducer = new DefaultMQProducer();
        ClientConfig config = initClientConfig();
        mProducer.resetClientConfig(config);
        // Queue count used when a topic missing on the server is auto-created
        // while sending. Default: 4
        mProducer.setDefaultTopicQueueNums(4);
        // Send timeout in milliseconds. Default: 10000
        mProducer.setSendMsgTimeout(10000);
        // Body size in bytes above which the message is compressed (the
        // consumer decompresses automatically). Default: 4096
        mProducer.setCompressMsgBodyOverHowmuch(4096);
        // Whether to retry on another broker when a SendResult is returned but
        // sendStatus != SEND_OK. Default: false
        mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);
        mProducer.setProducerGroup(DEFAULT_GROUP);
        // mProducer.setRetryTimesWhenSendAsyncFailed(3);
        mProducer.start();
    } catch (Exception e) {
        LOG.error("init producer error:", e);
    }
}
发送消息
public boolean sendMessage(String queue, String body, String tags) { try { if(StringUtils.isEmpty(tags)) { tags = StringUtils.getEmpty(); } Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET)); // Call send message to deliver message to one of brokers. SendResult sendResult = mProducer.send(msg); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { return true; } LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult)); } catch (Exception e) { LOG.error("send queue error:", e); } return false; }
RocketMQ 顺序消息
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息例子(Send message sample code)
/**
 * Sends 100 messages, using a {@code MessageQueueSelector} so that messages
 * with the same order id always go to the same queue.
 */
public class TestOrderedProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        // Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            // Create a message instance, specifying topic, tag and message body.
            // The topic must be the one TestOrderedConsumer subscribes to.
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // arg is the orderId passed to send(); map it onto a queue index.
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }
        // Shut the producer down once all messages have been sent.
        producer.shutdown();
    }
}
消费消息例子(Subscription message sample code)
/**
 * Orderly push consumer sample. The listener cycles through the possible
 * {@code ConsumeOrderlyStatus} results based on how many times it has been called.
 */
public class TestOrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            private final AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                // Pass values as format arguments: message content containing '%'
                // must not be interpreted as part of the format string.
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                // Read the counter once so every branch below sees the same value.
                long times = consumeTimes.incrementAndGet();
                if ((times % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((times % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((times % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((times % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
RocketMQ 订阅消息
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息
/**
 * Publishes 100 identical "Hello world" messages to TopicTest and prints
 * each send result.
 */
public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        int sent = 0;
        while (sent < 100) {
            byte[] payload = "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message outgoing = new Message("TopicTest", "TagA", "OrderID188", payload);
            System.out.printf("%s%n", producer.send(outgoing));
            sent++;
        }

        producer.shutdown();
    }
}
消费消息
/**
 * Push consumer running in broadcast mode: the message model is set to
 * {@code MessageModel.BROADCASTING} before subscribing.
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Set to broadcast mode.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Pass values as format arguments: message content containing '%'
                // must not be interpreted as part of the format string.
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}
RocketMQ 定时消息
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * Sends 100 scheduled (delayed) messages to TestTopic using delay level 3.
 */
public class TestScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            Message message = new Message("TestTopic",
                ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        // Shutdown producer after use.
        producer.shutdown();
    }
}
消费消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Consumes every message on TestTopic and prints, per message, how long ago
 * the broker stored it.
 */
public class TestScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer scheduledConsumer = new DefaultMQPushConsumer("ExampleConsumer");
        // "*" subscribes to all tags of the topic.
        scheduledConsumer.subscribe("TestTopic", "*");

        MessageListenerConcurrently delayPrinter = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt received : messages) {
                    String id = received.getMsgId();
                    // Approximate delay: now minus the broker store timestamp.
                    long elapsedMs = System.currentTimeMillis() - received.getStoreTimestamp();
                    System.out.println("Receive message[msgId=" + id + "] " + elapsedMs + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
        scheduledConsumer.registerMessageListener(delayPrinter);

        scheduledConsumer.start();
    }
}
验证消息:
10s之后 请看 storing time , 看看是不是成功了
RocketMQ 批量消息
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送批量消息
// Build three messages for the same topic and send them as one batch.
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
String[] bodies = {"Hello world 0", "Hello world 1", "Hello world 2"};
for (int n = 0; n < bodies.length; n++) {
    // Keys run OrderID001..OrderID003, bodies "Hello world 0".."Hello world 2".
    messages.add(new Message(topic, "TagA", "OrderID00" + (n + 1), bodies[n].getBytes()));
}
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    // handle the error
}
发送大量批量消息,请分批进行
/**
 * Iterates over a message list in consecutive sub-lists whose estimated total
 * size stays within {@code SIZE_LIMIT}. Each returned sub-list is a view
 * ({@code List.subList}) of the list passed to the constructor.
 */
public class ListSplitter implements Iterator<List<Message>> {
    /** Maximum estimated size of one batch. */
    private static final int SIZE_LIMIT = 1000 * 1000;
    /** Fixed per-message allowance for log overhead. */
    private static final int LOG_OVERHEAD = 20;

    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    /**
     * Returns the next batch.
     *
     * @return a non-empty sub-list starting at the current position
     * @throws java.util.NoSuchElementException if no messages remain
     */
    @Override
    public List<Message> next() {
        // Honor the Iterator contract instead of returning an empty batch.
        if (!hasNext()) {
            throw new java.util.NoSuchElementException();
        }
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            int tmpSize = estimateSize(messages.get(nextIndex));
            if (tmpSize > SIZE_LIMIT) {
                // It is unexpected that a single message exceeds SIZE_LIMIT.
                // Let it through alone, otherwise it would block the splitting process.
                if (nextIndex - currIndex == 0) {
                    // The batch is still empty: put the oversized message in it by itself.
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            }
            totalSize += tmpSize;
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    /** Estimates one message's size: topic length + body length + property lengths + overhead. */
    private static int estimateSize(Message message) {
        int size = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            size += entry.getKey().length() + entry.getValue().length();
        }
        return size + LOG_OVERHEAD;
    }
}
// Split the large list into smaller batches and send them one at a time.
for (ListSplitter splitter = new ListSplitter(messages); splitter.hasNext(); ) {
    try {
        producer.send(splitter.next());
    } catch (Exception e) {
        e.printStackTrace();
        // handle the error
    }
}
RocketMQ 过滤消息
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
注意:
tag的使用!
要到控制台创建 Topic 队列名称
官方过滤消息例子:
http://rocketmq.apache.org/docs/filter-by-sql92-example/
发送消息
// Fragment: sends one message carrying a user property "a" that a consumer can filter on.
// NOTE(review): `tag` and `i` are not declared in this fragment; presumably they come
// from an enclosing loop that was omitted — confirm before copying.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消费消息
// Fragment: push consumer that filters by a SQL92 expression on message properties.
// NOTE(review): SQL92 filtering presumably has to be enabled on the broker
// (enablePropertyFilter=true); the broker.conf shown earlier does not set it — confirm.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Only subscribe to messages that have property a, with a >= 0 and a <= 3.
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // Handle the messages here.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
RocketMQ OpenMessaging
OpenMessaging是阿里牵头发起的分布式消息模型标准,其api规范首先在rocketmq中落地实现。 通过这个标准可以简化各种消息中间件的高复杂性和不兼容性,提升消息中间件服务的易用性。
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息
/**
 * OpenMessaging producer sample: sends the same payload to OMS_HELLO_TOPIC
 * three ways (synchronous, asynchronous with a listener, one-way) and then
 * shuts down.
 */
public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        producer.startup();
        System.out.printf("Producer startup OK%n");

        // 1. Synchronous send.
        Message syncMessage = producer.createBytesMessageToTopic(
            "OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
        SendResult syncResult = producer.send(syncMessage);
        System.out.printf("Send sync message OK, msgId: %s%n", syncResult.messageId());

        // 2. Asynchronous send; the listener reports success or failure.
        Message asyncMessage = producer.createBytesMessageToTopic(
            "OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
        final Promise<SendResult> pending = producer.sendAsync(asyncMessage);
        pending.addListener(new PromiseListener<SendResult>() {
            @Override
            public void operationCompleted(Promise<SendResult> promise) {
                System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
            }

            @Override
            public void operationFailed(Promise<SendResult> promise) {
                System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
            }
        });

        // 3. One-way send.
        Message onewayMessage = producer.createBytesMessageToTopic(
            "OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
        producer.sendOneway(onewayMessage);
        System.out.printf("Send oneway message OK%n");

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
拉取消息消费
/**
 * OpenMessaging pull consumer sample: polls OMS_HELLO_TOPIC once, acknowledges
 * the message if one was returned, then shuts down.
 */
public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint accessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PullConsumer pullConsumer = accessPoint.createPullConsumer(
            "OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        accessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        pullConsumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message polled = pullConsumer.poll();
        boolean gotMessage = polled != null;
        if (gotMessage) {
            // Acknowledge by message id taken from the message headers.
            String polledId = polled.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", polledId);
            pullConsumer.ack(polledId);
        }

        pullConsumer.shutdown();
        accessPoint.shutdown();
    }
}
通过推送消费消息
/**
 * OpenMessaging push consumer sample: attaches a listener to OMS_HELLO_TOPIC
 * that prints and acknowledges each message, and registers a JVM shutdown hook
 * that closes the consumer and the access point.
 */
public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
        // Start the consumer, as OMSPullConsumer does; without this call the
        // attached listener is never driven.
        consumer.startup();
        System.out.printf("Consumer startup OK%n");
    }
}
RocketMQ 事务消息
maven 工程添加库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加库
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客户端版本要和服务端版本一致,否则会发生一些奇怪的问题:
我遇到过版本不一致导致消息消费无法被确认的情况,也就是说客户端已经消费并提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 线程池
producer.setExecutorService(executorService);
// 事务监听器
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
实现事务监听器
/**
 * Sample transaction listener. Every local transaction is reported as
 * {@code UNKNOW}; a status code (call counter modulo 3) is remembered per
 * transaction id and translated into a final state in {@code checkLocalTransaction}.
 */
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Record a status code derived from the running counter.
        int statusCode = transactionIndex.getAndIncrement() % 3;
        localTrans.put(msg.getTransactionId(), statusCode);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer recorded = localTrans.get(msg.getTransactionId());
        if (recorded == null) {
            // No recorded status for this transaction id: commit.
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        int statusCode = recorded.intValue();
        if (statusCode == 0) {
            return LocalTransactionState.UNKNOW;
        }
        if (statusCode == 2) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Status 1 and any other value commit.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
RocketMQ Java 客户端封装
RocketMQ 应用非常多,但是在实际代码开发过程,我们肯定不能以上面的代码在实际项目中应用, 肯定是要把它们都封装一下,由自己提供的Api来调用RocketMQ,这样才能更方便!
对于消息队列,我们关注的地方:
- 消息生产者 ``` 对于消息生产者,我们只关注两点
- 队列名称
- 要发送的消息 ```
- 消息消费者 ``` 对于消费者,我们只关注以下几点
- 定义一个接口:
上面我们定义了一个发送消息的方法:
```java
package com.pangugle.framework.mq;

import com.pangugle.framework.service.Callback;

/**
 * Abstraction over a message queue: topic management, sending, and two
 * consumption styles.
 */
public interface MQSupport {

    /**
     * Declares a topic. Not used for RocketMQ.
     *
     * @param topic topic name
     */
    public void declareTopic(String topic);

    public void deleteTopic(String topic);

    /**
     * Sends a message.
     *
     * @param topic topic name
     * @param body  message body
     * @return the send outcome as a boolean
     */
    public boolean sendMessage(String topic, String body);

    public boolean sendMessage(String topic, String body, String tags);

    /**
     * Consumes messages; a message is not consumed repeatedly.
     *
     * @param tags     tag expression
     * @param callback invoked with each message body
     */
    public void consume(String topic, String tags, Callback<String> callback);

    /**
     * Subscribes to messages; messages are consumed repeatedly.
     *
     * @param tags     tag expression
     * @param callback invoked with each message body
     */
    public void subscribe(String topic, String tags, Callback<String> callback);
}
- sendMessage(String topic, String body);
- sendMessage(String topic, String body, String tags);
和消费消息的方法: - consume(String topic, String tags, Callback<String> callback); - subscribe(String topic, String tags, Callback<String> callback); ```
Rocketmq 实现这个接口
package com.pangugle.framework.mq.impl;

import java.util.List;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import com.pangugle.framework.conf.MyConfiguration;
import com.pangugle.framework.log.Log;
import com.pangugle.framework.log.LogFactory;
import com.pangugle.framework.mq.MQSupport;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.FastJsonHelper;
import com.pangugle.framework.utils.StringUtils;

/**
 * RocketMQ-backed implementation of {@link MQSupport}.
 *
 * <p>One producer is shared by all instances and is created the first time an
 * instance is constructed. Every call to {@code consume}/{@code subscribe}
 * creates and starts a new push consumer.
 */
public class RocketMQImpl implements MQSupport {

    private static final Log LOG = LogFactory.getLog(RocketMQImpl.class);

    private static final String DEFAULT_GROUP = "pangule_default_group";
    private static final int DEFAULT_CONSMER_THREAD_SIZE = 5;

    private static String mServer = null;

    private static final ClientConfig mClientConfig = new ClientConfig();
    private static DefaultMQProducer mProducer;

    public RocketMQImpl() {
        // Initialize the shared client config and producer only once.
        synchronized (RocketMQImpl.class) {
            if (mProducer == null) {
                initClientConfig();
                initProducer();
            }
        }
    }

    /** No-op for RocketMQ. */
    @Override
    public void declareTopic(String queue) {
    }

    /** No-op for RocketMQ. */
    @Override
    public void deleteTopic(String queue) {
    }

    /**
     * Sends {@code body} to the given topic synchronously.
     *
     * @param queue topic name
     * @param body  message text, encoded with {@code RemotingHelper.DEFAULT_CHARSET}
     * @param tags  message tag; an empty value is replaced by {@code StringUtils.getEmpty()}
     * @return {@code true} only when the broker reports {@code SEND_OK};
     *         {@code false} on any other status or on any exception (which is logged)
     */
    @Override
    public boolean sendMessage(String queue, String body, String tags) {
        try {
            if (StringUtils.isEmpty(tags)) {
                tags = StringUtils.getEmpty();
            }
            Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Call send message to deliver message to one of brokers.
            SendResult sendResult = mProducer.send(msg);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                return true;
            }
            LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
        } catch (Exception e) {
            LOG.error("send queue error:", e);
        }
        return false;
    }

    /** Sends a message without a tag. */
    @Override
    public boolean sendMessage(String queue, String body) {
        return sendMessage(queue, body, null);
    }

    /** Reads the name server address from configuration and fills the shared client config. */
    private static void initClientConfig() {
        mServer = MyConfiguration.getInstance().getString("mq.rocket.server");
        LOG.info("rocketmq.server = " + mServer);
        // Client IP address: some machines fail to detect it, in which case the
        // application has to set it explicitly in code (it is not set here).
        // Name server address list; separate multiple addresses with semicolons.
        mClientConfig.setNamesrvAddr(mServer);
        // Client instance name. Multiple producers/consumers created by one client
        // share a single internal instance (network connections, thread resources).
        // Default: DEFAULT
        mClientConfig.setInstanceName("DEFAULT");
        // Number of async callback threads in the communication layer. Default: 4
        mClientConfig.setClientCallbackExecutorThreads(10);
        // Name server polling interval in milliseconds. Default: 30000
        // mClientConfig.setPollNameServerInterval(30000);
        // Broker heartbeat interval in milliseconds. Default: 30000
        mClientConfig.setHeartbeatBrokerInterval(30000);
        // Interval for persisting consumer offsets, in milliseconds. Default: 5000
        mClientConfig.setPersistConsumerOffsetInterval(5000);
    }

    /** Creates, configures and starts the shared producer; failures are logged. */
    private static void initProducer() {
        try {
            mProducer = new DefaultMQProducer();
            mProducer.resetClientConfig(mClientConfig);
            // Queue count used when a topic missing on the server is auto-created
            // while sending. Default: 4
            mProducer.setDefaultTopicQueueNums(4);
            // Send timeout in milliseconds. Default: 10000
            mProducer.setSendMsgTimeout(10000);
            // Body size in bytes above which the message is compressed (the
            // consumer decompresses automatically). Default: 4096
            mProducer.setCompressMsgBodyOverHowmuch(4096);
            // Whether to retry on another broker when a SendResult is returned but
            // sendStatus != SEND_OK. Default: false
            mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);
            mProducer.setProducerGroup(DEFAULT_GROUP);
            // mProducer.setRetryTimesWhenSendAsyncFailed(3);
            mProducer.start();
        } catch (Exception e) {
            LOG.error("init producer error:", e);
        }
    }

    /** Starts a consumer in {@code CLUSTERING} mode that passes each message body to {@code callback}. */
    @Override
    public void consume(String topic, String tags, Callback<String> callback) {
        startConsumer(topic, tags, MessageModel.CLUSTERING, callback, "consume error:");
    }

    /** Starts a consumer in {@code BROADCASTING} mode that passes each message body to {@code callback}. */
    @Override
    public void subscribe(String topic, String tags, Callback<String> callback) {
        startConsumer(topic, tags, MessageModel.BROADCASTING, callback, "subscrebe error:");
    }

    /**
     * Shared implementation of {@code consume} and {@code subscribe}: builds a
     * consumer for the topic, registers a listener that forwards bodies to the
     * callback, and starts it.
     *
     * @param startErrorMessage log message used when the consumer cannot be created or started
     */
    private static void startConsumer(String topic, String tags, MessageModel model,
            final Callback<String> callback, String startErrorMessage) {
        try {
            if (StringUtils.isEmpty(tags)) {
                tags = StringUtils.getEmpty();
            }
            DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
            consumer.setMessageModel(model);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    for (MessageExt ext : msgs) {
                        try {
                            // Decode with the same charset sendMessage() encodes with.
                            String body = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            callback.execute(body);
                        } catch (Exception e) {
                            // Still best-effort (the batch is acknowledged below),
                            // but the failure is no longer silently dropped.
                            LOG.error("handle message error, msgId = " + ext.getMsgId(), e);
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            LOG.error(startErrorMessage, e);
        }
    }

    /**
     * Creates a push consumer subscribed to {@code topic} with the given tag
     * expression. The consumer group name is {@code topic + tags}.
     */
    private static DefaultMQPushConsumer getConsumerInstance(String topic, String tags) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.resetClientConfig(mClientConfig);
        // NOTE(review): a tag expression such as "*" or "TagA || TagB" ends up in
        // the group name; confirm RocketMQ accepts those characters in a group name.
        consumer.setConsumerGroup(topic + tags);
        consumer.setConsumeThreadMin(DEFAULT_CONSMER_THREAD_SIZE);
        consumer.setConsumeThreadMax(DEFAULT_CONSMER_THREAD_SIZE);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // mConsumer.subscribe(queue, "TagA || TagC || TagD");
        consumer.subscribe(topic, tags);
        return consumer;
    }
}
简化调用:
package com.pangugle.framework.mq;

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.Maps;
import com.pangugle.framework.mq.impl.RedisMQImpl;
import com.pangugle.framework.mq.impl.RocketMQImpl;
import com.pangugle.framework.service.Callback;
import com.pangugle.framework.utils.ThreadUtils;

/**
 * Singleton entry point that hands out the configured {@link MQSupport}
 * implementation (currently always the RocketMQ one).
 */
public class MQManager {

    Map<String, MQSupport> maps = Maps.newConcurrentMap();

    /** Holder for the singleton instance. */
    private interface ManagerInternal {
        public MQManager mgr = new MQManager();
    }

    public static MQManager getInstance() {
        return ManagerInternal.mgr;
    }

    private MQManager() {
        //maps.put(MQImpl.REDIS.name(), new RedisMQImpl());
        maps.put(MQImpl.ROCKETMQ.name(), new RocketMQImpl());
    }

    /** Returns the RocketMQ-backed implementation. */
    public MQSupport getMQ() {
        return maps.get(MQImpl.ROCKETMQ.name());
    }

    public static enum MQImpl {
        REDIS, // redis
        ROCKETMQ; // rocketmq
    }

    /** Manual test: subscribes to the test topic, then sends one message per second. */
    public static void main(String[] args) throws InterruptedException, IOException {
        // Topic (queue) name used by the test.
        String queue = "pangugle_test";
        // No tag filter. This declaration must be live code: tags is used below.
        String tags = null;

        MQSupport mq = MQManager.getInstance().getMQ();

        // Subscribe (broadcast consumption).
        mq.subscribe(queue, tags, new Callback<String>() {
            @Override
            public void execute(String o) {
                System.out.println("consuemr 1 " + o);
            }
        });

        // Cluster consumption (disabled):
        // mq.consume(queue, tags, new Callback<String>() {
        //     public void execute(String o) {
        //         System.out.println("consuemr 1 " + o);
        //     }
        // });

        for (int i = 0; i < 1000; i++) {
            mq.sendMessage(queue, "i = " + i, tags);
            ThreadUtils.sleep(1000);
        }

        // Block until input arrives so the consumer keeps running.
        System.in.read();
    }
}
注意上面测试:
要到控制台创建 Topic 队列名称,也就是 pangugle_test 这个名称!
好了搞定了
现在我们使用消息队列就非常简单了:初始化消息队列 MQSupport mq = MQManager.getInstance().getMQ();
- 发送消息 sendMessage
- 消费消息
- subscribe
- consume
主流消息队列对比(kafka、Rabbitmq、Rocketmq)
主流的消息队列有以下几种:
- Kafka ``` Apache开源的消息队列,主要应用于大数据方向上;
- Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量;
- 常常用于日志收集和传输;
- 0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务; ```
- RabbitMQ ```
- RabbitMQ是一个AMQP实现,传统的messaging queue系统实现,基于Erlang。
- AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景 ```
RocketMQ
阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会
CMQ
腾讯云分布式高可靠消息队列服务
从上面我们看到,如果应用于大数据上,那么毫无疑问就是使用kafka了; 如果追求对数据一致性、稳定性和可靠性要求很高的场景,那么就选择Rabbitmq; 不过对于个人选择,我会选择rocketmq, 我认为Rocketmq真的太强大,是一个真正在线上大规模应用的消息队列!
主流消息队列对比
| | RabbitMQ | RocketMQ | CMQ | Kafka |
| ---- | ---- | ---- | ---- | ---- |
| 模式 | 发布订阅 | 发布订阅 | 传统 queue/发布订阅 | 发布订阅 |
| 同步算法 | GM | 同步双写 | Raft | ISR(Replica) |
| 分布式扩展 | 否 | 支持 | 支持 | 支持 |
| 堆积能力 | 磁盘容量 | 磁盘容量 | 磁盘(水平扩展) | 磁盘(水平扩展) |
| 性能 | 中 | 高 | 高 | 很高 |
| 可靠性 | 一般 | 一般 | 极高 | 一般 |
| 持久化 | 内存 /硬盘 | 磁盘 | 磁盘 | 磁盘 |
