1. docker-compose部署

compose文件

  1. version: '3.5'
  2. services:
  3. rmqnamesrv:
  4. image: foxiswho/rocketmq:server
  5. container_name: rocketmq-nameserver
  6. ports:
  7. - 9876:9876
  8. volumes:
  9. - ./volume/logs:/opt/logs
  10. - ./volume/store:/opt/store
  11. restart: always
  12. networks:
  13. rmq:
  14. aliases:
  15. - rmqnamesrv
  16. rmqbroker:
  17. image: foxiswho/rocketmq:broker
  18. container_name: rocketmq-broker
  19. ports:
  20. - 10909:10909
  21. - 10911:10911
  22. volumes:
  23. - ./volume/logs:/opt/logs
  24. - ./volume/store:/opt/store
  25. #- ./volume/brokerconf/broker.conf:/etc/rocketmq/broker.conf
  26. - ./volume/brokerconf/:/etc/rocketmq/
  27. environment:
  28. TZ: Asia/Shanghai
  29. NAMESRV_ADDR: "rmqnamesrv:9876"
  30. JAVA_OPTS: " -Duser.home=/opt"
  31. JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn128m"
  32. command: mqbroker -c /etc/rocketmq/broker.conf
  33. depends_on:
  34. - rmqnamesrv
  35. restart: always
  36. networks:
  37. rmq:
  38. aliases:
  39. - rmqbroker
  40. rmqconsole:
  41. image: styletang/rocketmq-console-ng
  42. container_name: rocketmq-console
  43. ports:
  44. - 8081:8080
  45. environment:
  46. JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
  47. depends_on:
  48. - rmqnamesrv
  49. restart: always
  50. networks:
  51. rmq:
  52. aliases:
  53. - rmqconsole
  54. networks:
  55. rmq:
  56. name: rmq
  57. driver: bridge

涉及到三个服务,分别是nameserver(9876端口)、broker(10911通信端口)、rocket-console(8081),因为我是单机部署在一台的服务器上,使用桥接方式并给在同一个network中,是三个容器可以进行通信。
数据挂载如下:
image.png

conf文件配置

  1. # 所属集群名字
  2. brokerClusterName=DefaultCluster
  3. # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
  4. # 在 broker-b.properties 使用: broker-b
  5. brokerName=broker-a
  6. # 0 表示 Master,> 0 表示 Slave
  7. brokerId=0
  8. # nameServer地址,分号分割
  9. # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
  10. # 启动IP,如果 docker 报 Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.25.0.3:10911> failed
  11. # 解决方式1 加上一句 producer.setVipChannelEnabled(false);
  12. # 解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP,默认使用的是内部ip (你自己宿主机的IP,请参考自己)
  13. brokerIP1=192.168.1.148
  14. # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
  15. defaultTopicQueueNums=4
  16. # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
  17. autoCreateTopicEnable=true
  18. # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  19. autoCreateSubscriptionGroup=true
  20. # Broker 对外服务的监听端口
  21. listenPort=10911
  22. # 删除文件时间点,默认凌晨4点
  23. deleteWhen=04
  24. # 文件保留时间,默认48小时
  25. fileReservedTime=120
  26. # commitLog 每个文件的大小默认1G
  27. mapedFileSizeCommitLog=1073741824
  28. # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
  29. mapedFileSizeConsumeQueue=300000
  30. # destroyMapedFileIntervalForcibly=120000
  31. # redeleteHangedFileInterval=120000
  32. # 检测物理文件磁盘空间
  33. diskMaxUsedSpaceRatio=88
  34. # 存储路径
  35. #storePathRootDir=/opt/store
  36. # commitLog 存储路径
  37. #storePathCommitLog=/opt/logs/commitlog
  38. # 消费队列存储
  39. #storePathConsumeQueue=/opt/store/consumequeue
  40. # 消息索引存储路径
  41. #storePathIndex=/opt/store/index
  42. # checkpoint 文件存储路径
  43. #storeCheckpoint=/opt/store/checkpoint
  44. # abort 文件存储路径
  45. #abortFile=/opt/store/abort
  46. # 限制的消息大小
  47. maxMessageSize=65536
  48. # flushCommitLogLeastPages=4
  49. # flushConsumeQueueLeastPages=2
  50. # flushCommitLogThoroughInterval=10000
  51. # flushConsumeQueueThoroughInterval=60000
  52. # Broker 的角色
  53. # - ASYNC_MASTER 异步复制Master
  54. # - SYNC_MASTER 同步双写Master
  55. # - SLAVE
  56. brokerRole=SYNC_MASTER
  57. # 刷盘方式
  58. # - ASYNC_FLUSH 异步刷盘
  59. # - SYNC_FLUSH 同步刷盘
  60. flushDiskType=ASYNC_FLUSH
  61. # 发消息线程池数量
  62. sendMessageThreadPoolNums=128
  63. # 拉消息线程池数量
  64. pullMessageThreadPoolNums=128

