docker compose搭建rockermq

在根目录下创建docker-env目录

  1. docker-env
  2. ├─conf
  3. ├─logs
  4. └─store

docker-env/conf/broker.conf文件内容

  1. brokerClusterName = DefaultCluster
  2. brokerName = broker-a
  3. brokerId = 0
  4. deleteWhen = 04
  5. fileReservedTime = 48
  6. brokerRole = ASYNC_MASTER
  7. flushDiskType = ASYNC_FLUSH
  8. # 主机IP
  9. brokerIP1 = 192.168.110.64

docker-compose.yaml内容

  1. version: "3"
  2. services:
  3. mqnamesrv:
  4. image: foxiswho/rocketmq:4.7.0
  5. container_name: mqnamesrv
  6. ports:
  7. - 9876:9876
  8. environment:
  9. JAVA_OPT: -server -Xms256m -Xmx256m
  10. command: sh mqnamesrv
  11. mqbroker:
  12. image: foxiswho/rocketmq:4.7.0
  13. container_name: mqbroker
  14. ports:
  15. - 10909:10909
  16. - 10911:10911
  17. volumes:
  18. - ./docker-env/logs:/opt/logs
  19. - ./docker-env/store:/opt/store
  20. - ./docker-env/conf/broker.conf:/etc/rocketmq/broker.conf
  21. environment:
  22. NAMESRV_ADDR: mqnamesrv:9876
  23. JAVA_OPT_EXT: -server -Xms256m -Xmx256m -Xmn128m
  24. command: sh mqbroker -n mqnamesrv:9876 -c /etc/rocketmq/broker.conf
  25. mqconsole:
  26. image: styletang/rocketmq-console-ng
  27. container_name: mqconsole
  28. ports:
  29. - 19876:8080
  30. environment:
  31. JAVA_OPTS: -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=falses

基本概念

官方文档:https://rocketmq.apache.org/zh/docs/introduction/02concepts
本文介绍 Apache RocketMQ 的基本概念,以便您更好地理解和使用 Apache RocketMQ 。

主题(Topic)

Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。更多信息,请参见主题(Topic)

消息类型(MessageType)

Apache RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
信息
Apache RocketMQ 从5.0版本开始,支持强制校验消息类型,即每个主题Topic只允许发送一种消息类型的消息,这样可以更好的运维和管理生产系统,避免混乱。但同时保证向下兼容4.x版本行为,强制校验功能默认关闭,推荐通过服务端参数 enableTopicMessageTypeCheck 手动开启校验。

消息队列(MessageQueue)

队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。更多信息,请参见队列(MessageQueue)

消息(Message)

消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。更多信息,请参见消息(Message)

消息视图(MessageView)

消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。

消息标签(MessageTag)

消息标签是Apache RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。更多信息,请参见消息过滤

消息位点(MessageQueueOffset)

消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。更多信息,请参见消费进度管理

消费位点(ConsumerOffset)

一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。更多信息,请参见消费进度管理

消息索引(MessageKey)

消息索引是Apache RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。

生产者(Producer)

生产者是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。更多信息,请参见生产者(Producer)

事务检查器(TransactionChecker)

Apache RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。更多信息,请参见事务消息

事务状态(TransactionResolution)

Apache RocketMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。更多信息,请参见事务消息

消费者分组(ConsumerGroup)

消费者分组是Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。更多信息,请参见消费者分组(ConsumerGroup)

消费者(Consumer)

消费者是Apache RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。更多信息,请参见消费者(Consumer)

消费结果(ConsumeResult)

Apache RocketMQ 中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。

订阅关系(Subscription)

订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。更多信息,请参见订阅关系(Subscription)

消息过滤

消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在Apache RocketMQ 的服务端完成。更多信息,请参见消息过滤

重置消费位点

以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息。更多信息,请参见重置消费位点

消息轨迹

在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由Apache RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。

消息堆积

生产者已经将消息发送到Apache RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。

事务消息

事务消息是Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

定时/延时消息

定时/延时消息是Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

顺序消息

顺序消息是Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

example

安装第三方库:
go get github.com/apache/rocketmq-client-go/v2

发送普通消息

  1. // 发送普通消息
  2. func TestSendSimple(t *testing.T) {
  3. p, _ := rocketmq.NewProducer(
  4. producer.WithNameServer([]string{"127.0.0.1:9876"}),
  5. )
  6. err := p.Start()
  7. if err != nil {
  8. fmt.Printf("start producer error: %s", err.Error())
  9. os.Exit(1)
  10. }
  11. topic := "test"
  12. for i := 0; i < 10; i++ {
  13. msg := primitive.NewMessage(topic, []byte("Hello RocketMQ Go Client! "+strconv.Itoa(i)))
  14. res, err := p.SendSync(context.Background(), msg)
  15. if err != nil {
  16. fmt.Printf("send message error: %s\n", err)
  17. } else {
  18. fmt.Printf("send message success: result=%s\n", res.String())
  19. }
  20. }
  21. err = p.Shutdown()
  22. if err != nil {
  23. fmt.Printf("shutdown producer error: %s", err.Error())
  24. }
  25. }

