RocketMQ环境做准备

拉取 rocketmq 镜像版本

  1. docker pull rocketmqinc/rocketmq:4.4.0

注意: Rocketmq 的服务端版本与客户端版本要一致! 不一致可能会出现异常的错误(难以发现)!

  • docker 部署 rocketmq namesrv 服务

    1. docker run -itd --name rocketmq_namesrv --restart=always \
    2. -p 9876:9876 \
    3. -v /etc/localtime:/etc/localtime:ro \
    4. -v /data/rocketmq/namesrv:/home/rocketmq \
    5. rocketmqinc/rocketmq:4.4.0 sh mqnamesrv autoCreateTopicEnable=true
  • docker 部署 broker 服务(注意配置的路径修改)

    1. docker run -d -p 10911:10911 -p 10909:10909 --restart=always \
    2. -v /data/rocketmq/broker:/home/rocketmq:rw \
    3. -v /deploy/etc/rocketmq-broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
    4. -v /etc/localtime:/etc/localtime:ro \
    5. --name rocketmq_broker -e "NAMESRV_ADDR=192.168.1.100:9876" \
    6. -e "MAX_POSSIBLE_HEAP=200000000" \
    7. rocketmqinc/rocketmq:4.4.0 sh mqbroker autoCreateTopicEnable=true -c /opt/rocketmq-4.4.0/conf/broker.conf

    rocketmq-broker.conf

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    brokerIP1 = 192.168.1.100
    autoCreateTopicEnable=true
    
  • docker 部署 rocketmq console 控制台(注意namesrv的地址)

    docker run -itd --name $name -m 500m --restart=always \
             -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.1.100:9876 \
             -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
             -p 8080:8080 -t pangliang/rocketmq-console-ng
    

RocketMQ 常规消息

maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
注意: 要到控制台创建 Topic 队列名称

  • 同步发送消息(Send Messages Synchronously)

    public class SyncProducer {
      public static void main(String[] args) throws Exception {
          //Instantiate with a producer group name.
          DefaultMQProducer producer = new
              DefaultMQProducer("please_rename_unique_group_name");
          // 设置 name server 服务地址, 这里是是设置本机
          producer.setNamesrvAddr("127.0.0.1:9876");
          // 启动实例
          producer.start();
          for (int i = 0; i < 100; i++) {
              //Create a message instance, specifying topic, tag and message body.
              Message msg = new Message("TopicTest" /* Topic */,
                  "TagA" /* Tag */,
                  ("Hello RocketMQ " +
                      i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
              );
              //Call send message to deliver message to one of brokers.
              SendResult sendResult = producer.send(msg);
              System.out.printf("%s%n", sendResult);
          }
          //Shut down once the producer instance is not longer in use.
          producer.shutdown();
      }
    }
    
  • 异常发送消息(Send Messages Asynchronously)

    public class AsyncProducer {
      public static void main(String[] args) throws Exception {
          //Instantiate with a producer group name.
          DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          // 设置 name server 服务地址, 这里是是设置本机
          producer.setNamesrvAddr("localhost:9876");
          //Launch the instance.
          producer.start();
          producer.setRetryTimesWhenSendAsyncFailed(0);
          int messageCount = 100;
          final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
          for (int i = 0; i < messageCount; i++) {
              try {
                  final int index = i;
                  Message msg = new Message("Jodie_topic_1023",
                      "TagA",
                      "OrderID188",
                      "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                  producer.send(msg, new SendCallback() {
                      @Override
                      public void onSuccess(SendResult sendResult) {
                          countDownLatch.countDown();
                          System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                      }
                      @Override
                      public void onException(Throwable e) {
                          countDownLatch.countDown();
                          System.out.printf("%-10d Exception %s %n", index, e);
                          e.printStackTrace();
                      }
                  });
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
          countDownLatch.await(5, TimeUnit.SECONDS);
          producer.shutdown();
      }
    }
    
  • Send Messages in One-way Mode

    public class OnewayProducer {
      public static void main(String[] args) throws Exception{
          //Instantiate with a producer group name.
          DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          // Specify name server addresses.
          producer.setNamesrvAddr("localhost:9876");
          //Launch the instance.
          producer.start();
          for (int i = 0; i < 100; i++) {
              //Create a message instance, specifying topic, tag and message body.
              Message msg = new Message("TopicTest" /* Topic */,
                  "TagA" /* Tag */,
                  ("Hello RocketMQ " +
                      i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
              );
              //Call send message to deliver message to one of brokers.
              producer.sendOneway(msg);
          }
          //Wait for sending to complete
          Thread.sleep(5000);        
          producer.shutdown();
      }
    }
    

    ClientConfig 配置初始化

    private static ClientConfig initClientConfig() {
     ClientConfig mClientConfig = new ClientConfig();
    // 客户端本机 IP 地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
    // Name Server 地址列表,多个 NameServer 地址用分号 隔开
    mClientConfig.setNamesrvAddr("127.0.0.1:9876");
    // 客户端实例名称,客户端创建的多个 Producer、 Consumer 实际是共用一个内部实例(这个实例包含
    // 网络连接、线程资源等),默认值:DEFAULT
    mClientConfig.setInstanceName("DEFAULT");
    // 通信层异步回调线程数,默认值4
    mClientConfig.setClientCallbackExecutorThreads(10);
    // 轮询 Name Server 间隔时间,单位毫秒,默认:30000
    // mClientConfig.setPollNameServerInterval(30000);
    // 向 Broker 发送心跳间隔时间,单位毫秒,默认:30000
    mClientConfig.setHeartbeatBrokerInterval(30000);
    // 持久化 Consumer 消费进度间隔时间,单位毫秒,默认:5000
    mClientConfig.setPersistConsumerOffsetInterval(5000);
    return ClientConfig;
    }
    

    Producer 初始化

    private static void initProducer() {
    try {
      DefaultMQProducer mProducer = new DefaultMQProducer();
      ClientConfig config = initClientConfig();
      mProducer.resetClientConfig(config);
      // 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 默认值 4
      mProducer.setDefaultTopicQueueNums(4);
      // 发送消息超时时间,单位毫秒 : 默认值 10000
      mProducer.setSendMsgTimeout(10000);
      // 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 默认值 4096
      mProducer.setCompressMsgBodyOverHowmuch(4096);
      // 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 默认值 FALSE
      mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);
      mProducer.setProducerGroup(DEFAULT_GROUP);
    //            mProducer.setRetryTimesWhenSendAsyncFailed(3);
      mProducer.start();
    } catch (Exception e) {
      LOG.error("init producer error:", e);
    }
    }
    

    发送消息

    public boolean sendMessage(String queue, String body, String tags) {
    try {
      if(StringUtils.isEmpty(tags))
      {
        tags = StringUtils.getEmpty();
      }
      Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
      // Call send message to deliver message to one of brokers.
      SendResult sendResult = mProducer.send(msg);
      if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
        return true;
      }
      LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
    } catch (Exception e) {
      LOG.error("send queue error:", e);
    }
    return false;
    }
    

RocketMQ 顺序消息

maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息例子(Send message sample code)

public class TestOrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}

消费消息例子(Subscription message sample code)

public class TestOrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

RocketMQ 订阅消息

maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息

public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

消费消息

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

RocketMQ 定时消息

maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 public class TestScheduledMessageProducer {
     public static void main(String[] args) throws Exception {
         // Instantiate a producer to send scheduled messages
         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
         // Shutdown producer after use.
         producer.shutdown();
     }
 }

消费消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.common.message.MessageExt;
 import java.util.List;
 public class TestScheduledMessageConsumer {
     public static void main(String[] args) throws Exception {
         // Instantiate message consumer
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
         // 订阅所有消息
         consumer.subscribe("TestTopic", "*");
         // Register message listener
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                 for (MessageExt message : messages) {
                     // Print approximate delay time period
                     System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                             + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         // Launch consumer
         consumer.start();
     }
 }

验证消息:
10s之后 请看 storing time , 看看是不是成功了

RocketMQ 批量消息

maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送批量消息

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

发送大量批量消息,请分批进行

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //handle the error
   }
}

RocketMQ 过虑消息

maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
注意:
tag的使用!
要到控制台创建 Topic 队列名称
官方过虑消息例子:
http://rocketmq.apache.org/docs/filter-by-sql92-example/
发送消息

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();

消费消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 消息处理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

RocketMQ OpenMessaging

