RocketMQ 是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在 2016 年底捐赠给 Apache 开源基金会成为孵化项目,经过不到一年时间正式成为了 Apache 顶级项目;早期阿里曾经基于 ActiveMQ 研发消息系统, 随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过 Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了 RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ 和 Kafka 在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ 默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。
Cloudopt Next 为 RocketMQ 提供了插件,您只需要在配置文件中配置后并启动插件既可。
在使用前请先自行引用相应的依赖,请自行添加版本号。
RocketMQ 是 3.0.0.0-DEBUG 引入的,请确保版本高于等于 3.0.0.0-DEBUG.
<dependency>
<groupId>net.cloudopt.next</groupId>
<artifactId>cloudopt-next-rocketmq</artifactId>
<version>${version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${version}</version>
</dependency>
在配置文件中仅需要根据自己需要增加 producer 的配置或者 consumer 配置即可。如果配置了 producer 会在插件启动时自动初始化 producer 实例。如果配置了 consumer 会在启动时自动初始化 consumer 实例。配置兼容大部分的 RocketMQ 的配置。详情见 ProducerConfig 或 ConsumerConfig。
{
"packageName": "net.cloudopt.next.rocketmq.test",
"port": 8888,
"rocketmq": {
"producer": {
"groupName": "TEST_PRODUCER_GROUP",
"namesrvAddr": "127.0.0.1:9876"
},
"consumer": {
"groupName": "TEST_CONSUMER_GROUP",
"namesrvAddr": "127.0.0.1:9876"
}
}
}
如果你需要 ACL 鉴权,可以参考下面的配置。
{
"packageName": "net.cloudopt.next.rocketmq.test",
"port": 8888,
"rocketmq": {
"producer": {
"groupName": "TEST_PRODUCER_GROUP",
"namesrvAddr": "127.0.0.1:9876",
"accessKey": "",
"accessSecret": ""
},
"consumer": {
"groupName": "TEST_CONSUMER_GROUP",
"namesrvAddr": "127.0.0.1:9876",
"accessKey": "",
"accessSecret": ""
}
}
}
fun main(args: Array<String>) {
NextServer.addPlugin(RocketMQPlugin())
NextServer.run()
}
生产者和消费者的实例都是 RocketMQManager 中管理的。
RocketMQManager.producer
RocketMQManager.consumer
如果你要发送一条普通的消息可以直接调用 RocketMQManager 中的 producer 来发送。
val msg = Message(
"test-topic" /* Topic */,
"TagA" /* Tag */,
"Hello RocketMQ".toByteArray(charset = Charsets.UTF_8)
)
val sendResult: SendResult = RocketMQManager.producer.send(msg)
RocketMQManager.producer.send(msg)
同理其它的需要操作生产者消费者实例的都可以这样操作。
为了方便大家使用,我们实现了消费者自动注册。任意一个类只要实现了 RocketMQListener 接口并放置了@AutoRocketMQ 注解,就会自动监听 @AutoRocketMQ 中设置的主题名。并在消息到达时自动执行。目前支持并发消息监听和顺序消息监听。如果需要监听多个主题,可以用英文逗号间隔。
@AutoRocketMQ("test-topic")
class TestListener : RocketMQListener {
override fun listener(msg: MessageExt) {
println("Receive New Messages: $msg")
}
}
@AutoRocketMQ 同时支持设置 subExpression。
@AutoRocketMQ("test-topic","*")
class TestListener : RocketMQListener {
override fun listener(msg: MessageExt) {
println("Receive New Messages: $msg")
}
}
您还可以在配置文件中进行更多设置,具体参数名请参考 RocketMQ 官方文档。