消息接收

订阅来自某个topic的消息 c.Subscribe(topic,consumer.MessageSelector{},func)

  1. // 消息接收
  2. func TestConsumer(t *testing.T) {
  3. sig := make(chan os.Signal)
  4. c, _ := rocketmq.NewPushConsumer(
  5. consumer.WithNameServer([]string{"127.0.0.1:9876"}),
  6. consumer.WithGroupName("test-group"),
  7. )
  8. err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
  9. msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
  10. for i := range msgs {
  11. fmt.Printf("subscribe callback: %v \n", msgs[i])
  12. }
  13. return consumer.ConsumeSuccess, nil
  14. })
  15. if err != nil {
  16. fmt.Println(err.Error())
  17. }
  18. // Note: start after subscribe
  19. err = c.Start()
  20. if err != nil {
  21. fmt.Println(err.Error())
  22. os.Exit(-1)
  23. }
  24. <-sig
  25. err = c.Shutdown()
  26. if err != nil {
  27. fmt.Printf("shutdown Consumer error: %s", err.Error())
  28. }
  29. }

发送延迟消息

需要传递延迟消息的时间levelmsg.WithDelayTimeLevel()

  1. // 发送延迟消息
  2. // DelayTimeLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  3. func TestSendDelay(t *testing.T) {
  4. p, _ := rocketmq.NewProducer(
  5. producer.WithNameServer([]string{"127.0.0.1:9876"}),
  6. )
  7. err := p.Start()
  8. if err != nil {
  9. fmt.Printf("start producer error: %s", err.Error())
  10. os.Exit(1)
  11. }
  12. topic := "test"
  13. for i := 0; i < 10; i++ {
  14. msg := primitive.NewMessage(topic, []byte("delay message: Hello RocketMQ Go Client! "+strconv.Itoa(i)))
  15. // 延迟
  16. msg.WithDelayTimeLevel(2)
  17. res, err := p.SendSync(context.Background(), msg)
  18. if err != nil {
  19. fmt.Printf("send message error: %s\n", err)
  20. } else {
  21. fmt.Printf("send message success: result=%s\n", res.String())
  22. }
  23. }
  24. err = p.Shutdown()
  25. if err != nil {
  26. fmt.Printf("shutdown producer error: %s", err.Error())
  27. }
  28. }

发送事务消息

参数传递需要实现ExecuteLocalTransactionCheckLocalTransaction方法的结构体参数

  1. // 发送事务消息
  2. func TestSendTransaction(t *testing.T) {
  3. p, _ := rocketmq.NewTransactionProducer(
  4. &OrderListener{},
  5. producer.WithNameServer([]string{"127.0.0.1:9876"}),
  6. )
  7. err := p.Start()
  8. if err != nil {
  9. fmt.Printf("start producer error: %s", err.Error())
  10. os.Exit(1)
  11. }
  12. topic := "test-transaction"
  13. for i := 0; i < 1; i++ {
  14. msg := primitive.NewMessage(topic, []byte("transaction message: Hello RocketMQ Go Client! "+strconv.Itoa(i)))
  15. res, err := p.SendMessageInTransaction(context.Background(), msg)
  16. if err != nil {
  17. fmt.Printf("send message error: %s\n", err)
  18. } else {
  19. fmt.Printf("send message success: result=%s\n", res.String())
  20. }
  21. }
  22. select {}
  23. err = p.Shutdown()
  24. if err != nil {
  25. fmt.Printf("shutdown producer error: %s", err.Error())
  26. }
  27. }
  28. type OrderListener struct {
  29. }
  30. // 正常发送消息执行
  31. func (receiver *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
  32. strings := make(chan string)
  33. go func() {
  34. strings <- "success"
  35. time.Sleep(time.Second * 1)
  36. strings <- "error"
  37. time.Sleep(time.Second * 2)
  38. strings <- "panic"
  39. close(strings)
  40. }()
  41. for s := range strings {
  42. if s == "success" {
  43. // 逻辑成功
  44. fmt.Println("success")
  45. // return primitive.CommitMessageState
  46. } else if s == "error" {
  47. // 逻辑失败
  48. fmt.Println("error")
  49. // return primitive.RollbackMessageState
  50. } else {
  51. // 故障 执行回查CheckLocalTransaction
  52. fmt.Println("panic")
  53. }
  54. }
  55. return primitive.CommitMessageState
  56. }
  57. // 代码异常、服务宕机、以及其他可能出现的意料之外的异常情况,会执行回查
  58. func (receiver *OrderListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {
  59. fmt.Println("rollback message")
  60. return primitive.RollbackMessageState
  61. }