
一. RocketMQ服务器安装
地址:RocketMQ官网地址
步骤:
- 下载rocketmq,上传压缩包,解压。
- bin目录下启动命名服务器,
sh mqnamesrv - bin目录下启动消息服务器,
sh mqbroker -n localhost:9876 - 关闭服务:关闭namesrv服务:
sh bin/mqshutdown namesrv;关闭broker服务:sh bin/mqshutdown broker
注意:修改runserver.sh和runbroker.sh文件中有关内存的配置(调整与当前虚拟机内存匹配即可,推荐256m-128m)**JDK一定要安装成8,不然问题巨多!!!
**
二. 消息发送与接收
2.1 一对一
//创建一个发送消息的对象DefaultMQProducer producer = new DefaultMQProducer("group01");//设定发送的命名服务器地址producer.setNamesrvAddr("ip地址:9876");//启动发送的服务producer.start();//创建要发送消息的对象,指定topic,指定内容Message message = new Message("topic01", "Hello RocketMQ1".getBytes(StandardCharsets.UTF_8));SendResult send = producer.send(message);System.out.println(send);//关闭连接producer.shutdown();
//创建一个接收消息的对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group01");//设定接收消息的命名服务器地址consumer.setNamesrvAddr("ip地址:9876");//使用消息选择器来过滤消息consumer.subscribe("topic01", "*");//开启监听,用于接收消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list){//打印消息System.out.println(new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动接收消息服务consumer.start();}
2.2 一对多
- 负载均衡模式(默认):消息平均发送给各个客户端
- 广播模式:消息被所有客户端接收到
在消费者端监听代码之前设置:
//设置当前消费者的消费模式(默认是负载均衡)// consumer.setMessageModel(MessageModel.CLUSTERING);//广播模式consumer.setMessageModel(MessageModel.BROADCASTING);
2.3 多对多
多个生产者的消息可以被同一个消费者接收。
三. 消息类别
- 同步消息:即时性较强,重要的消息,且必须有回执的消息,例如:短信,通知(转账成功)
异步消息:即时性较弱,但需要有回执的消息(不着急立即给与回执,但是必须要有回执)。
//异步消息producer.send(message, new SendCallback() {//消息发送成功返回@Overridepublic void onSuccess(SendResult sendResult) {}//消息发送失败返回@Overridepublic void onException(Throwable throwable) {}});
单向消息:不需要有回执的消息。
//单向消息producer.sendOneway(message);
3.1 延时消息
message.setDelayTimeLevel(3);
目前支持的消息延迟时间:
【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】
括号里面填的是上面列表的下标(从0开始,比如,括号填3,那么就代表30s)。
3.2 批量消息发送
List<Message> list = new ArrayList<>();Message message1 = new Message("topic01", "Hello RocketMQ1".getBytes(StandardCharsets.UTF_8));Message message2 = new Message("topic01", "Hello RocketMQ2".getBytes(StandardCharsets.UTF_8));Message message3 = new Message("topic01", "Hello RocketMQ3".getBytes(StandardCharsets.UTF_8));list.add(message1);list.add(message2);list.add(message3);//批量消息producer.send(list);
3.3 消息过滤
Tag过滤消息
Message message = new Message("topic01","tag1", "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
consumer.subscribe("topic01", "tag1");
如果想加多个tag,那么可以这样写:
consumer.subscribe("topic01", "tag1 || tag2");
sql过滤消息
message.putUserProperty("vip","1");message.putUserProperty("age","18");
consumer.subscribe("topic01", MessageSelector.bySql("vip=1"));
注意:此处需修改conf文件夹中的broker.conf,否则不支持sql过滤消息。具体操作:增加
enablePropertyFilter=true;启动的时候需要增加:sh mqbroker -n localhost:9876 -c ../conf/broker.conf
四. 消息队列之顺序
//创建一个发送消息的对象DefaultMQProducer producer = new DefaultMQProducer("group01");//设定发送的命名服务器地址producer.setNamesrvAddr("ip地址:9876");//启动发送的服务producer.start();//创建要发送消息的对象,指定topic,指定内容Message message = new Message("topic01", "Hello RocketMQ1".getBytes(StandardCharsets.UTF_8));//发送消息是指定对应的消息队列SendResult send = producer.send(message, new MessageQueueSelector() {//设置当前消息发送时使用哪一个消息队列,消息队列都存在list当中@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {//根据发送的消息不同选择不同的消息队列int mqIndex = 1;//此处需要计算哪类消息使用那个队列,用数字表示。return list.get(mqIndex);}},null);//关闭连接producer.shutdown();
//创建一个接收消息的对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group01");//设定接收消息的命名服务器地址consumer.setNamesrvAddr("ip地址:9876");//使用消息选择器来过滤消息consumer.subscribe("topic01", "*");//使用单线程的模式从消息队列中取数据,一个线程绑定一个队列consumer.registerMessageListener(new MessageListenerOrderly() {//使用MessageListenerOrderly接口之后,对消息队列的处理由一个队列多个线程服务转化成一个队列一个线程服务@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt messageExt : list){System.out.println(messageExt.getBody());}return ConsumeOrderlyStatus.SUCCESS;}});//启动接收消息服务consumer.start();
五. 事务消息
//事务消息使用的生产者是TransactionMQProducerTransactionMQProducer producer = new TransactionMQProducer("group01");//设定发送消息的命名服务器地址producer.setNamesrvAddr("146.56.224.245:9876");//添加本地事务对应的监听producer.setTransactionListener(new TransactionListener() {//正常的事务过程public LocalTransactionState executeLocalTransaction(Message message, Object o) {//提交状态// return LocalTransactionState.COMMIT_MESSAGE;//回滚状态(如果是回滚状态,那么消费者是接收不到消息的)// return LocalTransactionState.ROLLBACK_MESSAGE;//未知状态(这里需要根据事务补偿过程的状态来决定自己的状态)return LocalTransactionState.UNKNOW;}//事务补偿过程public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {return LocalTransactionState.COMMIT_MESSAGE;// return LocalTransactionState.ROLLBACK_MESSAGE;// return LocalTransactionState.UNKNOW;}});//启动发送的服务器producer.start();//创建要发送的消息对象,指定topic,指定内容Message message = new Message("topic1","Hello RocketMQ".getBytes("utf-8"));//发送消息SendResult send = producer.send(message);//打印发送结果System.out.println(send);//事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿// producer.shutdown();
