先看一下下面这段代码
from celery import Celery
broker_url = 'amqp://guest:guest@127.0.0.1:5672//'
def get_celery_client(broker_url=broker_url,
                      broker_ssl_enabled=False,
                      broker_ssl_cert_path=None,
                      max_retries=None):
    """Build a Celery app for ``broker_url`` and connect to the broker.

    The same URL is used for both the broker and the result backend, and
    ``CELERY_TASK_RESULT_EXPIRES`` is set to 600.

    Args:
        broker_url: URL passed to Celery as ``broker`` and ``backend``.
        broker_ssl_enabled: When true, ``BROKER_USE_SSL`` is configured with
            ``broker_ssl_cert_path`` as the CA bundle and certificate
            verification required.
        broker_ssl_cert_path: Value stored under ``ca_certs``; only read
            when ``broker_ssl_enabled`` is true.
        max_retries: Forwarded unchanged to ``ensure_connection``.

    Returns:
        The configured Celery application, after ``ensure_connection`` has
        been called on its pooled connection.
    """
    from celery import Celery

    client = Celery(broker=broker_url, backend=broker_url)
    client.conf.update(CELERY_TASK_RESULT_EXPIRES=600)

    if broker_ssl_enabled:
        import ssl

        ssl_options = {
            'ca_certs': broker_ssl_cert_path,
            'cert_reqs': ssl.CERT_REQUIRED,
        }
        client.conf.BROKER_USE_SSL = ssl_options

    # Establish the connection up front, before the client is handed back.
    broker_connection = client.pool.connection
    broker_connection.ensure_connection(max_retries=max_retries)
    return client
def ping(worker=None):
    """Ping Celery workers through the remote-control inspect API.

    Args:
        worker: Name of the single worker to address, e.g.
            ``'celery@test_queue'``. When falsy, no destination is set and
            the ping is addressed to every worker.

    Returns:
        Whatever ``inspect.ping()`` returns, or ``None`` if building the
        inspector or sending the ping raised an ``Exception``.

    Errors raised while creating the client (before the ping is attempted)
    propagate to the caller.
    """
    app = get_celery_client(broker_url, max_retries=3)
    try:
        # Only restrict the destination when a worker was actually given;
        # passing [None] would address a worker that cannot exist.
        destination = [worker] if worker else None
        inspector = app.control.inspect(destination=destination, timeout=10)
        return inspector.ping()
    except Exception:
        # Best effort: a failed ping is reported as "no reply". Unlike
        # BaseException, this lets KeyboardInterrupt/SystemExit through.
        return None
    finally:
        app.close()
# Demo call: ping one named worker and print the reply
# (ping() returns None when the ping attempt raised).
resp = ping('celery@test_queue')
print(resp)
乍一看,上面的代码在调用 app.control.inspect() 时已经通过 destination 参数指定了要检查的目标 worker。
但实际上,inspect.ping() 方法仍然会把 ping 消息发送给所有绑定到 celery.pidbox 这个 exchange 的 queue。
以下从两个方向分析为什么ping消息是广播的:
- 查看RabbitMQ的exchange列表,找到celery.pidbox这个exchange,进到详情页面,可以看到celery.pidbox这个exchange 类型为fanout(广播类型)
- 查看celery源码
celery.app.control:Inspect.ping()
```python
class Inspect(object):
"""API for app.control.inspect."""
…省略无关代码…
def _request(self, command, **kwargs):
# NOTE(review): the command is sent through Control.broadcast();
# self.destination is forwarded unchanged and reply=True is always set.
return self._prepare(self.app.control.broadcast(
command,
arguments=kwargs,
destination=self.destination,
callback=self.callback,
connection=self.connection,
limit=self.limit,
timeout=self.timeout, reply=True,
pattern=self.pattern, matcher=self.matcher,
))
…省略无关代码…
def ping(self, destination=None):
# NOTE(review): the `destination` argument is not used in this body; the
# request goes through _request(), which forwards self.destination.
return self._request('ping')
```
可以看到,ping 方法通过 _request 调用了 app.control.broadcast 方法。

看到这里你可能还是会有些诧异:“broadcast 方法里面不是可以指定 destination 嘛,为什么还是广播的呢?”那么按图索骥,看看这个 broadcast 到底是怎么工作的。
- `celery.app.control:Control.broadcast()`
```python
class Control(object):
"""Worker remote control client."""
# ...省略无关代码...
def broadcast(self, command, arguments=None, destination=None,
connection=None, reply=False, timeout=1.0, limit=None,
callback=None, channel=None, pattern=None, matcher=None,
**extra_kwargs):
"""Broadcast a control command to the celery workers.
Arguments:
command (str): Name of command to send.
arguments (Dict): Keyword arguments for the command.
destination (List): If set, a list of the hosts to send the
command to, when empty broadcast to all workers.
connection (kombu.Connection): Custom broker connection to use,
if not set, a connection will be acquired from the pool.
reply (bool): Wait for and return the reply.
timeout (float): Timeout in seconds to wait for the reply.
limit (int): Limit number of replies.
callback (Callable): Callback called immediately for
each reply received.
pattern (str): Custom pattern string to match
matcher (Callable): Custom matcher to run the pattern to match
"""
# NOTE(review): both branches below hand the command to
# self.mailbox(conn)._broadcast(); destination is passed along as an
# ordinary positional argument. The only difference between the branches
# is whether pattern/matcher are forwarded.
with self.app.connection_or_acquire(connection) as conn:
# extra keyword arguments are merged into the command arguments
arguments = dict(arguments or {}, **extra_kwargs)
if pattern and matcher:
# tests pass easier without requiring pattern/matcher to
# always be sent in
return self.mailbox(conn)._broadcast(
command, arguments, destination, reply, timeout,
limit, callback, channel=channel,
pattern=pattern, matcher=matcher,
)
else:
return self.mailbox(conn)._broadcast(
command, arguments, destination, reply, timeout,
limit, callback, channel=channel,
)
```
我们可以继续查看 mailbox._broadcast:
kombu.pidbox:Mailbox._broadcast
class Mailbox(object):
def _publish(self, type, arguments, destination=None,
reply_ticket=None, channel=None, timeout=None,
serializer=None, producer=None):
# NOTE(review): destination is stored only as a field of the message
# body built here.
message = {'method': type,
'arguments': arguments,
'destination': destination}
chan = channel or self.connection.default_channel
exchange = self.exchange
# When a reply is expected, declare the reply queue and tell the receiver
# where to send the answer (reply exchange name + this mailbox's oid).
if reply_ticket:
maybe_declare(self.reply_queue(channel))
message.update(ticket=reply_ticket,
reply_to={'exchange': self.reply_exchange.name,
'routing_key': self.oid})
serializer = serializer or self.serializer
with self.producer_or_acquire(producer, chan) as producer:
# NOTE(review): publish() receives the exchange name but no routing_key
# argument.
producer.publish(
message, exchange=exchange.name, declare=[exchange],
headers={'clock': self.clock.forward(),
'expires': time() + timeout if timeout else 0},
serializer=serializer,
)
def _broadcast(self, command, arguments=None, destination=None,
reply=False, timeout=1, limit=None,
callback=None, channel=None, serializer=None):
# NOTE(review): validates destination, publishes the command once through
# _publish(), and collects replies only when a reply was requested.
if destination is not None and \
not isinstance(destination, (list, tuple)):
raise ValueError(
'destination must be a list/tuple not {0}'.format(
type(destination)))
arguments = arguments or {}
# a reply ticket is generated only when reply is truthy
reply_ticket = reply and uuid() or None
chan = channel or self.connection.default_channel
# Set reply limit to number of destinations (if specified)
if limit is None and destination:
limit = destination and len(destination) or None
serializer = serializer or self.serializer
# NOTE(review): destination is handed to _publish() unchanged.
self._publish(command, arguments, destination=destination,
reply_ticket=reply_ticket,
channel=chan,
timeout=timeout,
serializer=serializer)
if reply_ticket:
return self._collect(reply_ticket, limit=limit,
timeout=timeout,
callback=callback,
channel=chan)
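从这个 _broadcast 方法可以看出来,destination 参数被传给了 _publish 方法;_publish 方法只是把 destination 放进了 message body 中,并且在调用 producer 的 publish 方法时没有指定任何 routing_key,消息被直接发送给了 celery.pidbox 这个 exchange。由于该 exchange 是 fanout 类型,所有绑定在它上面的 queue 都会收到这条消息,destination 只能由收到消息的 worker 自行判断是否需要处理。
从代码层面分析得知,inspect.ping() 方法是将 ping 消息广播给了所有绑定在 celery.pidbox 这个 exchange 上的 queue。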