先看一下下面这段代码
from celery import Celery

broker_url = 'amqp://guest:guest@127.0.0.1:5672//'


def get_celery_client(broker_url=broker_url,
                      broker_ssl_enabled=False,
                      broker_ssl_cert_path=None,
                      max_retries=None):
    """Create a Celery client and make sure it can reach the broker.

    The same URL is used for both the broker and the result backend.

    Args:
        broker_url: AMQP URL used as broker and as result backend.
        broker_ssl_enabled: When true, ``BROKER_USE_SSL`` is configured with
            ``broker_ssl_cert_path`` as CA bundle and ``ssl.CERT_REQUIRED``.
        broker_ssl_cert_path: Path passed as ``ca_certs`` in the SSL options.
        max_retries: Forwarded to ``ensure_connection``.

    Returns:
        The configured Celery application object.
    """
    celery_client = Celery(broker=broker_url, backend=broker_url)
    celery_client.conf.update(CELERY_TASK_RESULT_EXPIRES=600)
    if broker_ssl_enabled:
        import ssl
        celery_client.conf.BROKER_USE_SSL = {
            'ca_certs': broker_ssl_cert_path,
            'cert_reqs': ssl.CERT_REQUIRED,
        }
    # Fail early (after at most ``max_retries`` attempts) if the broker is
    # unreachable, instead of failing later on first use.
    celery_client.pool.connection.ensure_connection(max_retries=max_retries)
    return celery_client


def ping(worker=None):
    """Ping Celery workers through the remote-control ``inspect`` API.

    Args:
        worker: Node name of a single worker (e.g. ``'celery@test_queue'``).
            When omitted, no destination filter is applied.

    Returns:
        Whatever ``inspect.ping()`` returns, or ``None`` if the ping raised.
    """
    app = get_celery_client(broker_url, max_retries=3)
    try:
        # An empty destination list means "all workers"; previously this
        # value was computed but ``[worker]`` (i.e. ``[None]`` when no worker
        # was given) was passed instead.
        destination = [worker] if worker else []
        inspect = app.control.inspect(destination=destination, timeout=10)
        return inspect.ping()
    except Exception:
        # Best-effort probe: any failure is reported as "no reply".
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        return None
    finally:
        app.close()


resp = ping('celery@test_queue')
print(resp)
乍一看,ping() 在调用 app.control.inspect() 时已经通过 destination 参数指定了要检查的目标 worker(例如 celery@test_queue)。
实际上,inspect.ping() 方法仍然会将 ping 消息发送给所有绑定到 celery.pidbox 这个 exchange 的 queue。
以下从两个方向分析为什么ping消息是广播的:
- 查看RabbitMQ的exchange列表,找到celery.pidbox这个exchange,进到详情页面,可以看到celery.pidbox这个exchange 类型为fanout(广播类型)

- 查看celery源码
`celery.app.control:Inspect.ping()`

```python
class Inspect(object):
    """API for app.control.inspect."""
```
…省略无关代码…
def _request(self, command, **kwargs):
    # Every inspect command is sent through Control.broadcast(). The
    # destination stored on this Inspect instance is only forwarded as an
    # argument; reply=True asks broadcast() to wait for and return replies,
    # which are then post-processed by self._prepare().
    return self._prepare(self.app.control.broadcast(
        command,
        arguments=kwargs,
        destination=self.destination,
        callback=self.callback,
        connection=self.connection,
        limit=self.limit,
        timeout=self.timeout, reply=True,
        pattern=self.pattern, matcher=self.matcher,
    ))
…省略无关代码…
def ping(self, destination=None):
    # NOTE: the ``destination`` parameter is not used in this body; the
    # request is issued via _request(), which reads self.destination.
    return self._request('ping')
ping 方法通过 `_request` 调用了 `app.control.broadcast` 方法。

看到这里你可能还是会有些诧异:“broadcast 方法里面不是可以指定 destination 嘛,为什么还是广播的呢?”那么按图索骥,看看这个 broadcast 到底是怎么工作的。

- `celery.app.control:Control.broadcast()`

```python
class Control(object):
    """Worker remote control client."""

    # ...省略无关代码...

    def broadcast(self, command, arguments=None, destination=None,
                  connection=None, reply=False, timeout=1.0, limit=None,
                  callback=None, channel=None, pattern=None, matcher=None,
                  **extra_kwargs):
        """Broadcast a control command to the celery workers.

        Arguments:
            command (str): Name of command to send.
            arguments (Dict): Keyword arguments for the command.
            destination (List): If set, a list of the hosts to send the
                command to, when empty broadcast to all workers.
            connection (kombu.Connection): Custom broker connection to use,
                if not set, a connection will be acquired from the pool.
            reply (bool): Wait for and return the reply.
            timeout (float): Timeout in seconds to wait for the reply.
            limit (int): Limit number of replies.
            callback (Callable): Callback called immediately for
                each reply received.
            pattern (str): Custom pattern string to match
            matcher (Callable): Custom matcher to run the pattern to match
        """
        with self.app.connection_or_acquire(connection) as conn:
            arguments = dict(arguments or {}, **extra_kwargs)
            if pattern and matcher:
                # tests pass easier without requiring pattern/matcher to
                # always be sent in
                return self.mailbox(conn)._broadcast(
                    command, arguments, destination, reply, timeout,
                    limit, callback, channel=channel,
                    pattern=pattern, matcher=matcher,
                )
            else:
                return self.mailbox(conn)._broadcast(
                    command, arguments, destination, reply, timeout,
                    limit, callback, channel=channel,
                )
```
我们可以继续查看mailbox._broadcast
`kombu.pidbox:Mailbox._broadcast`

```python
class Mailbox(object):

    def _publish(self, type, arguments, destination=None,
                 reply_ticket=None, channel=None, timeout=None,
                 serializer=None, producer=None):
        message = {'method': type,
                   'arguments': arguments,
                   'destination': destination}
        chan = channel or self.connection.default_channel
        exchange = self.exchange
        if reply_ticket:
            maybe_declare(self.reply_queue(channel))
            message.update(ticket=reply_ticket,
                           reply_to={'exchange': self.reply_exchange.name,
                                     'routing_key': self.oid})
        serializer = serializer or self.serializer
        with self.producer_or_acquire(producer, chan) as producer:
            producer.publish(
                message, exchange=exchange.name, declare=[exchange],
                headers={'clock': self.clock.forward(),
                         'expires': time() + timeout if timeout else 0},
                serializer=serializer,
            )

    def _broadcast(self, command, arguments=None, destination=None,
                   reply=False, timeout=1, limit=None,
                   callback=None, channel=None, serializer=None):
        if destination is not None and \
                not isinstance(destination, (list, tuple)):
            raise ValueError(
                'destination must be a list/tuple not {0}'.format(
                    type(destination)))

        arguments = arguments or {}
        reply_ticket = reply and uuid() or None
        chan = channel or self.connection.default_channel

        # Set reply limit to number of destinations (if specified)
        if limit is None and destination:
            limit = destination and len(destination) or None

        serializer = serializer or self.serializer
        self._publish(command, arguments, destination=destination,
                      reply_ticket=reply_ticket,
                      channel=chan,
                      timeout=timeout,
                      serializer=serializer)

        if reply_ticket:
            return self._collect(reply_ticket, limit=limit,
                                 timeout=timeout,
                                 callback=callback,
                                 channel=chan)
```
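从这个 _broadcast 方法可以看出来,destination 参数传给了 _publish 方法,_publish 方法只是将 destination 放入了 message body 中,并且在调用 producer.publish 时未指定任何 routing_key,消息直接发送给了 celery.pidbox 这个 exchange。由于该 exchange 是 fanout 类型,所有绑定到它的 queue 都会收到这条消息;destination 并不参与路由,而是由各个 worker 在收到消息之后根据 message body 中的 destination 判断自己是否需要处理并回复。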
从代码层面分析得知,inspect.ping()方法是将ping消息广播给了所有绑定在celery.pidbox这个exchange上的queue。
