开始
想要搞通SimpleMessageListenerContainer的原理,就要深入到源码里面。
发现,简单的消息容器,继承了抽象的消息容器
我们再点进去这个抽象的消息容器
ApplicationContextAware这个接口我们比较熟悉了。
这个接口里面有setApplicationContext方法。
最关键的接口是MessageListenerContainer
这个接口,它又继承了SmartLifecycle
Lifecycle是Spring框架的接口。
它有一个start方法。在这个Bean被交给Spring框架的时候,这个Bean被启动的时候,Spring框架会自动的调用这个Bean的或者叫做这个对象的start方法。
我们一直往上找到Lifecycle就表示,这个简单消息容器会被调用它的start的方法在启用的时候。
我们就去找它的structure里面去找,找他有没有start方法。
没有找到start方法,说明start方法在它的父类里面实现的。
我们进入到这个抽象类里面
进入这里再找start方法
找到了start方法,还可以看到是继承自Lifecycle这个接口。
双击打开这个start方法,这个方法逻辑是什么呢?判断这个Bean有没有被初始化。
这里是最重要的 ,它调用了doStart方法
我们按住arl+alt+B看下实现
我们用的这个SimpleMessageListenerContainer简单消息监听容器也实现了doStart .所以它调用的时候会自动调用到子类的doStart方法。
我们来看下子类的doStart方法做了什么。首先它调用了父类的doStart方法。
我们点进去initializeConsumers
它新建了一个HashSet
然后在HashSet里面createBlockingQueueComsumer
看下createBlockingQueueComsumer
专业的消费者,包装了有关这个消息Broker的connection。就是它将spring boot应用和rabbitMQ之间的链接的消息和信息都包装起来了。而且它有自己的生命周期。就是说我们所有的监听类都要通过在这个BlockingQueueConsumer来实现。它包装了connection和channel
我们new出来的consumer被返回回去了。
返到了这个地方
也就是说,这个这个consumers是SimpleMessageListenerContainer的成员,里面持有了一堆BlockingQueueConsumer类。BlockingQueueConsumer这个类里面又封装了我们跟真正的rabbitMQ通信的channel和connection
这里new了一些 AsyncMessageProcessingConsumer
进入类AsyncMessageProcessingConsumer
我们发现AsyncMessageProcessingConsumer是SimpleMessageListenerContainer的内部类
内部类有两个成员, 持有BlockingQueueConsumer .这列的主要作用是持有我们新建的BlockingQueueConsumer
返回的是父类里面的成员。就是Executor 线程池这个类。
这个类的父类有线程池,可以执行内部类叫做AsyncMessageProcessingConsumer。也就是说这个线程池是由Spring boot的类,给我们实现的。不需要我们再实现线程池了。
我们回到这个内部类。实现了Runnable
Runnable接口里面有run方法。也就是这个内部类放到线程池里面 它的run方法会自动的执行。
我们的SimpleMessageListenerContainer被新建了之后会自动的执行它的start方法,start执行执行的时候会新建BlockingQueueConsumer。
BlockingQueueConsumer它持有了和RabbitMQ通信的channel用它来通信。
用它来通信呢,我们又启用了内部类。内部类实现了Runable接口,它被放进线程池里面进行循环的调用。调用这个run方法
run方法里面有个 initialize方法。
进入这个初始化方法
调用了comsumer.start方法
这里的consumer.start方法调用的就是 BlockingQueueComsumer内的start方法。
BlockingQueueComsumer有自己的声明周期。调用start就是开始了。
start方法里面最后调用了
再往下找看到了熟悉的channel.basicComsumer
也就是说我们的SimpleMessageListenerContianer它持有了BlockingQueueComsumer类,BlockingQueueComsumer这个类负责监听rabbitMQ的一切的链接操作,
BlockingQueueComsumer里面的start方法。在初始化的时候会被定义
会调用
进入这个方法再往下找。
最后调用了 channel.basicConsume
它的回调方法
这里内部类里面有个 handleDelivery是真正的底层的回调方法,这个方法在收到消息的时候会被回调。
它的核心就是这一行代码。它将收到的新消息压到queue这个成员里面。
这个类成员就是BlockingQueue一个阻塞的队列。也就是我们收到的消息会压到这个阻塞队列里面。
那么这个阻塞队列什么时候被取用呢?我们要回到SimpleMessageListenerContainer找到run方法这里。run方法会被调用,
调用的时候,首先它会initialize方法 就是初始化。初始化的时候我们把BlockingQueueComsumer做了channel.basicComsume把它的回调函数也注册上了。每一次它收到新消息就会把消息压到BlockQueue阻塞的队列里面去。
压到BlockQueue里面之后,进入死循环,然后调用了mainLoop这个主循环。
主循环这个方法就是我们的消费者起来之后,不断的被循环调用的方法。
中间的关键代码
进入方法,往下找
又调用了comsumer的nextMessage方法
这个consumer就是BlockingQueueComsumer这个类
再点进来
最核心的代码在这里。this.queue.poll就是从队列里面拿出来。也就是说它将这个queue对象(也就是BlockingQueueConsumer这个对象,)将消息的接收和取出,完全的分离开了。消息的接收是通过basicConsume接收,但是等他去用的时候,是通过SimpleMessageListenerContainer里面的内部类的 mainLoop方法,去取用这个队列里面的消息,也就是说不光在RabbitMQ中有个主队列。Spring Boot在我们的应用里面,在内存里面,它做了一个抽象的子队列。这个抽象队列基本是作为缓冲和缓存来使用的。也就是当这个RabbitMQ有消息的时候,它先把消息取到我们的内存里面,然后等待消息进一步的被消费
消息从我们本地缓存取出来之后。它是怎么调用会我们的onMessage方法的
我们返回到这里
走到这里的executeListener的方法。
点击那里继续往下找
doExecuteListener这是核心方法,再进去
再往下找
最后调用了 invokeListener
点进去发现他是一个 函数式的接口。所以它不是方法体。
this.delegate是什么
如果类型是MessageListener类型的
最后调用了listener.onMessage方法
最后调用的就是这个方法