如果出现了宿主机挂在文件挂在不进去,报错:权限问题:执行 chown -R 3000:3000 ./volume

2. k8s部署

部署所遇问题点

我这里没有用上面得那个镜像,使用得是官方镜像。但是部署得时候会出点小差错,下面是我部署出现得问题点。

1. 部署broker服务一直重启

挂载的文件没有出现在宿主机挂在的位置。我猜想原因是部署用的是rocketmq用户创建,所以没有权限挂在在宿主机指定位置。甚至当时我部署broker一直重启,也有可能是这个原因。所以请在宿主机挂在目录(我的是/root/project-server/backend/rocketmq/broker)执行chmod -R 777 ./

2. 部署console报错

部署console的时候一直报错:意思是我的console无法连接到我对应的nameserver。

  1. java.lang.RuntimeException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

经过排查发现并不是我nameserver没有连接上,而是我的nameserver连接上了但是它没有连接上对应的broker。我是通过beoker挂在的日志发现的。
image.png

安装部署参考:大佬的github (很多后台服务得安装,比如es、mq等等) 大佬的dockerhub (后台部署所需得镜像)

NameServer

StatefulSet

  1. apiVersion: apps/v1
  2. kind: StatefulSet
  3. metadata:
  4. name: rocketmq-nameserver
  5. labels:
  6. app: rocketmq-nameserver
  7. namespace: prod
  8. spec:
  9. serviceName: rocketmq-nameserver
  10. replicas: 1
  11. selector:
  12. matchLabels:
  13. app: rocketmq-nameserver
  14. template:
  15. metadata:
  16. labels:
  17. app: rocketmq-nameserver
  18. spec:
  19. containers:
  20. - name: rocketmq-nameserver
  21. image: rocketmqinc/rocketmq:4.4.0
  22. imagePullPolicy: IfNotPresent
  23. resources:
  24. limits:
  25. cpu: 250m
  26. memory: 1000Mi
  27. requests:
  28. cpu: 100m
  29. memory: 768Mi
  30. env:
  31. - name: TZ
  32. value: Asia/Shanghai
  33. - name: JAVA_OPT
  34. value: -Duser.home=/opt
  35. - name: JAVA_OPT_EXT
  36. value: -server -Xms1024m -Xmx1024m -Xmn512m
  37. command: ["sh","mqnamesrv"]
  38. ports:
  39. - containerPort: 9876
  40. volumeMounts:
  41. - name: logs
  42. mountPath: /opt/logs
  43. volumes:
  44. - name: logs
  45. hostPath:
  46. path: /root/project-server/backend/rocketmq/nameserver/logs
  47. type: Directory

Service

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. namespace: prod
  5. name: rocketmq-nameserver
  6. labels:
  7. app: rocketmq-nameserver
  8. spec:
  9. type: NodePort
  10. ports:
  11. - port: 9876
  12. targetPort: 9876
  13. nodePort: 30003
  14. selector:
  15. app: rocketmq-nameserver

broker

