1. docker-compose deployment
Compose file
version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rocketmq-nameserver
ports:
- 9876:9876
volumes:
- ./volume/logs:/opt/logs
- ./volume/store:/opt/store
restart: always
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rocketmq-broker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./volume/logs:/opt/logs
- ./volume/store:/opt/store
#- ./volume/brokerconf/broker.conf:/etc/rocketmq/broker.conf
- ./volume/brokerconf/:/etc/rocketmq/
environment:
TZ: Asia/Shanghai
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
restart: always
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console
ports:
- 8081:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
restart: always
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
This setup involves three services: the nameserver (port 9876), the broker (communication port 10911), and rocketmq-console (port 8081). Since I deploy everything on a single server, the containers are attached to the same bridge network, so the three containers can talk to each other.
The data volumes are mounted on the host as shown in the compose file above.
broker.conf configuration
# Name of the cluster this broker belongs to
brokerClusterName=DefaultCluster
# Broker name; note this differs per config file: use broker-a in broker-a.properties
# and broker-b in broker-b.properties
brokerName=broker-a
# 0 means Master, > 0 means Slave
brokerId=0
# Name server addresses, separated by semicolons
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# Advertised IP. If docker reports: Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.25.0.3:10911> failed
# Fix 1: add producer.setVipChannelEnabled(false); on the producer
# Fix 2: set brokerIP1 to the host IP instead of the docker-internal IP (the internal IP is used by default; substitute your own host IP)
brokerIP1=192.168.1.148
# Number of queues created by default when a non-existent topic is auto-created on send
defaultTopicQueueNums=4
# Whether the Broker may auto-create topics; recommended on for dev/test, off (false) in production
autoCreateTopicEnable=true
# Whether the Broker may auto-create subscription groups; recommended on for dev/test, off in production
autoCreateSubscriptionGroup=true
# Port the Broker listens on
listenPort=10911
# Time of day when expired files are deleted; default 04:00
deleteWhen=04
# File retention time in hours; default 48
fileReservedTime=120
# Size of each commitLog file; default 1 GB
mapedFileSizeCommitLog=1073741824
# Entries per ConsumeQueue file; default 300,000, adjust to your workload
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# Maximum disk usage ratio for the store files
diskMaxUsedSpaceRatio=88
# Store root path
#storePathRootDir=/opt/store
# commitLog storage path
#storePathCommitLog=/opt/logs/commitlog
# Consume queue storage path
#storePathConsumeQueue=/opt/store/consumequeue
# Message index storage path
#storePathIndex=/opt/store/index
# checkpoint file path
#storeCheckpoint=/opt/store/checkpoint
# abort file path
#abortFile=/opt/store/abort
# Maximum message size (bytes)
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker role
# - ASYNC_MASTER: asynchronous-replication master
# - SYNC_MASTER: synchronous dual-write master
# - SLAVE
brokerRole=SYNC_MASTER
# Disk flush mode
# - ASYNC_FLUSH: asynchronous flush
# - SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
# Send-message thread pool size
sendMessageThreadPoolNums=128
# Pull-message thread pool size
pullMessageThreadPoolNums=128
If the host-mounted directories cannot be written to and you get a permission error, run: chown -R 3000:3000 ./volume
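For reference, the broker.conf comments above mention two fixes for the RemotingConnectException; below is a minimal sketch of fix 1 using the plain rocketmq-client API. The producer group, topic and payload are only illustrative, and the name-server address is the host IP used in this setup:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class VipChannelDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo-produce-group"); // illustrative group
        producer.setNamesrvAddr("192.168.1.148:9876"); // name server exposed by the compose file
        // Fix 1 from the broker.conf comments: skip the VIP channel (port 10909)
        producer.setVipChannelEnabled(false);
        producer.start();

        Message msg = new Message("test-topic", "demo", "hello rocketmq".getBytes());
        SendResult result = producer.send(msg);
        System.out.println(result.getSendStatus());

        producer.shutdown();
    }
}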
2. Kubernetes (k8s) deployment
Issues encountered during deployment
Here I did not use the image above; I used the official image instead. A few things went wrong during deployment, and the issues I hit are listed below.
1. The broker kept restarting
The mounted files never showed up at the mount location on the host. My guess is that the process runs as the rocketmq user, which has no permission to write to the specified host path; this may even be why the broker kept restarting. So, in the host mount directory (mine is /root/project-server/backend/rocketmq/broker), run chmod -R 777 ./
2. The console failed to start
The console deployment kept failing with the error below, which suggests the console could not connect to my name server.
java.lang.RuntimeException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
After some digging it turned out the name server connection itself was fine; the console did reach the name server, but the name server had no broker registered with it. I found this through the logs mounted by the broker.
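A quick way to confirm whether any broker has actually registered with the name server (the real cause here) is to query the cluster info through the admin API. A rough sketch, assuming the rocketmq-tools dependency and the NodePort address used in this deployment (10.10.10.111:30003); an empty broker table means no broker ever registered:

import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class BrokerRegistrationCheck {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("10.10.10.111:30003"); // name server NodePort from the Service below
        admin.start();

        // Lists every broker the name server currently knows about
        ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();
        System.out.println("registered brokers: " + clusterInfo.getBrokerAddrTable().keySet());

        admin.shutdown();
    }
}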
Installation references: this expert's GitHub (setup for many backend services such as ES, MQ, etc.) and their Docker Hub (the images needed for the backend deployment).
NameServer
StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rocketmq-nameserver
labels:
app: rocketmq-nameserver
namespace: prod
spec:
serviceName: rocketmq-nameserver
replicas: 1
selector:
matchLabels:
app: rocketmq-nameserver
template:
metadata:
labels:
app: rocketmq-nameserver
spec:
containers:
- name: rocketmq-nameserver
image: rocketmqinc/rocketmq:4.4.0
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 250m
memory: 1000Mi
requests:
cpu: 100m
memory: 768Mi
env:
- name: TZ
value: Asia/Shanghai
- name: JAVA_OPT
value: -Duser.home=/opt
- name: JAVA_OPT_EXT
value: -server -Xms1024m -Xmx1024m -Xmn512m
command: ["sh","mqnamesrv"]
ports:
- containerPort: 9876
volumeMounts:
- name: logs
mountPath: /opt/logs
volumes:
- name: logs
hostPath:
path: /root/project-server/backend/rocketmq/nameserver/logs
type: Directory
Service
apiVersion: v1
kind: Service
metadata:
namespace: prod
name: rocketmq-nameserver
labels:
app: rocketmq-nameserver
spec:
type: NodePort
ports:
- port: 9876
targetPort: 9876
nodePort: 30003
selector:
app: rocketmq-nameserver
Broker
StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rocketmq-broker
namespace: prod
labels:
app: rocketmq-broker
spec:
serviceName: rocketmq-broker
replicas: 1
selector:
matchLabels:
app: rocketmq-broker
template:
metadata:
labels:
app: rocketmq-broker
spec:
containers:
- name: rocketmq-broker
image: rocketmqinc/rocketmq:4.4.0
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 440m
memory: 2000Mi
requests:
cpu: 400m
memory: 2000Mi
env:
- name: TZ
value: Asia/Shanghai
- name: JAVA_OPT
value: -server -XX:ParallelGCThreads=1
command: ["sh","mqbroker","-c","../conf/broker.conf"]
ports:
- containerPort: 30909
- containerPort: 30911
- containerPort: 30912
volumeMounts:
- name: logs
mountPath: /home/rocketmq/logs
- name: store
mountPath: /home/rocketmq/store
- name: broker-conf
mountPath: /opt/rocketmq-4.4.0/conf/broker.conf
volumes:
- name: logs
hostPath:
path: /root/project-server/backend/rocketmq/broker/logs
type: Directory
- name: store
hostPath:
path: /root/project-server/backend/rocketmq/broker/store
type: Directory
- name: broker-conf
hostPath:
path: /root/project-server/backend/rocketmq/broker/conf/broker.conf
type: File
Service
apiVersion: v1
kind: Service
metadata:
namespace: prod
name: rocketmq-broker
labels:
app: rocketmq-broker
spec:
type: NodePort
ports:
- port: 30909
targetPort: 30909
name: listen-1
nodePort: 30909
- port: 30911
targetPort: 30911
name: listen-2
nodePort: 30911
- port: 30912
targetPort: 30912
name: ha-service
nodePort: 30912
selector:
app: rocketmq-broker
# Name of the cluster this broker belongs to
brokerClusterName=DefaultCluster
# Broker name; note this differs per config file: use broker-a in broker-a.properties
# and broker-b in broker-b.properties
brokerName=broker-a
# 0 means Master, > 0 means Slave
brokerId=0
# Name server addresses, separated by semicolons
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
namesrvAddr=10.10.10.111:30003
# Advertised IP. If docker reports: Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.25.0.3:10911> failed
# Fix 1: add producer.setVipChannelEnabled(false); on the producer
# Fix 2: set brokerIP1 to the host IP instead of the docker-internal IP (the internal IP is used by default; substitute your own host IP)
brokerIP1=10.10.10.111
# Number of queues created by default when a non-existent topic is auto-created on send
defaultTopicQueueNums=4
# Whether the Broker may auto-create topics; recommended on for dev/test, off (false) in production
autoCreateTopicEnable=true
# Whether the Broker may auto-create subscription groups; recommended on for dev/test, off in production
autoCreateSubscriptionGroup=true
# Port the Broker listens on (default listenPort=10911)
listenPort=30911
# Used by the HA service; defaults to listenPort + 1
#haListenPort=30912
# Mainly used by slaves to sync from the master; defaults to listenPort - 2
#fastListenPort=30909
# Time of day when expired files are deleted; default 04:00
deleteWhen=04
# File retention time in hours; default 48
fileReservedTime=120
# Size of each commitLog file; default 1 GB
mapedFileSizeCommitLog=1073741824
# Entries per ConsumeQueue file; default 300,000, adjust to your workload
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# Maximum disk usage ratio for the store files; around 85 is recommended in production
diskMaxUsedSpaceRatio=85
# Store root path
#storePathRootDir=/opt/store
# commitLog storage path
#storePathCommitLog=/opt/logs/commitlog
# Consume queue storage path
#storePathConsumeQueue=/opt/store/consumequeue
# Message index storage path
#storePathIndex=/opt/store/index
# checkpoint file path
#storeCheckpoint=/opt/store/checkpoint
# abort file path
#abortFile=/opt/store/abort
# Maximum message size (bytes)
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker role
# - ASYNC_MASTER: asynchronous-replication master
# - SYNC_MASTER: synchronous dual-write master
# - SLAVE
brokerRole=SYNC_MASTER
# Disk flush mode
# - ASYNC_FLUSH: asynchronous flush
# - SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
# Send-message thread pool size
sendMessageThreadPoolNums=128
# Pull-message thread pool size
pullMessageThreadPoolNums=128
Console
StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rocketmq-console
namespace: prod
labels:
app: rocketmq-console
spec:
serviceName: rocketmq-console
replicas: 1
selector:
matchLabels:
app: rocketmq-console
template:
metadata:
labels:
app: rocketmq-console
spec:
containers:
- name: rocketmq-console
image: styletang/rocketmq-console-ng
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 450m
memory: 1768Mi
requests:
cpu: 100m
memory: 768Mi
env:
- name: TZ
value: Asia/Shanghai
- name: JAVA_OPTS
#value: -Drocketmq.namesrv.addr=rocketmq-nameserver:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.home=/root -Xms768m -Xmx768m
value: -Drocketmq.namesrv.addr=10.10.10.145:30003 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.home=/root -Xms768m -Xmx768m
ports:
- containerPort: 8080
volumeMounts:
- name: logs
mountPath: /root/logs
volumes:
- name: logs
hostPath:
path: /root/project-server/backend/rocketmq/console/logs
type: Directory
Service
apiVersion: v1
kind: Service
metadata:
namespace: prod
name: rocketmq-console
labels:
app: rocketmq-console
spec:
type: NodePort
ports:
- port: 8080
targetPort: 8080
nodePort: 30004
selector:
app: rocketmq-console
3. Integration with Spring Cloud Stream
I went with a starter-component approach here, so the first step is to write a starter component that the individual module services can use.
1. Add the dependency
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
2. Define the producer and consumer channels
// Consumer channel
public interface ConsumeChannel {
    @Input(MessageConstant.TEST_MESSAGE_INPUT)
    SubscribableChannel testInput();
}

// Producer channel
public interface ProduceChannel {
    @Output(MessageConstant.TEST_MESSAGE_OUTPUT)
    MessageChannel testOutput();
}

public class MessageConstant {
    /**
     * Binding name for the test demo producer
     */
    public static final String TEST_MESSAGE_OUTPUT = "test-output";
    /**
     * Binding name for the test demo consumer
     */
    public static final String TEST_MESSAGE_INPUT = "test-input";
}
3. Configure MQ in the application config (name server address, bindings, binder)
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.1.148:9876
        # Maps to RocketMQBindingProperties; if this service only acts as a producer or only as a consumer,
        # configure just the relevant entry under bindings
        bindings:
          test-output:
            producer:
              group: test-topic-produce-group # producer group
              sync: true # send synchronously; default is false (async)
          test-input:
            consumer:
              orderly: false # orderly consumption; default false, i.e. concurrent consumption here
              broadcasting: false # broadcast consumption; default false
      bindings:
        # The custom binding names; destination is our topic. TODO: in production do not auto-create topics;
        # the broker config already sets the send/pull thread pools and the default queue count
        test-output: { destination: test-topic, content-type: application/json }
        test-input: { destination: test-topic, content-type: text/plain, group: test-topic-consume-group, consumer.maxAttempts: 16 }
4. Here I start just one microservice (which imports the starter we wrote), acting as both producer and consumer
@SpringBootApplication
@EnableBinding({ProduceChannel.class, ConsumeChannel.class})
public class Order {
public static void main(String[] args) throws UnknownHostException {
SpringApplication.run(Order.class, args);
}
}
5. Write the producer to send messages
@ApiOperation(value = "Test sending an MQ message")
@PostMapping("/sendMqDemo")
public String sendMqDemo(@RequestBody SendMqDemoReq req) {
return produceDemoService.sendMqDemo(req);
}
The producer service class:
@Slf4j
@Service
public class ProduceDemoService {

    private final ProduceChannel produce;
    private final RocketMQTemplate rocketMQTemplate;

    public ProduceDemoService(ProduceChannel produce, RocketMQTemplate rocketMQTemplate) {
        this.produce = produce;
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public String sendMqDemo(SendMqDemoReq req) {
        log.info("name server address: " + rocketMQTemplate.getProducer().getNamesrvAddr());
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "demo"); // tag, used for filtering
        headers.put(MessageConst.PROPERTY_KEYS, "demo" + UUIDUtil.getUUID()); // unique business key for each message
        headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3"); // delay level 3: consumed 10 seconds later
        Message message = MessageBuilder.createMessage(req, new MessageHeaders(headers));
        //Message<String> message = MessageBuilder.withPayload(messageInfo).setHeader(MessageConst.PROPERTY_TAGS, "xxx").build();
        boolean send = produce.testOutput().send(message);
        if (send) {
            return "send ok";
        }
        return "send fail";
    }
}
6. Write the consumer to receive messages
package com.jimushow.simone.mq.consume;

import com.jimushow.simone.starter.constant.MessageConstant;
import com.jimushow.simone.vo.req.testDemo.SendMqDemoReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

/**
 * @author heian
 * @date 2021/5/13 10:06
 * @description Demo consumer
 */
@Service
@Slf4j
public class ConsumeDemoService {

    /**
     * Listens on the demo input binding
     * @param message the consumed payload
     */
    @StreamListener(MessageConstant.TEST_MESSAGE_INPUT)
    public void handler(@Payload SendMqDemoReq message) {
        log.info("received demo message: {}", message);
    }
}
Notes
- A mismatch between the producer tag and the consumer tag causes message loss
When sending tagged messages, if the consumer code subscribes to one tag but the producer sends a different one, the message still ends up with status consumed even though our microservice never actually processes it. For example, the message sent here carried the tag closeLive, while the service only accepts leaveRoom.
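For reference, this is how tag filtering behaves in RocketMQ generally: the subscription expression only lets matching tags through, and everything else is filtered out and the offset still advances. A minimal sketch with the plain push-consumer API, using illustrative group/topic names and the name-server address from the docker-compose setup above:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class TagFilterDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("room-consume-group"); // illustrative group
        consumer.setNamesrvAddr("192.168.1.148:9876");
        // Only messages tagged leaveRoom reach the listener; a closeLive message is filtered out
        // and still shows as consumed, which is exactly the "loss" described above.
        consumer.subscribe("room-topic", "leaveRoom");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            msgs.forEach(msg -> System.out.println("got message, tag=" + msg.getTags()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}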