1. docker-compose部署
compose文件
version: '3.5'services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rocketmq-nameserverports:- 9876:9876volumes:- ./volume/logs:/opt/logs- ./volume/store:/opt/storerestart: alwaysnetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rocketmq-brokerports:- 10909:10909- 10911:10911volumes:- ./volume/logs:/opt/logs- ./volume/store:/opt/store#- ./volume/brokerconf/broker.conf:/etc/rocketmq/broker.conf- ./volume/brokerconf/:/etc/rocketmq/environment:TZ: Asia/ShanghaiNAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn128m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvrestart: alwaysnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rocketmq-consoleports:- 8081:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvrestart: alwaysnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge
涉及到三个服务,分别是nameserver(9876端口)、broker(10911通信端口)、rocket-console(8081),因为我是单机部署在一台的服务器上,使用桥接方式并给在同一个network中,是三个容器可以进行通信。
数据挂载如下:
conf文件配置
# 所属集群名字brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,# 在 broker-b.properties 使用: broker-bbrokerName=broker-a# 0 表示 Master,> 0 表示 SlavebrokerId=0# nameServer地址,分号分割# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.25.0.3:10911> failed# 解决方式1 加上一句 producer.setVipChannelEnabled(false);# 解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP,默认使用的是内部ip (你自己宿主机的IP,请参考自己)brokerIP1=192.168.1.148# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,falseautoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口listenPort=10911# 删除文件时间点,默认凌晨4点deleteWhen=04# 文件保留时间,默认48小时fileReservedTime=120# commitLog 每个文件的大小默认1GmapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000# redeleteHangedFileInterval=120000# 检测物理文件磁盘空间diskMaxUsedSpaceRatio=88# 存储路径#storePathRootDir=/opt/store# commitLog 存储路径#storePathCommitLog=/opt/logs/commitlog# 消费队列存储#storePathConsumeQueue=/opt/store/consumequeue# 消息索引存储路径#storePathIndex=/opt/store/index# checkpoint 文件存储路径#storeCheckpoint=/opt/store/checkpoint# abort 文件存储路径#abortFile=/opt/store/abort# 限制的消息大小maxMessageSize=65536# flushCommitLogLeastPages=4# flushConsumeQueueLeastPages=2# flushCommitLogThoroughInterval=10000# flushConsumeQueueThoroughInterval=60000# Broker 的角色# - ASYNC_MASTER 异步复制Master# - SYNC_MASTER 同步双写Master# - SLAVEbrokerRole=SYNC_MASTER# 刷盘方式# - ASYNC_FLUSH 异步刷盘# - SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH# 发消息线程池数量sendMessageThreadPoolNums=128# 拉消息线程池数量pullMessageThreadPoolNums=128
如果出现了宿主机挂在文件挂在不进去,报错:权限问题:执行 chown -R 3000:3000 ./volume
2. k8s部署
部署所遇问题点
我这里没有用上面得那个镜像,使用得是官方镜像。但是部署得时候会出点小差错,下面是我部署出现得问题点。
1. 部署broker服务一直重启
挂载的文件没有出现在宿主机挂在的位置。我猜想原因是部署用的是rocketmq用户创建,所以没有权限挂在在宿主机指定位置。甚至当时我部署broker一直重启,也有可能是这个原因。所以请在宿主机挂在目录(我的是/root/project-server/backend/rocketmq/broker)执行chmod -R 777 ./
2. 部署console报错
部署console的时候一直报错:意思是我的console无法连接到我对应的nameserver。
java.lang.RuntimeException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
经过排查发现并不是我nameserver没有连接上,而是我的nameserver连接上了但是它没有连接上对应的broker。我是通过beoker挂在的日志发现的。
安装部署参考:大佬的github (很多后台服务得安装,比如es、mq等等) 大佬的dockerhub (后台部署所需得镜像)
NameServer
StatefulSet
apiVersion: apps/v1kind: StatefulSetmetadata:name: rocketmq-nameserverlabels:app: rocketmq-nameservernamespace: prodspec:serviceName: rocketmq-nameserverreplicas: 1selector:matchLabels:app: rocketmq-nameservertemplate:metadata:labels:app: rocketmq-nameserverspec:containers:- name: rocketmq-nameserverimage: rocketmqinc/rocketmq:4.4.0imagePullPolicy: IfNotPresentresources:limits:cpu: 250mmemory: 1000Mirequests:cpu: 100mmemory: 768Mienv:- name: TZvalue: Asia/Shanghai- name: JAVA_OPTvalue: -Duser.home=/opt- name: JAVA_OPT_EXTvalue: -server -Xms1024m -Xmx1024m -Xmn512mcommand: ["sh","mqnamesrv"]ports:- containerPort: 9876volumeMounts:- name: logsmountPath: /opt/logsvolumes:- name: logshostPath:path: /root/project-server/backend/rocketmq/nameserver/logstype: Directory
Service
apiVersion: v1kind: Servicemetadata:namespace: prodname: rocketmq-nameserverlabels:app: rocketmq-nameserverspec:type: NodePortports:- port: 9876targetPort: 9876nodePort: 30003selector:app: rocketmq-nameserver
broker
StatefulSet
apiVersion: apps/v1kind: StatefulSetmetadata:name: rocketmq-brokernamespace: prodlabels:app: rocketmq-brokerspec:serviceName: rocketmq-brokerreplicas: 1selector:matchLabels:app: rocketmq-brokertemplate:metadata:labels:app: rocketmq-brokerspec:containers:- name: rocketmq-brokerimage: rocketmqinc/rocketmq:4.4.0imagePullPolicy: IfNotPresentresources:limits:cpu: 440mmemory: 2000Mirequests:cpu: 400mmemory: 2000Mienv:- name: TZvalue: Asia/Shanghai- name: JAVA_OPTvalue: -server -XX:ParallelGCThreads=1command: ["sh","mqbroker","-c","../conf/broker.conf"]ports:- containerPort: 30909- containerPort: 30911- containerPort: 30912volumeMounts:- name: logsmountPath: /home/rocketmq/logs- name: storemountPath: /home/rocketmq/store- name: broker-confmountPath: /opt/rocketmq-4.4.0/conf/broker.confvolumes:- name: logshostPath:path: /root/project-server/backend/rocketmq/broker/logstype: Directory- name: storehostPath:path: /root/project-server/backend/rocketmq/broker/storetype: Directory- name: broker-confhostPath:path: /root/project-server/backend/rocketmq/broker/conf/broker.conftype: File
Service
apiVersion: v1kind: Servicemetadata:namespace: prodname: rocketmq-brokerlabels:app: rocketmq-brokerspec:type: NodePortports:- port: 30909targetPort: 30909name: listen-1nodePort: 30909- port: 30911targetPort: 30911name: listen-2nodePort: 30911- port: 30912targetPort: 30912name: ha-servicenodePort: 30912selector:app: rocketmq-broker
# 所属集群名字brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,# 在 broker-b.properties 使用: broker-bbrokerName=broker-a# 0 表示 Master,> 0 表示 SlavebrokerId=0# nameServer地址,分号分割# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#namesrvAddr=192.168.0.112:9876;192.168.0.112:9877namesrvAddr=10.10.10.111:30003# 启动IP,如果 docker 报 Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.25.0.3:10911> failed# 解决方式1 加上一句 producer.setVipChannelEnabled(false);# 解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP,默认使用的是内部ip (你自己宿主机的IP,请参考自己)brokerIP1=10.10.10.111# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,falseautoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口 listenPort=10911listenPort=30911#haService中使用 默认值为:listenPort + 1#haListenPort=30912#主要用于slave同步master listenPort - 2#fastListenPort=30909# 删除文件时间点,默认凌晨4点deleteWhen=04# 文件保留时间,默认48小时fileReservedTime=120# commitLog 每个文件的大小默认1GmapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000# redeleteHangedFileInterval=120000# 检测物理文件磁盘空间,生产上建议85左右diskMaxUsedSpaceRatio=85# 存储路径#storePathRootDir=/opt/store# commitLog 存储路径#storePathCommitLog=/opt/logs/commitlog# 消费队列存储#storePathConsumeQueue=/opt/store/consumequeue# 消息索引存储路径#storePathIndex=/opt/store/index# checkpoint 文件存储路径#storeCheckpoint=/opt/store/checkpoint# abort 文件存储路径#abortFile=/opt/store/abort# 限制的消息大小maxMessageSize=65536# flushCommitLogLeastPages=4# flushConsumeQueueLeastPages=2# flushCommitLogThoroughInterval=10000# flushConsumeQueueThoroughInterval=60000# Broker 的角色# - ASYNC_MASTER 异步复制Master# - SYNC_MASTER 同步双写Master# - SLAVEbrokerRole=SYNC_MASTER# 刷盘方式# - ASYNC_FLUSH 异步刷盘# - SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH# 发消息线程池数量sendMessageThreadPoolNums=128# 拉消息线程池数量pullMessageThreadPoolNums=128
console
StatefulSet
apiVersion: apps/v1kind: StatefulSetmetadata:name: rocketmq-consolenamespace: prodlabels:app: rocketmq-consolespec:serviceName: rocketmq-consolereplicas: 1selector:matchLabels:app: rocketmq-consoletemplate:metadata:labels:app: rocketmq-consolespec:containers:- name: rocketmq-consoleimage: styletang/rocketmq-console-ngimagePullPolicy: IfNotPresentresources:limits:cpu: 450mmemory: 1768Mirequests:cpu: 100mmemory: 768Mienv:- name: TZvalue: Asia/Shanghai- name: JAVA_OPTS#value: -Drocketmq.namesrv.addr=rocketmq-nameserver:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.home=/root -Xms768m -Xmx768mvalue: -Drocketmq.namesrv.addr=10.10.10.145:30003 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.home=/root -Xms768m -Xmx768mports:- containerPort: 8080volumeMounts:- name: logsmountPath: /root/logsvolumes:- name: logshostPath:path: /root/project-server/backend/rocketmq/console/logstype: Directory
Service
apiVersion: v1kind: Servicemetadata:namespace: prodname: rocketmq-consolelabels:app: rocketmq-consolespec:type: NodePortports:- port: 8080targetPort: 8080nodePort: 30004selector:app: rocketmq-console
3. 整合springCloud stream
我这里示意starter组件的方式去做的,所以先编写starter组件提供给各个module服务使用
1. 引入依赖jar包
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>
2. 编写生产者消费者、生产者消息生产通道
// 消费者通道public interface ConsumeChannel {@Input(MessageConstant.TEST_MESSAGE_INPUT)SubscribableChannel testInput();}// 生产者通道public interface ProduceChannel {@Output(MessageConstant.TEST_MESSAGE_OUTPUT)MessageChannel testOutput();}public class MessageConstant {/*** 测试demo生产者*/public static final String TEST_MESSAGE_OUTPUT = "test-output";/*** 测试demo消费者*/public static final String TEST_MESSAGE_INPUT = "test-input";}
3. 配置mq配置文件(比如nameserver的地址、bindings、binder)
spring:cloud:stream:rocketmq:binder:name-server: 192.168.1.148:9876# 对应 RocketMQBindingProperties 如果当前服务只是充当生产者/消费者 只需配置bindings中的一项即可bindings:test-out:group: test-topic-produce-group #生产者组 命名规则sync: true # 是否同步发送消息,默认为 false 异步test-input:consumer:orderly: false # 是否使用顺序消费,默认false,这里直接并发消费broadcasting: false # 是否使用广播消费,默认为 falsebindings:# 自定义名称 则是我们的topic TODO 生产配置建议不要自动创建topic,broker配置文件默认配置了生产消费线程、默认队列数、test-output: { destination: test-topic,content-type: application/json }test-input: { destination: test-topic,content-type: text/plain,group: test-topic-consume-group,consumer.maxAttempts: 16 }
4. 我这里值启动了一个微服务(引入我们编写的starter组件),既做生生产者又做消费者
@SpringBootApplication@EnableBinding({ProduceChannel.class, ConsumeChannel.class})public class Order {public static void main(String[] args) throws UnknownHostException {SpringApplication.run(Order.class, args);}}
5. 编写生产者发消息
@ApiOperation(value = "测试mq发送消息")@PostMapping("/sendMqDemo")public String sendMqDemo(@RequestBody SendMqDemoReq req) {return produceDemoService.sendMqDemo(req);}
生产者业务类
@Servicepublic class ProduceDemoService {private ProduceChannel produce;private final RocketMQTemplate rocketMQTemplate;public ProduceDemoService(ProduceChannel produce, RocketMQTemplate rocketMQTemplate) {this.produce = produce;this.rocketMQTemplate = rocketMQTemplate;}public String sendMqDemo(SendMqDemoReq req) {log.info("nameserver地址:" + rocketMQTemplate.getProducer().getNamesrvAddr());Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_TAGS, "demo");//过滤headers.put(MessageConst.PROPERTY_KEYS, "demo" + UUIDUtil.getUUID());//每条消息的唯一业务idheaders.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL,"3");//设置延迟级别为 3,10 秒后消费。Message message = MessageBuilder.createMessage(req, new MessageHeaders(headers));//Message<String> message = MessageBuilder.withPayload(messageInfo).setHeader(MessageConst.PROPERTY_TAGS, "xxx").build();boolean send = produce.testOutput().send(message);if (send) {return "send ok";}return "send fail";}}
6. 编写生产者发消息
package com.jimushow.simone.mq.consume;
import com.jimushow.simone.starter.constant.MessageConstant;
import com.jimushow.simone.vo.req.testDemo.SendMqDemoReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
/**
* @author heian
* @date 2021/5/13 10:06
* @description 消费demo 演示
*/
@Service
@Slf4j
public class ConsumeDemoService {
/**
* 监控demo案例input
* @param message 消费文本内容
*/
@StreamListener(MessageConstant.TEST_MESSAGE_INPUT)
public void handler(@Payload SendMqDemoReq message) {
log.info("接收到demo的消息为:{}", message);
}
}
注意点
- 生产tag与消费tag不一致,造成消息丢失
我们发带tag类型的消息时,如果你代码中指定了tag接收,但是生产者发的tag不一致也会导致我们微服务去消费这条消息,消息状态也是consumed。比如这里发送的消息时closeLive,但是服务端只接受leaveRoom