StatefulSet

  1. apiVersion: apps/v1
  2. kind: StatefulSet
  3. metadata:
  4. name: rocketmq-broker
  5. namespace: prod
  6. labels:
  7. app: rocketmq-broker
  8. spec:
  9. serviceName: rocketmq-broker
  10. replicas: 1
  11. selector:
  12. matchLabels:
  13. app: rocketmq-broker
  14. template:
  15. metadata:
  16. labels:
  17. app: rocketmq-broker
  18. spec:
  19. containers:
  20. - name: rocketmq-broker
  21. image: rocketmqinc/rocketmq:4.4.0
  22. imagePullPolicy: IfNotPresent
  23. resources:
  24. limits:
  25. cpu: 440m
  26. memory: 2000Mi
  27. requests:
  28. cpu: 400m
  29. memory: 2000Mi
  30. env:
  31. - name: TZ
  32. value: Asia/Shanghai
  33. - name: JAVA_OPT
  34. value: -server -XX:ParallelGCThreads=1
  35. command: ["sh","mqbroker","-c","../conf/broker.conf"]
  36. ports:
  37. - containerPort: 30909
  38. - containerPort: 30911
  39. - containerPort: 30912
  40. volumeMounts:
  41. - name: logs
  42. mountPath: /home/rocketmq/logs
  43. - name: store
  44. mountPath: /home/rocketmq/store
  45. - name: broker-conf
  46. mountPath: /opt/rocketmq-4.4.0/conf/broker.conf
  47. volumes:
  48. - name: logs
  49. hostPath:
  50. path: /root/project-server/backend/rocketmq/broker/logs
  51. type: Directory
  52. - name: store
  53. hostPath:
  54. path: /root/project-server/backend/rocketmq/broker/store
  55. type: Directory
  56. - name: broker-conf
  57. hostPath:
  58. path: /root/project-server/backend/rocketmq/broker/conf/broker.conf
  59. type: File

Service

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. namespace: prod
  5. name: rocketmq-broker
  6. labels:
  7. app: rocketmq-broker
  8. spec:
  9. type: NodePort
  10. ports:
  11. - port: 30909
  12. targetPort: 30909
  13. name: listen-1
  14. nodePort: 30909
  15. - port: 30911
  16. targetPort: 30911
  17. name: listen-2
  18. nodePort: 30911
  19. - port: 30912
  20. targetPort: 30912
  21. name: ha-service
  22. nodePort: 30912
  23. selector:
  24. app: rocketmq-broker
  1. # 所属集群名字
  2. brokerClusterName=DefaultCluster
  3. # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
  4. # 在 broker-b.properties 使用: broker-b
  5. brokerName=broker-a
  6. # 0 表示 Master,> 0 表示 Slave
  7. brokerId=0
  8. # nameServer地址,分号分割
  9. # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
  10. #namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
  11. namesrvAddr=10.10.10.111:30003
  12. # 启动IP,如果 docker 报 Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.25.0.3:10911> failed
  13. # 解决方式1 加上一句 producer.setVipChannelEnabled(false);
  14. # 解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP,默认使用的是内部ip (你自己宿主机的IP,请参考自己)
  15. brokerIP1=10.10.10.111
  16. # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
  17. defaultTopicQueueNums=4
  18. # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
  19. autoCreateTopicEnable=true
  20. # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  21. autoCreateSubscriptionGroup=true
  22. # Broker 对外服务的监听端口 listenPort=10911
  23. listenPort=30911
  24. #haService中使用 默认值为:listenPort + 1
  25. #haListenPort=30912
  26. #主要用于slave同步master listenPort - 2
  27. #fastListenPort=30909
  28. # 删除文件时间点,默认凌晨4点
  29. deleteWhen=04
  30. # 文件保留时间,默认48小时
  31. fileReservedTime=120
  32. # commitLog 每个文件的大小默认1G
  33. mapedFileSizeCommitLog=1073741824
  34. # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
  35. mapedFileSizeConsumeQueue=300000
  36. # destroyMapedFileIntervalForcibly=120000
  37. # redeleteHangedFileInterval=120000
  38. # 检测物理文件磁盘空间,生产上建议85左右
  39. diskMaxUsedSpaceRatio=85
  40. # 存储路径
  41. #storePathRootDir=/opt/store
  42. # commitLog 存储路径
  43. #storePathCommitLog=/opt/logs/commitlog
  44. # 消费队列存储
  45. #storePathConsumeQueue=/opt/store/consumequeue
  46. # 消息索引存储路径
  47. #storePathIndex=/opt/store/index
  48. # checkpoint 文件存储路径
  49. #storeCheckpoint=/opt/store/checkpoint
  50. # abort 文件存储路径
  51. #abortFile=/opt/store/abort
  52. # 限制的消息大小
  53. maxMessageSize=65536
  54. # flushCommitLogLeastPages=4
  55. # flushConsumeQueueLeastPages=2
  56. # flushCommitLogThoroughInterval=10000
  57. # flushConsumeQueueThoroughInterval=60000
  58. # Broker 的角色
  59. # - ASYNC_MASTER 异步复制Master
  60. # - SYNC_MASTER 同步双写Master
  61. # - SLAVE
  62. brokerRole=SYNC_MASTER
  63. # 刷盘方式
  64. # - ASYNC_FLUSH 异步刷盘
  65. # - SYNC_FLUSH 同步刷盘
  66. flushDiskType=ASYNC_FLUSH
  67. # 发消息线程池数量
  68. sendMessageThreadPoolNums=128
  69. # 拉消息线程池数量
  70. pullMessageThreadPoolNums=128

