先看一下下面这段代码

  1. from celery import Celery
  2. broker_url = 'amqp://guest:guest@127.0.0.1:5672//'
  def get_celery_client(broker_url=broker_url,
                        broker_ssl_enabled=False,
                        broker_ssl_cert_path=None,
                        max_retries=None):
      """Create a Celery client and make sure its broker connection works.

      Args:
          broker_url: URL used both as the broker and as the result backend.
          broker_ssl_enabled: When true, BROKER_USE_SSL is configured with
              ``broker_ssl_cert_path`` as CA bundle and certificate
              verification required.
          broker_ssl_cert_path: Path passed as ``ca_certs`` in the SSL options.
          max_retries: Forwarded unchanged to ``ensure_connection``.

      Returns:
          The configured Celery application object.
      """
      from celery import Celery

      client = Celery(broker=broker_url, backend=broker_url)
      client.conf.update(CELERY_TASK_RESULT_EXPIRES=600)

      if broker_ssl_enabled:
          import ssl

          ssl_options = {
              'ca_certs': broker_ssl_cert_path,
              'cert_reqs': ssl.CERT_REQUIRED,
          }
          client.conf.BROKER_USE_SSL = ssl_options

      # Connect eagerly so an unreachable broker surfaces here, not on first use.
      broker_connection = client.pool.connection
      broker_connection.ensure_connection(max_retries=max_retries)
      return client
  def ping(worker=None):
      """Ping Celery workers through the remote-control (inspect) API.

      Args:
          worker: Node name of the worker to ping, e.g. 'celery@test_queue'.
              When falsy, no destination filter is set and every worker
              may reply.

      Returns:
          Whatever ``inspect.ping()`` returns, or None if the ping raised
          an error.
      """
      app = get_celery_client(broker_url, max_retries=3)
      # Only restrict the destination when a worker was actually given;
      # passing [None] would match no worker at all.
      destination = [worker] if worker else None
      try:
          inspect = app.control.inspect(destination=destination, timeout=10)
          return inspect.ping()
      except Exception:
          # Best effort: report failure as None, but let KeyboardInterrupt
          # and SystemExit propagate instead of swallowing them.
          return None
      finally:
          # app is always bound here because the client is created before try.
          app.close()
  if __name__ == '__main__':
      # Demo: ping one worker by node name and show the reply. Guarded so
      # that importing this module does not open a broker connection.
      resp = ping('celery@test_queue')
      print(resp)

乍一看,ping 函数中调用 app.control.inspect 时已经通过 destination 参数指定了要检查的目标 worker,
实际上,inspect.ping()方法仍然会将ping消息发送给所有bind celery.pidbox 这个exchange的queue
以下从两个方向分析为什么ping消息是广播的:

  1. 查看RabbitMQ的exchange列表,找到celery.pidbox这个exchange,进到详情页面,可以看到celery.pidbox这个exchange 类型为fanout(广播类型)

image.png

  2. 查看celery源码
  • celery.app.control:Inspect.ping()

    class Inspect(object):
        """API for app.control.inspect."""

…省略无关代码…

  1. def _request(self, command, **kwargs):
  2. return self._prepare(self.app.control.broadcast(
  3. command,
  4. arguments=kwargs,
  5. destination=self.destination,
  6. callback=self.callback,
  7. connection=self.connection,
  8. limit=self.limit,
  9. timeout=self.timeout, reply=True,
  10. pattern=self.pattern, matcher=self.matcher,
  11. ))

…省略无关代码…

  1. def ping(self, destination=None):
  2. return self._request('ping')

