RocketMQ 是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在 2016 年底捐赠给 Apache 开源基金会成为孵化项目,经过不到一年时间正式成为了 Apache 顶级项目;早期阿里曾经基于 ActiveMQ 研发消息系统, 随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过 Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了 RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ 和 Kafka 在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ 默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。

    Cloudopt Next 为 RocketMQ 提供了插件,您只需要在配置文件中配置后并启动插件既可。

    在使用前请先自行引用相应的依赖,请自行添加版本号。

    RocketMQ 是 3.0.0.0-DEBUG 引入的,请确保版本高于等于 3.0.0.0-DEBUG.

    1. <dependency>
    2. <groupId>net.cloudopt.next</groupId>
    3. <artifactId>cloudopt-next-rocketmq</artifactId>
    4. <version>${version}</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.apache.rocketmq</groupId>
    8. <artifactId>rocketmq-client</artifactId>
    9. <version>${version}</version>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.apache.rocketmq</groupId>
    13. <artifactId>rocketmq-acl</artifactId>
    14. <version>${version}</version>
    15. </dependency>

    在配置文件中仅需要根据自己需要增加 producer 的配置或者 consumer 配置即可。如果配置了 producer 会在插件启动时自动初始化 producer 实例。如果配置了 consumer 会在启动时自动初始化 consumer 实例。配置兼容大部分的 RocketMQ 的配置。详情见 ProducerConfigConsumerConfig

    1. {
    2. "packageName": "net.cloudopt.next.rocketmq.test",
    3. "port": 8888,
    4. "rocketmq": {
    5. "producer": {
    6. "groupName": "TEST_PRODUCER_GROUP",
    7. "namesrvAddr": "127.0.0.1:9876"
    8. },
    9. "consumer": {
    10. "groupName": "TEST_CONSUMER_GROUP",
    11. "namesrvAddr": "127.0.0.1:9876"
    12. }
    13. }
    14. }

    如果你需要 ACL 鉴权,可以参考下面的配置。

    1. {
    2. "packageName": "net.cloudopt.next.rocketmq.test",
    3. "port": 8888,
    4. "rocketmq": {
    5. "producer": {
    6. "groupName": "TEST_PRODUCER_GROUP",
    7. "namesrvAddr": "127.0.0.1:9876",
    8. "accessKey": "",
    9. "accessSecret": ""
    10. },
    11. "consumer": {
    12. "groupName": "TEST_CONSUMER_GROUP",
    13. "namesrvAddr": "127.0.0.1:9876",
    14. "accessKey": "",
    15. "accessSecret": ""
    16. }
    17. }
    18. }
    1. fun main(args: Array<String>) {
    2. NextServer.addPlugin(RocketMQPlugin())
    3. NextServer.run()
    4. }

    生产者和消费者的实例都是 RocketMQManager 中管理的。

    1. RocketMQManager.producer
    2. RocketMQManager.consumer

    如果你要发送一条普通的消息可以直接调用 RocketMQManager 中的 producer 来发送。

    1. val msg = Message(
    2. "test-topic" /* Topic */,
    3. "TagA" /* Tag */,
    4. "Hello RocketMQ".toByteArray(charset = Charsets.UTF_8)
    5. )
    6. val sendResult: SendResult = RocketMQManager.producer.send(msg)
    7. RocketMQManager.producer.send(msg)

    同理其它的需要操作生产者消费者实例的都可以这样操作。

    为了方便大家使用,我们实现了消费者自动注册。任意一个类只要实现了 RocketMQListener 接口并放置了@AutoRocketMQ 注解,就会自动监听 @AutoRocketMQ 中设置的主题名。并在消息到达时自动执行。目前支持并发消息监听和顺序消息监听。如果需要监听多个主题,可以用英文逗号间隔。

    1. @AutoRocketMQ("test-topic")
    2. class TestListener : RocketMQListener {
    3. override fun listener(msg: MessageExt) {
    4. println("Receive New Messages: $msg")
    5. }
    6. }

    @AutoRocketMQ 同时支持设置 subExpression。

    1. @AutoRocketMQ("test-topic","*")
    2. class TestListener : RocketMQListener {
    3. override fun listener(msg: MessageExt) {
    4. println("Receive New Messages: $msg")
    5. }
    6. }

    您还可以在配置文件中进行更多设置,具体参数名请参考 RocketMQ 官方文档。