MQ001.PNG

一. RocketMQ服务器安装


地址:RocketMQ官网地址

步骤:

  • 下载rocketmq,上传压缩包,解压。
  • bin目录下启动命名服务器,sh mqnamesrv
  • bin目录下启动消息服务器,sh mqbroker -n localhost:9876
  • 关闭服务:关闭namesrv服务:sh bin/mqshutdown namesrv;关闭broker服务:sh bin/mqshutdown broker

注意:修改runserver.sh和runbroker.sh文件中有关内存的配置(调整与当前虚拟机内存匹配即可,推荐256m-128m)**JDK一定要安装成8,不然问题巨多!!!

**

二. 消息发送与接收


2.1 一对一

  1. //创建一个发送消息的对象
  2. DefaultMQProducer producer = new DefaultMQProducer("group01");
  3. //设定发送的命名服务器地址
  4. producer.setNamesrvAddr("ip地址:9876");
  5. //启动发送的服务
  6. producer.start();
  7. //创建要发送消息的对象,指定topic,指定内容
  8. Message message = new Message("topic01", "Hello RocketMQ1".getBytes(StandardCharsets.UTF_8));
  9. SendResult send = producer.send(message);
  10. System.out.println(send);
  11. //关闭连接
  12. producer.shutdown();
  1. //创建一个接收消息的对象
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group01");
  3. //设定接收消息的命名服务器地址
  4. consumer.setNamesrvAddr("ip地址:9876");
  5. //使用消息选择器来过滤消息
  6. consumer.subscribe("topic01", "*");
  7. //开启监听,用于接收消息
  8. consumer.registerMessageListener(new MessageListenerConcurrently() {
  9. @Override
  10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  11. for (MessageExt messageExt : list){
  12. //打印消息
  13. System.out.println(new String(messageExt.getBody()));
  14. }
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16. }
  17. });
  18. //启动接收消息服务
  19. consumer.start();
  20. }

2.2 一对多

  • 负载均衡模式(默认):消息平均发送给各个客户端
  • 广播模式:消息被所有客户端接收到

在消费者端监听代码之前设置:

  1. //设置当前消费者的消费模式(默认是负载均衡)
  2. // consumer.setMessageModel(MessageModel.CLUSTERING);
  3. //广播模式
  4. consumer.setMessageModel(MessageModel.BROADCASTING);

2.3 多对多

多个生产者的消息可以被同一个消费者接收。

三. 消息类别


  • 同步消息:即时性较强,重要的消息,且必须有回执的消息,例如:短信,通知(转账成功)
  • 异步消息:即时性较弱,但需要有回执的消息(不着急立即给与回执,但是必须要有回执)。

    1. //异步消息
    2. producer.send(message, new SendCallback() {
    3. //消息发送成功返回
    4. @Override
    5. public void onSuccess(SendResult sendResult) {
    6. }
    7. //消息发送失败返回
    8. @Override
    9. public void onException(Throwable throwable) {
    10. }
    11. });
  • 单向消息:不需要有回执的消息。

    1. //单向消息
    2. producer.sendOneway(message);

3.1 延时消息

  1. message.setDelayTimeLevel(3);

目前支持的消息延迟时间:
【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】
括号里面填的是上面列表的下标(从0开始,比如,括号填3,那么就代表30s)。

3.2 批量消息发送

  1. List<Message> list = new ArrayList<>();
  2. Message message1 = new Message("topic01", "Hello RocketMQ1".getBytes(StandardCharsets.UTF_8));
  3. Message message2 = new Message("topic01", "Hello RocketMQ2".getBytes(StandardCharsets.UTF_8));
  4. Message message3 = new Message("topic01", "Hello RocketMQ3".getBytes(StandardCharsets.UTF_8));
  5. list.add(message1);
  6. list.add(message2);
  7. list.add(message3);
  8. //批量消息
  9. producer.send(list);

