代码地址: git@github.com:pengbiaobeyond/rocketmq.git
    1、概念
    rocketMQ是阿里巴巴旗下的一款分布式消息中间件,它的设计思想参考了kafka,
    但是他的开发语言是java,而kafka的编写言语是c,rabbitmq采用的是erlang语言开发的,对于开发应用的工程师来说更容易做二次开发;
    rabbitmq单台支持的并发量是万级的,而rocketmq是十万级别的,kafka是百万级别的,并且rocketmq和kafka的扩容是很容易的,因为都是采用mqServer注册到注册中心,然后client端向注册中心去找mqserver地址,但是kafka采用的是zookeeper注册中心,而rocketmq采用的是nameServer注册中心,只要还有一个naserver服务正常运行就可以正常运行,相比较于zookeeper,采用了去中心化的思想,保证了集群环境的高可用,而zookeeper无法实现;
    kafka无法保证消息的安全性,但并发性能较高,多用于大数据采集相关的业务中;而其他互联网对安全性能较高的场景中可以选用rocketmq,并且rocketmq也是经历了阿里双十一的考验,性能和安全性更高;
    图片1.png

    2、环境搭建:
    rocketmq-all-4.6.0-bin-release.zip
    rocketmq-externals-master.zip

    单机版:

    1. 上传最新的RocketMQ安装包 rocketmq-all-4.6.0-bin-release
    2. 1. 解压配置文件
    3. unzip rocketmq-all-4.6.0-bin-release.zip
    4. -bash: unzip: 未找到命令
    5. 解决办法:yum install -y unzip zip
    6. 2. 修改NameServerBroker服务器内存 默认为4g内存、8G
    7. # runserver.sh
    8. JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
    9. # runbroker.sh
    10. JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
    11. 3. 启动NameServer
    12. nohup sh bin/mqnamesrv &
    13. 4. 启动mqbroker
    14. nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &

    图片2.png

    集群版:
    由于机器内存有限,就用一个nameServer,用多个broker Master
    将之前那台机器克隆两台;
    注意brokerClusterName必须相同,而brokerName不能相同,brokerId必须为0;
    nohup sh bin/mqbroker -c ./conf/broker.conf -n ip:9876 & ;

    #集群名称,可以区分不同集群,不同的业务可以建多个集群
    brokerClusterName=mayikt
    
    # Broker 的名称, Master 和Slave 通过使用相同的Broker 名称来表明相互关系,以说明某个Slave 是哪个Master 的Slave 。
    brokerName=broker-a
    
    #  一个Master Barker 可以有多个Slave, 0 表示Master ,大于0 表示不同Slave 的ID 。
    brokerId=0 
    #与fileReservedTim巳参数呼应,表明在几点做消息删除动作,默认值04 表示凌晨4 点。
    deleteWhen=04
    namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
    autoCreateTopicEnable=true
    #topic默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许Broker自动创建订阅组,建议线下开启,线上关闭,默认【true】
    autoCreateSubscriptionGroup=true
    #Broker 监听的端口号,如果一台机器上启动了多个Broker , 则要设置不同的端口号,避免冲突。
    listenPort=10911
    brokerIp=192.168.1.1
    

    后台管理:
    rocketmq-externals-master.zip console模块
    将压缩包解压后,将其配置文件中nameServer地址和项目端口号修改后即可启动;

    3、代码集成

    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.0.0.RELEASE</version>
    </parent>
    
    <dependencies>
      <!-- springboot-web组件 -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
      </dependency>
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
      </dependency>
      <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.1.1</version>
      </dependency>
      <!-- mysql 依赖 -->
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
      </dependency>
      <!-- 阿里巴巴数据源 -->
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.0.14</version>
      </dependency>
    
    </dependencies>
    
    rocketmq:
      ###连接地址nameServer
      name-server: 192.168.66.6:9876;
      producer:
        group: pengbiaoone
    
    spring:
      datasource:
        url: jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=UTF-8
        username: root
        password: root
        driver-class-name: com.mysql.jdbc.Driver
    server:
      port: 9092
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.List;
    
    
    @RestController
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @RequestMapping("/sendMsg")
        public String sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            Long orderId = System.currentTimeMillis();
            String insertSql = getSqlMsg("insert", orderId);
            String updateSql = getSqlMsg("update", orderId);
            String deleteSql = getSqlMsg("delete", orderId);
            Message insertMsg = new Message("pengbiaoone", insertSql.getBytes());
            Message updateMsg = new Message("pengbiaoone", updateSql.getBytes());
            Message deleteMsg = new Message("pengbiaoone", deleteSql.getBytes());
            DefaultMQProducer producer = rocketMQTemplate.getProducer();
    
            rocketMQTemplate.getProducer().send(insertMsg
                    , new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                                   Object arg) {
                            System.out.println("size================>>"+mqs.size());
                            Long orderId = (Long) arg;
                            long index = orderId % mqs.size();
                            return mqs.get((int) index);
                        }
                    }, orderId);
            rocketMQTemplate.getProducer().send(updateMsg
                    , new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                                   Object arg) {
                            Long orderId = (Long) arg;
                            long index = orderId % mqs.size();
                            return mqs.get((int) index);
    
                        }
                    }, orderId);
            rocketMQTemplate.getProducer().send(deleteMsg
                    , new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                                   Object arg) {
                            Long orderId = (Long) arg;
                            long index = orderId % mqs.size();
                            return mqs.get((int) index);
                        }
                    }, orderId);
    
            return orderId + "";
        }
    
        public String getSqlMsg(String type, Long orderId) {
            JSONObject dataObject = new JSONObject();
            dataObject.put("type", type);
            dataObject.put("orderId", orderId);
            return dataObject.toJSONString();
        }
    
    
    }
    
    
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RocketMQMessageListener(topic = "pengbiaoone", consumerGroup = "pengbiaoone", consumeMode
            = ConsumeMode.ORDERLY, consumeThreadMax = 1
    )
    public class OrdeConsumer implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt message) {
            System.out.println(Thread.currentThread().getName() + "," +
                    "队列" + message.getQueueId() + "," + new String(message.getBody()));
    //        message.getMsgId()
        }
    }
    

    4、事物
    图片3.png

    1.    生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费;
    2.    开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker
    3.    Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;
    4.    Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果。
    
    @RestController
    public class ProducerController {
        @Autowired
        private ProducerService producerService;
    
        @RequestMapping("/sendOrder")
        public String sendOrder() {
            return producerService.saveOrder();
        }
    }
    
    @Service
    public class ProducerService {
    
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public String saveOrder() {
            // 提前生成我们的订单id
            String orderId = System.currentTimeMillis() + "";
    
            /**
             * 1.提前生成我们的半消息
             *
             * 2.半消息发送成功之后,在执行我们的本地事务
             *
             */
            OrderEntity orderEntity = createOrder(orderId);
            String msg = JSONObject.toJSONString(orderEntity);
            MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg);
            stringMessageBuilder.setHeader("msg", msg);
            Message message = stringMessageBuilder.build();
            // 该消息不允许被消费者消费
            rocketMQTemplate.sendMessageInTransaction("mayiktProducer",
                    "orderTopic", message, null);
            return orderId;
    
        }
    
        public OrderEntity createOrder(String orderId) {
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setName("每特教育第六期平均就业薪资破10万");
            orderEntity.setOrderCreatetime(new Date());
            // 价格是300元
            orderEntity.setOrderMoney(300d);
            // 状态为 未支付
            orderEntity.setOrderState(0);
            Long commodityId = 30L;
            // 商品id
            orderEntity.setCommodityId(commodityId);
            orderEntity.setOrderId(orderId);
            return orderEntity;
        }
    }
    
    @Slf4j
    @Component
    @RocketMQTransactionListener(txProducerGroup = "mayiktProducer")
    public class SyncProducerListener implements RocketMQLocalTransactionListener {
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private TransationalUtils transationalUtils;
    
        /**
         * 执行我们订单的事务
         *
         * @param msg
         * @param arg
         * @return
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    
            MessageHeaders headers = msg.getHeaders();
            Object object = headers.get("msg");
            if (object == null) {
                return null;
            }
            String orderMsg = (String) object;
            OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
            TransactionStatus begin = null;
            try {
                begin = transationalUtils.begin();
                int result = orderMapper.addOrder(orderEntity);
                transationalUtils.commit(begin);
                if (result <= 0) {
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
    //            // 告诉我们的Broke可以消费者该消息
    //            return RocketMQLocalTransactionState.COMMIT;
                return null;
            } catch (Exception e) {
                if (begin != null) {
                    transationalUtils.rollback(begin);
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            }
            //add.Order
            return null;
        }
    
        /**
         * 提供给我们的Broker定时检查
         *
         * @param msg
         * @return
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            MessageHeaders headers = msg.getHeaders();
            Object object = headers.get("msg");
            if (object == null) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            String orderMsg = (String) object;
            OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
            String orderId = orderEntity.getOrderId();
            // 直接查询我们的数据库
            OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
            if (orderDbEntity == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
    

    RocketMQ解决分布式事务.docx