OpenMessaging是阿里牵头发起的分布式消息模型标准,其api规范首先在rocketmq中落地实现。 通过这个标准可以简化各种消息中间件的高复杂性和不兼容性,提升消息中间件服务的易用性。
maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        producer.startup();
        System.out.printf("Producer startup OK%n");
        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }
        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }
                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }
        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }
        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

拉取消息消费

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        consumer.startup();
        System.out.printf("Consumer startup OK%n");
        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }
        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

通过推送消费消息

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
    }
}

RocketMQ 事务消息

maven 工程添加库

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加库

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

注意:
客户端版本要和服务端版本的一致,或者会发生一些奇怪的问题:
我遇到过版本不一致会发生,消息无法确认消息消费,也就是说 客户端已经消费了,也提交成功了,但是服务端没有同步到!
要到控制台创建 Topic 队列名称
发送消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 线程池
        producer.setExecutorService(executorService);
        // 事务监听器
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

实现事务监听器

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

RocketMQ Java 客户端封装

RocketMQ 应用非常多,但是在实际代码开发过程,我们肯定不能以上面的代码在实际项目中应用, 肯定是要把它们都封装一下,由自己提供的Api来调用RocketMQ,这样才能更方便!
对于消息队列,我们关注的地方:

  • 消息生产者 ``` 对于消息生产者,我们只关注两点
  1. 队列名称
  2. 要发送的消息 ```
  • 消息消费者 ``` 对于消费者,我们只关注以下几点
  1. 订阅消息
  2. 集群消费消息 ```

    站在应用层上,调用方只关注Api调用,它不关注Rocketmq内部的具体实现,和初始化!

  • 定义一个接口:
    package com.pangugle.framework.mq;
    import com.pangugle.framework.service.Callback;
    public  interface MQSupport{
      /**
       * 对于rocketmq 没有用
       * @param topic
       */
      public  void declareTopic(String topic);
      public  void deleteTopic(String topic);
      /**
       * 消息消息
       * @param topic
       * @param body
       * @return
       */
      public  boolean sendMessage(String topic, String body);
      public  boolean sendMessage(String topic, String body, String tags);
      /**
       * 消费消息, 消息不重复消息
       * @param tags
       * @param callback
       */
      public void consume(String topic, String tags, Callback<String> callback);
      /**
       * 订阅消息,消息重复消费
       * @param tags
       * @param callback
       */
      public void subscribe(String topic, String tags, Callback<String> callback);
    }
    
    上面我们定义了一个发送消息的方法: ```
  1. sendMessage(String topic, String body);
  2. sendMessage(String topic, String body, String tags);
    和消费消息的方法:
    
  3. consume(String topic, String tags, Callback callback);
  4. subscribe(String topic, String tags, Callback callback); ```
  • Rocketmq 实现这个接口

    package com.pangugle.framework.mq.impl;
    import java.util.List;
    import org.apache.rocketmq.client.ClientConfig;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import com.pangugle.framework.conf.MyConfiguration;
    import com.pangugle.framework.log.Log;
    import com.pangugle.framework.log.LogFactory;
    import com.pangugle.framework.mq.MQSupport;
    import com.pangugle.framework.service.Callback;
    import com.pangugle.framework.utils.FastJsonHelper;
    import com.pangugle.framework.utils.StringUtils;
    public class RocketMQImpl implements MQSupport {
      private static Log LOG = LogFactory.getLog(RocketMQImpl.class);
      private static String DEFAULT_GROUP = "pangule_default_group";
      private static int DEFAULT_CONSMER_THREAD_SIZE = 5;
      private static String mServer = null;
      private static ClientConfig mClientConfig = new ClientConfig();
      private static DefaultMQProducer mProducer;
      public RocketMQImpl() {
          synchronized (RocketMQImpl.class) {
              if (mProducer == null) {
                  initClientConfig();
                  initProducer();
              }
          }
      }
      @Override
      public void declareTopic(String queue) {
      }
      @Override
      public void deleteTopic(String queue) {
      }
      @Override
      public boolean sendMessage(String queue, String body, String tags) {
          try {
              if(StringUtils.isEmpty(tags))
              {
                  tags = StringUtils.getEmpty();
              }
              Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
              // Call send message to deliver message to one of brokers.
              SendResult sendResult = mProducer.send(msg);
              if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                  return true;
              }
              LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
          } catch (Exception e) {
              LOG.error("send queue error:", e);
          }
          return false;
      }
      @Override
      public boolean sendMessage(String queue, String body) {
          return sendMessage(queue, body, null);
      }
      private static void initClientConfig() {
          mServer = MyConfiguration.getInstance().getString("mq.rocket.server");
          LOG.info("rocketmq.server = " + mServer);
          // 客户端本机 IP 地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
          // Name Server 地址列表,多个 NameServer 地址用分号 隔开
          mClientConfig.setNamesrvAddr(mServer);
          // 客户端实例名称,客户端创建的多个 Producer、 Consumer 实际是共用一个内部实例(这个实例包含
          // 网络连接、线程资源等),默认值:DEFAULT
          mClientConfig.setInstanceName("DEFAULT");
          // 通信层异步回调线程数,默认值4
          mClientConfig.setClientCallbackExecutorThreads(10);
          // 轮询 Name Server 间隔时间,单位毫秒,默认:30000
          // mClientConfig.setPollNameServerInterval(30000);
          // 向 Broker 发送心跳间隔时间,单位毫秒,默认:30000
          mClientConfig.setHeartbeatBrokerInterval(30000);
          // 持久化 Consumer 消费进度间隔时间,单位毫秒,默认:5000
          mClientConfig.setPersistConsumerOffsetInterval(5000);
      }
      private static void initProducer() {
          try {
              mProducer = new DefaultMQProducer();
              mProducer.resetClientConfig(mClientConfig);
              // 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 默认值 4
              mProducer.setDefaultTopicQueueNums(4);
              // 发送消息超时时间,单位毫秒 : 默认值 10000
              mProducer.setSendMsgTimeout(10000);
              // 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 默认值 4096
              mProducer.setCompressMsgBodyOverHowmuch(4096);
              // 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 默认值 FALSE
              mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);
              mProducer.setProducerGroup(DEFAULT_GROUP);
    //            mProducer.setRetryTimesWhenSendAsyncFailed(3);
              mProducer.start();
          } catch (Exception e) {
              LOG.error("init producer error:", e);
          }
      }
      @Override
      public void consume(String topic, String tags, Callback<String> callback) {
          try {
              if(StringUtils.isEmpty(tags))
              {
                  tags = StringUtils.getEmpty();
              }
              DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
              consumer.setMessageModel(MessageModel.CLUSTERING);
              consumer.registerMessageListener(new MessageListenerConcurrently() {
                  @Override
                  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                      // TODO Auto-generated method stub
                      for(MessageExt ext : msgs)
                      {
                          try {
                              String body =  new String(ext.getBody());
                              callback.execute(body);
                          } catch (Exception e) {
                          }
                      }
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  }
              });
              consumer.start();
          } catch (MQClientException e) {
              LOG.error("consume error:", e);
          }
      }
      @Override
      public void subscribe(String topic, String tags, Callback<String> callback) {
          try {
              if(StringUtils.isEmpty(tags))
              {
                  tags = StringUtils.getEmpty();
              }
              DefaultMQPushConsumer consumer = getConsumerInstance(topic, tags);
              consumer.setMessageModel(MessageModel.BROADCASTING);
              consumer.registerMessageListener(new MessageListenerConcurrently() {
                  @Override
                  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                      // TODO Auto-generated method stub
                      for(MessageExt ext : msgs)
                      {
                          try {
                              String body =  new String(ext.getBody());
                              callback.execute(body);
                          } catch (Exception e) {
                          }
                      }
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  }
              });
              consumer.start();
          } catch (MQClientException e) {
              LOG.error("subscrebe error:", e);
          }
      }
      private static DefaultMQPushConsumer getConsumerInstance(String topic, String tags) throws MQClientException
      {
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
          consumer.resetClientConfig(mClientConfig);
          consumer.setConsumerGroup(topic + tags);
          consumer.setConsumeThreadMin(DEFAULT_CONSMER_THREAD_SIZE);
          consumer.setConsumeThreadMax(DEFAULT_CONSMER_THREAD_SIZE);
          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
          // mConsumer.subscribe(queue, "TagA || TagC || TagD");
          consumer.subscribe(topic, tags);
          return consumer;
      }
    }
    
  • 简化调用:

    package com.pangugle.framework.mq;
    import java.io.IOException;
    import java.util.Map;
    import com.google.common.collect.Maps;
    import com.pangugle.framework.mq.impl.RedisMQImpl;
    import com.pangugle.framework.mq.impl.RocketMQImpl;
    import com.pangugle.framework.service.Callback;
    import com.pangugle.framework.utils.ThreadUtils;
    public class MQManager{
      Map<String, MQSupport> maps = Maps.newConcurrentMap();
      private interface ManagerInternal {
          public MQManager mgr = new MQManager();
      }
      public static MQManager getInstance()
      {
          return ManagerInternal.mgr;
      }
      private MQManager()
      {
          //maps.put(MQImpl.REDIS.name(), new RedisMQImpl());
          maps.put(MQImpl.ROCKETMQ.name(), new RocketMQImpl());
      }
      public MQSupport getMQ()
      {
          return maps.get(MQImpl.ROCKETMQ.name());
      }
      public static enum MQImpl{
          REDIS, // redis
          ROCKETMQ; // rocketmq
      }
      public static void main(String[] args) throws InterruptedException, IOException
      {
      // 定义消息队列
          String queue = "pangugle_test";
      //
          String tags = null;
          MQSupport mq = MQManager.getInstance().getMQ();
      // 订阅消息消费
          mq.subscribe(queue, tags, new Callback<String>() {
              public void execute(String o) {
                  System.out.println("consuemr 1 " + o);
              }
          });
      // 集群消息消息
    //        mq.consume(queue, tags, new Callback<String>() {
    //            public void execute(String o) {
    //                System.out.println("consuemr 1 " + o);
    //            }
    //        });
          for(int i = 0; i < 1000; i ++)
          {
              mq.sendMessage(queue, "i = " + i, tags);
              ThreadUtils.sleep(1000);
          }
          System.in.read();
      }
    }
    

    注意上面测试:
    要到控制台创建 Topic 队列名称,也就是 pangugle_test 这个名称!
    好了搞定了
    现在我们使用消息队列就非常简单了:

  • 初始化消息队列 MQSupport mq = MQManager.getInstance().getMQ();

  • 发送消息 sendMessage
  • 消费消息
    • subscribe
    • consume

