1 简介

阿里开源, Java和Scala开发, 2016年后由Apache基金会维护
image.png

2 安装

(1) docker-compose.yml

  1. version: '3.5'
  2. services:
  3. rmqnamesrv:
  4. image: foxiswho/rocketmq:server
  5. container_name: rmqnamesrv
  6. ports:
  7. - 9876:9876
  8. volumes:
  9. - ./logs:/opt/logs
  10. - ./store:/opt/store
  11. command: sh mqnamesrv
  12. networks:
  13. rmq:
  14. aliases:
  15. - rmqnamesrv
  16. rmqbroker:
  17. image: foxiswho/rocketmq:broker
  18. container_name: rmqbroker
  19. ports:
  20. - 10909:10909
  21. - 10911:10911
  22. volumes:
  23. - ./logs:/opt/logs
  24. - ./store:/opt/store
  25. - ./conf/broker.conf:/etc/rocketmq/broker.conf
  26. command: sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
  27. environment:
  28. NAMESRV_ADDR: "rmqnamesrv:9876"
  29. JAVA_OPTS: " -Duser.home=/opt"
  30. JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn256m"
  31. command: mqbroker -c /etc/rocketmq/broker.conf
  32. depends_on:
  33. - rmqnamesrv
  34. networks:
  35. rmq:
  36. aliases:
  37. - rmqbroker
  38. rmqconsole:
  39. image: styletang/rocketmq-console-ng
  40. container_name: rmqconsole
  41. restart: always
  42. ports:
  43. - 8080:8080
  44. environment:
  45. JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
  46. depends_on:
  47. - rmqnamesrv
  48. volumes:
  49. - ./console/logs:/root/logs
  50. networks:
  51. rmq:
  52. aliases:
  53. - rmqconsole
  54. networks:
  55. rmq:
  56. name: rmq
  57. driver: bridge

(2) ./conf/broker.conf

  1. # nameServer地址,如果nameserver是多台集群的话,就用分号分割
  2. namesrvAddr=192.168.1.103:9876
  3. # 所属集群名字(同一主从下:Master和slave名称要一致)
  4. brokerClusterName=DefaultCluster
  5. # broker名字,注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b
  6. brokerName=broker-a
  7. # 0 表示 Master,>0 表示 Slave
  8. brokerId=0
  9. # 启动IP, 要设置为其它机器能访问的IP
  10. brokerIP1=192.168.1.103
  11. # Broker 对外服务的监听端口
  12. listenPort=10911
  13. # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4
  14. # defaultTopicQueueNums=4
  15. # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
  16. autoCreateTopicEnable=true
  17. # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  18. autoCreateSubscriptionGroup=true
  19. # commitLog每个文件的大小默认1G
  20. mapedFileSizeCommitLog=1073741824
  21. # ConsumeQueue每个文件默认存30W条,根据业务情况调整
  22. mapedFileSizeConsumeQueue=300000
  23. # 检测可用的磁盘空间大小,当磁盘被占用超过95%,消息写入会直接报错
  24. diskMaxUsedSpaceRatio=95

(3) docker-compose up

image.png
image.png

3 管理后台

http://192.168.1.103:8080/

image.png

  • 添加topic

image.png

4 消息类型

(1) 普通消息(订阅)

image.png

(2) 顺序消息

image.png
image.png

(3) 延时消息

image.png

(4) 事务消息

image.png

5 搭建py开发环境

https://github.com/apache/rocketmq-client-python

(1) 安装librocketmq

wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb

(2) pip install rocketmq-client-python

(3) producer.py

  1. from rocketmq.client import Producer, Message
  2. producer = Producer('PID-XXX')
  3. producer.set_name_server_address('127.0.0.1:9876')
  4. producer.start()
  5. msg = Message('YOUR-TOPIC')
  6. msg.set_keys('XXX')
  7. msg.set_tags('XXX')
  8. msg.set_body('XXXX')
  9. # 延时消息, 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  10. # msg.set_delay_time_level(3)
  11. ret = producer.send_sync(msg)
  12. print(ret.status, ret.msg_id, ret.offset)
  13. producer.shutdown()

