代码地址: git@github.com:pengbiaobeyond/rocketmq.git
1、概念
rocketMQ是阿里巴巴旗下的一款分布式消息中间件,它的设计思想参考了kafka,
但是他的开发语言是java,而kafka的编写言语是c,rabbitmq采用的是erlang语言开发的,对于开发应用的工程师来说更容易做二次开发;
rabbitmq单台支持的并发量是万级的,而rocketmq是十万级别的,kafka是百万级别的,并且rocketmq和kafka的扩容是很容易的,因为都是采用mqServer注册到注册中心,然后client端向注册中心去找mqserver地址,但是kafka采用的是zookeeper注册中心,而rocketmq采用的是nameServer注册中心,只要还有一个naserver服务正常运行就可以正常运行,相比较于zookeeper,采用了去中心化的思想,保证了集群环境的高可用,而zookeeper无法实现;
kafka无法保证消息的安全性,但并发性能较高,多用于大数据采集相关的业务中;而其他互联网对安全性能较高的场景中可以选用rocketmq,并且rocketmq也是经历了阿里双十一的考验,性能和安全性更高;
2、环境搭建:
rocketmq-all-4.6.0-bin-release.zip
rocketmq-externals-master.zip
单机版:
上传最新的RocketMQ安装包 rocketmq-all-4.6.0-bin-release
1. 解压配置文件
unzip rocketmq-all-4.6.0-bin-release.zip
-bash: unzip: 未找到命令
解决办法:yum install -y unzip zip
2. 修改NameServer、Broker服务器内存 默认为4g内存、8G
# runserver.sh
JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
# runbroker.sh
JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
3. 启动NameServer
nohup sh bin/mqnamesrv &
4. 启动mqbroker
nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &
集群版:
由于机器内存有限,就用一个nameServer,用多个broker Master
将之前那台机器克隆两台;
注意brokerClusterName必须相同,而brokerName不能相同,brokerId必须为0;
nohup sh bin/mqbroker -c ./conf/broker.conf -n ip:9876 & ;
#集群名称,可以区分不同集群,不同的业务可以建多个集群
brokerClusterName=mayikt
# Broker 的名称, Master 和Slave 通过使用相同的Broker 名称来表明相互关系,以说明某个Slave 是哪个Master 的Slave 。
brokerName=broker-a
# 一个Master Barker 可以有多个Slave, 0 表示Master ,大于0 表示不同Slave 的ID 。
brokerId=0
#与fileReservedTim巳参数呼应,表明在几点做消息删除动作,默认值04 表示凌晨4 点。
deleteWhen=04
namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
autoCreateTopicEnable=true
#topic默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭,默认【true】
autoCreateSubscriptionGroup=true
#Broker 监听的端口号,如果一台机器上启动了多个Broker , 则要设置不同的端口号,避免冲突。
listenPort=10911
brokerIp=192.168.1.1
后台管理:
rocketmq-externals-master.zip console模块
将压缩包解压后,将其配置文件中nameServer地址和项目端口号修改后即可启动;
3、代码集成
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<!-- mysql 依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- 阿里巴巴数据源 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.14</version>
</dependency>
</dependencies>
rocketmq:
###连接地址nameServer
name-server: 192.168.66.6:9876;
producer:
group: pengbiaoone
spring:
datasource:
url: jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=UTF-8
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
server:
port: 9092
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/sendMsg")
public String sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Long orderId = System.currentTimeMillis();
String insertSql = getSqlMsg("insert", orderId);
String updateSql = getSqlMsg("update", orderId);
String deleteSql = getSqlMsg("delete", orderId);
Message insertMsg = new Message("pengbiaoone", insertSql.getBytes());
Message updateMsg = new Message("pengbiaoone", updateSql.getBytes());
Message deleteMsg = new Message("pengbiaoone", deleteSql.getBytes());
DefaultMQProducer producer = rocketMQTemplate.getProducer();
rocketMQTemplate.getProducer().send(insertMsg
, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
System.out.println("size================>>"+mqs.size());
Long orderId = (Long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderId);
rocketMQTemplate.getProducer().send(updateMsg
, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
Long orderId = (Long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderId);
rocketMQTemplate.getProducer().send(deleteMsg
, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
Long orderId = (Long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderId);
return orderId + "";
}
public String getSqlMsg(String type, Long orderId) {
JSONObject dataObject = new JSONObject();
dataObject.put("type", type);
dataObject.put("orderId", orderId);
return dataObject.toJSONString();
}
}
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "pengbiaoone", consumerGroup = "pengbiaoone", consumeMode
= ConsumeMode.ORDERLY, consumeThreadMax = 1
)
public class OrdeConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println(Thread.currentThread().getName() + "," +
"队列" + message.getQueueId() + "," + new String(message.getBody()));
// message.getMsgId()
}
}
4、事物
1. 生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费;
2. 开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker
3. Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;
4. Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果。
@RestController
public class ProducerController {
@Autowired
private ProducerService producerService;
@RequestMapping("/sendOrder")
public String sendOrder() {
return producerService.saveOrder();
}
}
@Service
public class ProducerService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
public String saveOrder() {
// 提前生成我们的订单id
String orderId = System.currentTimeMillis() + "";
/**
* 1.提前生成我们的半消息
*
* 2.半消息发送成功之后,在执行我们的本地事务
*
*/
OrderEntity orderEntity = createOrder(orderId);
String msg = JSONObject.toJSONString(orderEntity);
MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg);
stringMessageBuilder.setHeader("msg", msg);
Message message = stringMessageBuilder.build();
// 该消息不允许被消费者消费
rocketMQTemplate.sendMessageInTransaction("mayiktProducer",
"orderTopic", message, null);
return orderId;
}
public OrderEntity createOrder(String orderId) {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setName("每特教育第六期平均就业薪资破10万");
orderEntity.setOrderCreatetime(new Date());
// 价格是300元
orderEntity.setOrderMoney(300d);
// 状态为 未支付
orderEntity.setOrderState(0);
Long commodityId = 30L;
// 商品id
orderEntity.setCommodityId(commodityId);
orderEntity.setOrderId(orderId);
return orderEntity;
}
}
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "mayiktProducer")
public class SyncProducerListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private TransationalUtils transationalUtils;
/**
* 执行我们订单的事务
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
Object object = headers.get("msg");
if (object == null) {
return null;
}
String orderMsg = (String) object;
OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
TransactionStatus begin = null;
try {
begin = transationalUtils.begin();
int result = orderMapper.addOrder(orderEntity);
transationalUtils.commit(begin);
if (result <= 0) {
return RocketMQLocalTransactionState.ROLLBACK;
}
// // 告诉我们的Broke可以消费者该消息
// return RocketMQLocalTransactionState.COMMIT;
return null;
} catch (Exception e) {
if (begin != null) {
transationalUtils.rollback(begin);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//add.Order
return null;
}
/**
* 提供给我们的Broker定时检查
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
Object object = headers.get("msg");
if (object == null) {
return RocketMQLocalTransactionState.ROLLBACK;
}
String orderMsg = (String) object;
OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
String orderId = orderEntity.getOrderId();
// 直接查询我们的数据库
OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
if (orderDbEntity == null) {
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.COMMIT;
}
}