console

StatefulSet

  1. apiVersion: apps/v1
  2. kind: StatefulSet
  3. metadata:
  4. name: rocketmq-console
  5. namespace: prod
  6. labels:
  7. app: rocketmq-console
  8. spec:
  9. serviceName: rocketmq-console
  10. replicas: 1
  11. selector:
  12. matchLabels:
  13. app: rocketmq-console
  14. template:
  15. metadata:
  16. labels:
  17. app: rocketmq-console
  18. spec:
  19. containers:
  20. - name: rocketmq-console
  21. image: styletang/rocketmq-console-ng
  22. imagePullPolicy: IfNotPresent
  23. resources:
  24. limits:
  25. cpu: 450m
  26. memory: 1768Mi
  27. requests:
  28. cpu: 100m
  29. memory: 768Mi
  30. env:
  31. - name: TZ
  32. value: Asia/Shanghai
  33. - name: JAVA_OPTS
  34. #value: -Drocketmq.namesrv.addr=rocketmq-nameserver:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.home=/root -Xms768m -Xmx768m
  35. value: -Drocketmq.namesrv.addr=10.10.10.145:30003 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.home=/root -Xms768m -Xmx768m
  36. ports:
  37. - containerPort: 8080
  38. volumeMounts:
  39. - name: logs
  40. mountPath: /root/logs
  41. volumes:
  42. - name: logs
  43. hostPath:
  44. path: /root/project-server/backend/rocketmq/console/logs
  45. type: Directory

Service

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. namespace: prod
  5. name: rocketmq-console
  6. labels:
  7. app: rocketmq-console
  8. spec:
  9. type: NodePort
  10. ports:
  11. - port: 8080
  12. targetPort: 8080
  13. nodePort: 30004
  14. selector:
  15. app: rocketmq-console

3. 整合springCloud stream

我这里示意starter组件的方式去做的,所以先编写starter组件提供给各个module服务使用
image.png

1. 引入依赖jar包

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
  4. </dependency>

2. 编写生产者消费者、生产者消息生产通道

  1. // 消费者通道
  2. public interface ConsumeChannel {
  3. @Input(MessageConstant.TEST_MESSAGE_INPUT)
  4. SubscribableChannel testInput();
  5. }
  6. // 生产者通道
  7. public interface ProduceChannel {
  8. @Output(MessageConstant.TEST_MESSAGE_OUTPUT)
  9. MessageChannel testOutput();
  10. }
  11. public class MessageConstant {
  12. /**
  13. * 测试demo生产者
  14. */
  15. public static final String TEST_MESSAGE_OUTPUT = "test-output";
  16. /**
  17. * 测试demo消费者
  18. */
  19. public static final String TEST_MESSAGE_INPUT = "test-input";
  20. }