(4) consumer.py

  1. from rocketmq.client import PushConsumer, ConsumeStatus
  2. def callback(msg):
  3. print(msg.id, msg.body)
  4. return ConsumeStatus.CONSUME_SUCCESS
  5. consumer = PushConsumer('CID_XXX')
  6. consumer.set_name_server_address('127.0.0.1:9876')
  7. consumer.subscribe('YOUR-TOPIC', callback)
  8. consumer.start()
  9. import time
  10. time.sleep(2)
  11. consumer.shutdown()

0 配置属性详解

NameServer配置属性
参数名 参数类型 描述 默认参数(时间为单位ms,数据单位为byte)
rocketmqHome String RockerMQ主目录,默认用户主目录
namesrvAddr String NameServer地址
kvConfigPath String kv配置文件路径,包含顺序消息主题的配置信息
configStorePath String NameServer配置文件路径,建议使用-c指定NameServer配置文件路径
clusterTest boolean 是否开启集群测试,默认为false
orderMessageEnable boolean 是否支持顺序消息,默认为false
NameServer、Broker、filter网络配置属性
accessMessageInMemoryMaxRatio int 访问消息在内存中比率,默认为40 40
adminBrokerThreadPoolNums int 服务端处理控制台管理命令线程池线程数量 16
autoCreateSubscriptionGroup boolean 是否自动创建消费组 true
autoCreateTopicEnable boolean 是否自动创建主题 true
bitMapLengthConsumeQueueExt int ConsumeQueue扩展过滤bitmap大小 112
brokerClusterName String Broker集群名称 TestCluster
brokerFastFailureEnable boolean 是否支持broker快速失败 如果为true表示会立即清除发送消息线程池,消息拉取线程池中排队任务 ,直接返回系统错误 true
brokerId int brokerID 0表示主节点 大于0表示从节点 0
brokerIP1 String Broker服务地址
brokerIP2 String BrokerHAIP地址,供slave同步消息的地址
brokerName String Broker服务器名称morning服务器hostname broker-a
brokerPermission int Broker权限 默认为6表示可读可写 6
brokerRole enum broker角色,分为 ASYNC_MASTER SYNC_MASTER, SLAVE ASYNC_MASTER
brokerTopicEnable boolean broker名称是否可以用做主体使用 true
channelNotActiveInterval long 60000
checkCRCOnRecover boolean 文件恢复时是否校验CRC true
cleanFileForciblyEnable boolean 是否支持强行删除过期文件 true
cleanResourceInterval int 清除过期文件线程调度频率 10000
clientAsyncSemaphoreValue int 65535
clientCallbackExecutorThreads int 8
clientChannelMaxIdleTimeSeconds int 120
clientCloseSocketIfTimeout boolean false
clientManagerThreadPoolQueueCapacity int 客户端管理线程池任务队列初始大小 1000000
clientManageThreadPoolNums int 服务端处理客户端管理(心跳 注册 取消注册线程数量) 32
clientOnewaySemaphoreValue int 65535
clientPooledByteBufAllocatorEnable boolean false
clientSocketRcvBufSize long 客户端socket接收缓冲区大小 131072
clientSocketSndBufSize long 客户端socket发送缓冲区大小 131072
clientWorkerThreads int 4
clusterTopicEnable boolean 集群名称是否可用在主题使用 true
commercialBaseCount int 1
commercialBigCount int 1
commercialEnable boolean true
commercialTimerCount int 1
commercialTransCount int 1
commitCommitLogLeastPages int 一次提交至少需要脏页的数量,默认4页,针对 commitlog文件 4
commitCommitLogThoroughInterval int Commitlog两次提交的最大间隔,如果超过该间隔,将忽略commitCommitLogLeastPages直接提交 200
commitIntervalCommitLog int commitlog提交频率 200
compressedRegister boolean false
connectTimeoutMillis long 链接超时时间 3000
consumerFallbehindThreshold long 消息消费堆积阈值默认16GB在disableConsumeifConsumeIfConsumerReadSlowly为true时生效 17179869184
consumerManagerThreadPoolQueueCapacity int 消费管理线程池任务队列大小 1000000
consumerManageThreadPoolNums int 服务端处理消费管理 获取消费者列表 更新消费者进度查询消费进度等 32
debugLockEnable boolean 是否支持 PutMessage Lock锁打印信息 false
defaultQueryMaxNum int 查询消息默认返回条数,默认为32 32
defaultTopicQueueNums int 主体在一个broker上创建队列数量 8
deleteCommitLogFilesInterval int 删除commitlog文件的时间间隔,删除一个文件后等一下再删除一个文件 100
deleteConsumeQueueFilesInterval int 删除consumequeue文件时间间隔 100
deleteWhen String 磁盘文件空间充足情况下,默认每天什么时候执行删除过期文件,默认04表示凌晨4点 04
destroyMapedFileIntervalForcibly int 销毁MappedFile被拒绝的最大存活时间,默认120s。清除过期文件线程在初次销毁mappedfile时,如果该文件被其他线程引用,引用次数大于0.则设置MappedFile的可用状态为false,并设置第一次删除时间,下一次清理任务到达时,如果系统时间大于初次删除时间加上本参数,则将ref次数一次减1000,知道引用次数小于0,则释放物理资源 120000
disableConsumeIfConsumerReadSlowly boolean 如果消费组消息消费堆积是否禁用该消费组继续消费消息 false
diskFallRecorded boolean 是否统计磁盘的使用情况,默认为true true
diskMaxUsedSpaceRatio int commitlog目录所在分区的最大使用比例,如果commitlog目录所在的分区使用比例大于该值,则触发过期文件删除 75
duplicationEnable boolean 是否允许重复复制,默认为 false false
enableCalcFilterBitMap boolean 是否开启比特位映射,这个属性不太明白 false
enableConsumeQueueExt boolean 是否启用ConsumeQueue扩展属性 false
enablePropertyFilter boolean 是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true false
endTransactionPoolQueueCapacity int 处理提交和回滚消息线程池线程队列大小 100000
endTransactionThreadPoolNums int 处理提交和回滚消息线程池 24
expectConsumerNumUseFilter boolean 布隆过滤器参数 32
fastFailIfNoBufferInStorePool boolean 从 transientStorepool中获取 ByteBuffer是否支持快速失败 false
fetchNamesrvAddrByAddressServer boolean 是否支持从服务器获取nameServer false
fileReservedTime String 文件保留时间,默认72小时,表示非当前写文件最后一次更新时间加上filereservedtime小与当前时间,该文件将被清理 120
filterDataCleanTimeSpan long 清除过滤数据的时间间隔 86400000
filterServerNums int broker服务器过滤服务器数量 0
filterSupportRetry boolean 消息过滤是否支持重试 false
flushCommitLogLeastPages int 一次刷盘至少需要脏页的数量,针对commitlog文件 4
flushCommitLogThoroughInterval int commitlog两次刷盘的最大间隔,如果超过该间隔,将fushCommitLogLeastPages要求直接执行刷盘操作 10000
flushCommitLogTimed boolean 表示await方法等待FlushIntervalCommitlog,如果为true表示使用Thread.sleep方法等待 false
flushConsumeQueueLeastPages int 一次刷盘至少需要脏页的数量,默认2页,针对 Consume文件 2
flushConsumeQueueThoroughInterval int Consume两次刷盘的最大间隔,如果超过该间隔,将忽略 60000
flushConsumerOffsetHistoryInterval int fushConsumeQueueLeastPages直接刷盘 60000
flushConsumerOffsetInterval int 持久化消息消费进度 consumerOffse.json文件的频率ms 5000
flushDelayOffsetInterval long 延迟队列拉取进度刷盘间隔。默认10s 10000
flushDiskType enum 刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值SYNC_FLUSH(同步刷盘) ASYNC_FLUSH
flushIntervalCommitLog int commitlog刷盘频率 500
flushIntervalConsumeQueue int consumuQueue文件刷盘频率 1000
flushLeastPagesWhenWarmMapedFile int 用字节0填充整个文件的,每多少页刷盘一次。默认4096页,异步刷盘模式生效 4096
forceRegister boolean 是否强制注册 true
haHousekeepingInterval int Master与save长连接空闲时间,超过该时间将关闭连接 20000
haListenPort int Master监听端口,从服务器连接该端口,默认为10912 10912
haMasterAddress String Master服务器IP地址与端口号
haSendHeartbeatInterval int Master与Slave心跳包发送间隔 5000
haSlaveFallbehindMax int 允许从服务器落户的最大偏移字节数,默认为256M。超过该值则表示该Slave不可用 268435456
haTransferBatchSize int 一次HA主从同步传输的最大字节长度,默认为32K 32768
heartbeatThreadPoolNums int 心跳线程池线程数 8
heartbeatThreadPoolQueueCapacity int 心跳线程队列数量 50000
highSpeedMode boolean 当前版本未使用 false
listenPort int 服务端监听端口 10911
longPollingEnable boolean 是否开启长轮训 true
mapedFileSizeCommitLog int 单个conmmitlog文件大小默认1GB 1073741824
mapedFileSizeConsumeQueue int 单个consumequeue文件大小默认30W*20表示单个Consumequeue文件中存储30W个ConsumeQueue条目 6000000
mappedFileSizeConsumeQueueExt int ConsumeQueue扩展文件大小默认48MB 50331648
maxDelayTime int 当前版本未使用 40
maxErrorRateOfBloomFilter int 布隆过滤器参数 20
maxHashSlotNum int 单个索引文件hash槽的个数,默认为五百万 5000000
maxIndexNum int 单个索引文件索引条目的个数,默认为两千万 20000000
maxMessageSize int 默认允许的最大消息体默认4M 4194304
maxMsgsNumBatch int 一次查询消息最大返回消息条数,默认64条 64
maxTransferBytesOnMessageInDisk 一次服务消息端消息拉取,消息在磁盘中传输允许的最大字节 65536
maxTransferBytesOnMessageInMemory int 一次服务端消息拉取,消息在内存中传输允许的最大传输字节数默认256kb 262144
maxTransferCountOnMessageInDisk int 一次消息服务端消息拉取,消息在磁盘中传输允许的最大条数,默认为8条 8
maxTransferCountOnMessageInMemory int 一次服务消息拉取,消息在内存中传输运行的最大消息条数,默认为32条 32
messageDelayLevel String 延迟队列等级(s=秒,m=分,h=小时) 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
messageIndexEnable boolean 是否支持消息索引文件 true
messageIndexSafe boolean 消息索引是否安全,默认为 false,文件恢复时选择文件检测点(commitlog.consumeque)的最小的与文件最后更新对比,如果为true,文件恢复时选择文件检测点保存的索引更新时间作为对比 false
messageStorePlugIn String 消息存储插件地址默认为空字符串
namesrvAddr String nameServer地址
notifyConsumerIdsChangedEnable 消费者数量变化后是否立即通知RebalenceService线程,以便马上进行重新负载 true
offsetCheckInSlave boolean 从服务器是否坚持 offset检测 false
osPageCacheBusyTimeOutMills long putMessage锁占用超过该时间,表示 PageCache忙 1000
pullMessageThreadPoolNums int 服务端处理消息拉取线程池线程数量 默认为16加上当前操作系统CPU核数的两倍 32
pullThreadPoolQueueCapacity int 消息拉去线程池任务队列初始大小 100000
putMsgIndexHightWater int 当前版本未使用 600000
queryMessageThreadPoolNums int 服务端处理查询消息线程池数量默认为8加上当前操作系统CPU核数的两倍 16
queryThreadPoolQueueCapacity int 查询消息线程池任务队列初始大小 20000
redeleteHangedFileInterval int 重试删除文件间隔,配合destorymapedfileintervalforcibly 120000
regionId String 消息区域 DefaultRegion
registerBrokerTimeoutMills int 注册broker超时时间 6000
registerNameServerPeriod int broker注册频率 大于1分钟为1分钟小于10秒为10秒 30000
rejectTransactionMessage boolean 是否拒绝事物消息 false
rocketmqHome String RocketMQ主目录 /home/rocketmq/rocketmq-all-4.3.2-bin-release
sendMessageThreadPoolNums int 服务端处理消息发送线程池数量 1
sendThreadPoolQueueCapacity int 消息发送线程池任务队列初始大小 10000
serverAsyncSemaphoreValue int 异步消息发送最大并发度 64
serverCallbackExecutorThreads int netty public任务线程池个数,netty网络设计没根据业务类型会创建不同线程池毛笔如处理发送消息,消息消费心跳检测等。如果业务类型(RequestCode)未注册线程池,则由public线程池执行 0
serverChannelMaxIdleTimeSeconds int 网络连接最大空闲时间。如果链接空闲时间超过此参数设置的值,连接将被关闭 120
serverOnewaySemaphoreValue int send oneway消息请求并发度 256
serverPooledByteBufAllocatorEnable boolean ByteBuffer是否开启缓存 true
serverSelectorThreads int IO线程池线程个数,主要是NameServer.broker端解析请求,返回相应的线程个数,这类县城主要是处理网络请求的,解析请求包。然后转发到各个业务线程池完成具体的业务无操作,然后将结果在返回调用方 3
serverSocketRcvBufSize int netty网络socket接收缓存区大小16MB 131072
serverSocketSndBufSize int netty网络socket发送缓存区大小16MB 131072
serverWorkerThreads int netty业务线程池个数 8
shortPollingTimeMills long 短轮训等待时间 1000
slaveReadEnable boolean 从节点是否可读 false
startAcceptSendRequestTimeStamp int 0
storePathCommitLog String Commitlog存储目录默认为${storePathRootDir}/commitlog /home/rocketmq/store/commitlog
storePathRootDir String broker存储目录 默认为用户的主目录/store /home/rocketmq/store
syncFlushTimeout long 同步刷盘超时时间 5000
traceOn boolean true
transactionCheckInterval long 事物回查周期 60000
transactionCheckMax int 事物回查次数 15
transactionTimeOut long 事物回查超时时间 6000
transferMsgByHeap boolean 消息传输是否使用堆内存 true
transientStorePoolEnable boolean Commitlog是否开启 transientStorePool机制,默认为 false false
transientStorePoolSize int transientStorePool中缓存 ByteBuffer个数,默认5个 5
useEpollNativeSelector boolean 是否启用Epoll IO模型。Linux环境建议开启 false
useReentrantLockWhenPutMessage boolean 消息存储到commitlog文件时获取锁类型,如果为true使用ReentrantLock否则使用自旋锁 false
useTLS boolean 是否使用安全传输层协议 false
waitTimeMillsInHeartbeatQueue long 清理broker心跳线程等待时间 31000
waitTimeMillsInPullQueue long 清除消息拉取线程池任务队列的等待时间。如果系统时间减去任务放入队列中的时间小于waitTimeMillsInPullQueue,本次请求任务暂时不移除该任务 5000
waitTimeMillsInSendQueue long 清除发送线程池任务队列的等待时间。如果系统时间减去任务放入队列中的时间小于waitTimeMillsInSendQueue,本次请求任务暂时不移除该任务 200
waitTimeMillsInTransactionQueue long 清理提交和回滚消息线程队列等待时间 3000
warmMapedFileEnable boolean 是否温和地使用 MappedFile如果为true,将不强制将内存映射文件锁定在内存中 false
connectWhichBroker String FilterServer连接的Broker地址
filterServerIP String FilterServerIP地址,默认为本地服务器IP
compressMsgBodyOverHowmuch int 如果消息Body超过该值则启用
zipCompresslevel int Zip压缩方式,默认为5,详细定义请参考java.util.Deflate中的定义
clientUploadFilterClassEnable boolean 是否支持客户端上传 FilterClass代码
filterClassRepertoryUrl String filterClass服务地址,如果 clientUploadFilterClassEnable为false,则需要提供一个地址从该服务器获取过滤类的代码
fsServerAsyncSemaphorevalue int FilterServer异步请求并发度,默认为2048
fsServerCallbackExecutorThreads int 处理回调任务的线程池数量,默认为64
fsServerWorkerThreads int 远程服务调用线程池数量,默认为64