开始
它是一个装监听器的容器。
监听消息数量,不是它监听几个队列。而是指的是他有并发的多少线程一起去监听消息。这就相当于我们最早实现的线程池。现在它可以自动的去配置。
运行中修改监听器的配置,修改后可以立即生效。
代码实现
下面来做接收消息的工作。先把我们老的监听消息的地方给屏蔽掉。
deliverCallback
在这里注册的
我们把这里注释掉,deliverCallBack这个函数就不会再被调用了。
我们用的是这个channelConsume的底层的操作。以后讲原理的时候,这个地方很重要。
新建SimpleMessageLintenerContiner
首先把这个类的对象给new 出来
它有两个构造 一个需要传connectionFactory
注入ConnectionFactory,这里加不加@Autowired都可以的。
传入ConnectionFactory
new出来之后,我们还需要进行配置。看到它可以set这么一堆东西。
setQueueNames传入的是3个点 可变长度的的参数,也就是说可以传一个stirng 也可以传多个String也就是说它可以监听多个队列。
我们只需要监听这一个队列就可以了
并发的消费者。也就是它同时有几个消费者线程在消费这个队列。这个是可以限制的。相当于我们以前线程池里面做的线程数。
这里我们设置3 ,一般的微服务业务,不需要给太多。
最大并发消费者。
需要传一个枚举
先让它自动确认。
实际的回调方法。
New一个MessageListener后,把它自动的变成了一个匿名类。
又是一个函数式的接口。
这样受到消息后,业务逻辑在这里写
之前的业务逻辑,我们是写在这里的。
这里我们只是打印一个日志
最终返回这个。这样一个最简单的消息容器就设置完成了,它可以设置很多的参数。也可以设置回调的方法去监听容器。非常的方便。
测试
上加微服务一直启动着
运行订单服务。
手动确认
设置手动的确认
之前我们做消息确认的时候,可以拿到这个channel。因为我们的消息接收用的就是channel.basic. 现在spring的amqp的包封装了底层的一些实现。那么我们怎么拿到channel?
看看这个MessageListener接口里面有没有其他的实现或者继承。看看有没有能够拿到channel的
cltr+alt+p还是B的
发现这个。这是个能拿到channel的message监听。
多了一个入参channel
不管打log还要对消息进行确认。用channel.basicAck做消息的确认。然后业务流程就可以正常的跑了。
重启项目测试
postman发请求测试
发送了消息 ,同时收到了商家服务回发的消息
我现在要确认它的ack。打开queue.order这个队列。
所有的消息都被正确的消费了和正确的ack了。说明我们的手动确认机制也生效了。
我们实现了消费端确认的高级特性。
消费端的限流
限流每次1个消息
重启order服务
查看log发现运行没有问题。
那么怎么证明我们确实设置了限流呢?怎么确定哪几个channel是我们使用的呢?
我们刚才设置的preFetch是1. 说明我设置的参数已经生效了。