3. 配置mq配置文件(比如nameserver的地址、bindings、binder)

  1. spring:
  2. cloud:
  3. stream:
  4. rocketmq:
  5. binder:
  6. name-server: 192.168.1.148:9876
  7. # 对应 RocketMQBindingProperties 如果当前服务只是充当生产者/消费者 只需配置bindings中的一项即可
  8. bindings:
  9. test-out:
  10. group: test-topic-produce-group #生产者组 命名规则
  11. sync: true # 是否同步发送消息,默认为 false 异步
  12. test-input:
  13. consumer:
  14. orderly: false # 是否使用顺序消费,默认false,这里直接并发消费
  15. broadcasting: false # 是否使用广播消费,默认为 false
  16. bindings:
  17. # 自定义名称 则是我们的topic TODO 生产配置建议不要自动创建topic,broker配置文件默认配置了生产消费线程、默认队列数、
  18. test-output: { destination: test-topic,content-type: application/json }
  19. test-input: { destination: test-topic,content-type: text/plain,group: test-topic-consume-group,consumer.maxAttempts: 16 }

4. 我这里值启动了一个微服务(引入我们编写的starter组件),既做生生产者又做消费者

  1. @SpringBootApplication
  2. @EnableBinding({ProduceChannel.class, ConsumeChannel.class})
  3. public class Order {
  4. public static void main(String[] args) throws UnknownHostException {
  5. SpringApplication.run(Order.class, args);
  6. }
  7. }

5. 编写生产者发消息

  1. @ApiOperation(value = "测试mq发送消息")
  2. @PostMapping("/sendMqDemo")
  3. public String sendMqDemo(@RequestBody SendMqDemoReq req) {
  4. return produceDemoService.sendMqDemo(req);
  5. }

生产者业务类

  1. @Service
  2. public class ProduceDemoService {
  3. private ProduceChannel produce;
  4. private final RocketMQTemplate rocketMQTemplate;
  5. public ProduceDemoService(ProduceChannel produce, RocketMQTemplate rocketMQTemplate) {
  6. this.produce = produce;
  7. this.rocketMQTemplate = rocketMQTemplate;
  8. }
  9. public String sendMqDemo(SendMqDemoReq req) {
  10. log.info("nameserver地址:" + rocketMQTemplate.getProducer().getNamesrvAddr());
  11. Map<String, Object> headers = new HashMap<>();
  12. headers.put(MessageConst.PROPERTY_TAGS, "demo");//过滤
  13. headers.put(MessageConst.PROPERTY_KEYS, "demo" + UUIDUtil.getUUID());//每条消息的唯一业务id
  14. headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL,"3");//设置延迟级别为 3,10 秒后消费。
  15. Message message = MessageBuilder.createMessage(req, new MessageHeaders(headers));
  16. //Message<String> message = MessageBuilder.withPayload(messageInfo).setHeader(MessageConst.PROPERTY_TAGS, "xxx").build();
  17. boolean send = produce.testOutput().send(message);
  18. if (send) {
  19. return "send ok";
  20. }
  21. return "send fail";
  22. }
  23. }

6. 编写生产者发消息

package com.jimushow.simone.mq.consume;

import com.jimushow.simone.starter.constant.MessageConstant;
import com.jimushow.simone.vo.req.testDemo.SendMqDemoReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

/**
 * @author heian
 * @date 2021/5/13 10:06
 * @description 消费demo 演示
 */
@Service
@Slf4j
public class ConsumeDemoService {

    /**
     * 监控demo案例input
     * @param message 消费文本内容
     */
    @StreamListener(MessageConstant.TEST_MESSAGE_INPUT)
    public void handler(@Payload SendMqDemoReq message) {
        log.info("接收到demo的消息为:{}", message);
    }

}

注意点

  1. 生产tag与消费tag不一致,造成消息丢失

我们发带tag类型的消息时,如果你代码中指定了tag接收,但是生产者发的tag不一致也会导致我们微服务去消费这条消息,消息状态也是consumed。比如这里发送的消息时closeLive,但是服务端只接受leaveRoom
image.png
image.png