3.3 消息过滤


  • Tag过滤消息

    1. Message message = new Message("topic01","tag1", "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
    1. consumer.subscribe("topic01", "tag1");

    如果想加多个tag,那么可以这样写:

    1. consumer.subscribe("topic01", "tag1 || tag2");
  • sql过滤消息

    1. message.putUserProperty("vip","1");
    2. message.putUserProperty("age","18");
    1. consumer.subscribe("topic01", MessageSelector.bySql("vip=1"));

    注意:此处需修改conf文件夹中的broker.conf,否则不支持sql过滤消息。具体操作:增加enablePropertyFilter=true;启动的时候需要增加:sh mqbroker -n localhost:9876 -c ../conf/broker.conf

四. 消息队列之顺序

  1. //创建一个发送消息的对象
  2. DefaultMQProducer producer = new DefaultMQProducer("group01");
  3. //设定发送的命名服务器地址
  4. producer.setNamesrvAddr("ip地址:9876");
  5. //启动发送的服务
  6. producer.start();
  7. //创建要发送消息的对象,指定topic,指定内容
  8. Message message = new Message("topic01", "Hello RocketMQ1".getBytes(StandardCharsets.UTF_8));
  9. //发送消息是指定对应的消息队列
  10. SendResult send = producer.send(message, new MessageQueueSelector() {
  11. //设置当前消息发送时使用哪一个消息队列,消息队列都存在list当中
  12. @Override
  13. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  14. //根据发送的消息不同选择不同的消息队列
  15. int mqIndex = 1;//此处需要计算哪类消息使用那个队列,用数字表示。
  16. return list.get(mqIndex);
  17. }
  18. },null);
  19. //关闭连接
  20. producer.shutdown();
  1. //创建一个接收消息的对象
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group01");
  3. //设定接收消息的命名服务器地址
  4. consumer.setNamesrvAddr("ip地址:9876");
  5. //使用消息选择器来过滤消息
  6. consumer.subscribe("topic01", "*");
  7. //使用单线程的模式从消息队列中取数据,一个线程绑定一个队列
  8. consumer.registerMessageListener(new MessageListenerOrderly() {
  9. //使用MessageListenerOrderly接口之后,对消息队列的处理由一个队列多个线程服务转化成一个队列一个线程服务
  10. @Override
  11. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
  12. for (MessageExt messageExt : list){
  13. System.out.println(messageExt.getBody());
  14. }
  15. return ConsumeOrderlyStatus.SUCCESS;
  16. }
  17. });
  18. //启动接收消息服务
  19. consumer.start();

五. 事务消息

  1. //事务消息使用的生产者是TransactionMQProducer
  2. TransactionMQProducer producer = new TransactionMQProducer("group01");
  3. //设定发送消息的命名服务器地址
  4. producer.setNamesrvAddr("146.56.224.245:9876");
  5. //添加本地事务对应的监听
  6. producer.setTransactionListener(new TransactionListener() {
  7. //正常的事务过程
  8. public LocalTransactionState executeLocalTransaction(Message message, Object o) {
  9. //提交状态
  10. // return LocalTransactionState.COMMIT_MESSAGE;
  11. //回滚状态(如果是回滚状态,那么消费者是接收不到消息的)
  12. // return LocalTransactionState.ROLLBACK_MESSAGE;
  13. //未知状态(这里需要根据事务补偿过程的状态来决定自己的状态)
  14. return LocalTransactionState.UNKNOW;
  15. }
  16. //事务补偿过程
  17. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
  18. return LocalTransactionState.COMMIT_MESSAGE;
  19. // return LocalTransactionState.ROLLBACK_MESSAGE;
  20. // return LocalTransactionState.UNKNOW;
  21. }
  22. });
  23. //启动发送的服务器
  24. producer.start();
  25. //创建要发送的消息对象,指定topic,指定内容
  26. Message message = new Message("topic1","Hello RocketMQ".getBytes("utf-8"));
  27. //发送消息
  28. SendResult send = producer.send(message);
  29. //打印发送结果
  30. System.out.println(send);
  31. //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
  32. // producer.shutdown();