主流消息队列对比(kafka、Rabbitmq、Rocketmq)

主流的消息队列有以下几种:

  • Kafka ``` Apache开源的消息队列,主要应用于大数据方向上;
  1. Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量;
  2. 常常用于日志收集和传输;
  3. 0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务; ```
  • RabbitMQ ```
  1. RabbitMQ是一个AMQP实现,传统的messaging queue系统实现,基于Erlang。
  2. AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景 ```
  • RocketMQ

    阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会
    
  • CMQ

    腾讯云分布式高可靠消息队列服务
    

    从上面我们看到,如果应用于大数据上,那么毫无疑问就是使用了kafka了, 如果追求对数据一致性、稳定性和可靠性要求很高的场景那么就选择Rabbitmq, 不过对于个人选择,我会选择rocketmq, 我认为Rocketmq真的太强大,是一个真正 在线上在规模应用的消息队列!
    主流消息队列对比

|
- -
|
- - RabbitMQ
|
- - RocketMQ
|
- - CMQ
|
- - Kafka
| | —- | —- | —- | —- | —- | |
- - 模式
|
- - 发布订阅
|
- - 发布订阅
|
- - 传统 queue/发布订阅
|
- - 发布订阅
| |
- - 同步算法
|
- - GM
|
- - 同步双写
|
- - Raft
|
- - ISR(Replica)
| |
- - 分布式扩展
|
- - 否
|
- - 支持
|
- - 支持
|
- - 支持
| |
- - 堆积能力
|
- - 磁盘容量
|
- - 磁盘容量
|
- - 磁盘(水平扩展)
|
- - 磁盘(水平扩展)
| |
- - 性能
|
- - 中
|
- - 高
|
- - 高
|
- - 很高
| |
- - 可靠性
|
- - 一般
|
- - 一般
|
- - 极高
|
- - 一般
| |
- - 持久化
|
- - 内存 /硬盘
|
- - 磁盘
|
- - 磁盘
|
- - 磁盘
|