ping方法调用了app.control.broadcast方法。看到这里你可能还是会有些诧异:“broadcast方法里面不是可以指定destination嘛,为什么还是广播的呢?”那么按图索骥,看看这个broadcast到底是怎么工作的。

  • celery.app.control:Control.broadcast()
  3. ```python
  4. class Control(object):
  5. """Worker remote control client."""
  6. # ...省略无关代码...
  7. def broadcast(self, command, arguments=None, destination=None,
  8. connection=None, reply=False, timeout=1.0, limit=None,
  9. callback=None, channel=None, pattern=None, matcher=None,
  10. **extra_kwargs):
  11. """Broadcast a control command to the celery workers.
  12. Arguments:
  13. command (str): Name of command to send.
  14. arguments (Dict): Keyword arguments for the command.
  15. destination (List): If set, a list of the hosts to send the
  16. command to, when empty broadcast to all workers.
  17. connection (kombu.Connection): Custom broker connection to use,
  18. if not set, a connection will be acquired from the pool.
  19. reply (bool): Wait for and return the reply.
  20. timeout (float): Timeout in seconds to wait for the reply.
  21. limit (int): Limit number of replies.
  22. callback (Callable): Callback called immediately for
  23. each reply received.
  24. pattern (str): Custom pattern string to match
  25. matcher (Callable): Custom matcher to run the pattern to match
  26. """
  27. with self.app.connection_or_acquire(connection) as conn:
  28. arguments = dict(arguments or {}, **extra_kwargs)
  29. if pattern and matcher:
  30. # tests pass easier without requiring pattern/matcher to
  31. # always be sent in
  32. return self.mailbox(conn)._broadcast(
  33. command, arguments, destination, reply, timeout,
  34. limit, callback, channel=channel,
  35. pattern=pattern, matcher=matcher,
  36. )
  37. else:
  38. return self.mailbox(conn)._broadcast(
  39. command, arguments, destination, reply, timeout,
  40. limit, callback, channel=channel,
  41. )

我们可以继续查看mailbox._broadcast

  • kombu.pidbox:Mailbox._broadcast

    1. class Mailbox(object)
    2. def _publish(self, type, arguments, destination=None,
    3. reply_ticket=None, channel=None, timeout=None,
    4. serializer=None, producer=None):
    5. message = {'method': type,
    6. 'arguments': arguments,
    7. 'destination': destination}
    8. chan = channel or self.connection.default_channel
    9. exchange = self.exchange
    10. if reply_ticket:
    11. maybe_declare(self.reply_queue(channel))
    12. message.update(ticket=reply_ticket,
    13. reply_to={'exchange': self.reply_exchange.name,
    14. 'routing_key': self.oid})
    15. serializer = serializer or self.serializer
    16. with self.producer_or_acquire(producer, chan) as producer:
    17. producer.publish(
    18. message, exchange=exchange.name, declare=[exchange],
    19. headers={'clock': self.clock.forward(),
    20. 'expires': time() + timeout if timeout else 0},
    21. serializer=serializer,
    22. )
    23. def _broadcast(self, command, arguments=None, destination=None,
    24. reply=False, timeout=1, limit=None,
    25. callback=None, channel=None, serializer=None):
    26. if destination is not None and \
    27. not isinstance(destination, (list, tuple)):
    28. raise ValueError(
    29. 'destination must be a list/tuple not {0}'.format(
    30. type(destination)))
    31. arguments = arguments or {}
    32. reply_ticket = reply and uuid() or None
    33. chan = channel or self.connection.default_channel
    34. # Set reply limit to number of destinations (if specified)
    35. if limit is None and destination:
    36. limit = destination and len(destination) or None
    37. serializer = serializer or self.serializer
    38. self._publish(command, arguments, destination=destination,
    39. reply_ticket=reply_ticket,
    40. channel=chan,
    41. timeout=timeout,
    42. serializer=serializer)
    43. if reply_ticket:
    44. return self._collect(reply_ticket, limit=limit,
    45. timeout=timeout,
    46. callback=callback,
    47. channel=chan)

    从这个_broadcast方法可以看出来,destination参数传给了_publish方法,_publish方法只是将destination放入了message body中,并且在调用producer的publish方法时未指定任何routing_key,消息直接发送给了celery.pidbox这个fanout类型的exchange;也就是说,destination的过滤并不发生在路由阶段,而是由各个worker在收到消息之后根据message body中的destination自行判断是否处理。
    从代码层面分析得知,inspect.ping()方法是将ping消息广播给了所有绑定在celery.pidbox这个exchange上的queue。