2017-04-07 15:53

    python

    之前在用celery的时候,通常都是把任务直接丢进broker里异步处理的。今天网上看到篇文章说应该根据任务分类的不同分别使用不同的队列queue来处理。譬如说两种不同的任务,一个量多但不关键,一个量少但较为重要,如果放在同一个queue中,那么或多或少会带来相互之间的影响。所以应该将任务一放到queue1,而将另外的一个任务放在queue2中。这种方式我感觉很有道理诶。。。

    然后查了下文档,用官方文档上的例子学习并记录下来。

    目录结构如下

    1. proj
    2. ├── celery.py
    3. ├── celery.pyc
    4. ├── __init__.py
    5. ├── __init__.pyc
    6. ├── tasks.py
    7. └── tasks.pyc
    8. 0 directories, 6 files

    主要的celery.py代码如下

    1. #!/usr/bin/env python
    2. #-*- encoding: utf-8 -*-
    3. #celery.py
    4. from __future__ import absolute_import, unicode_literals
    5. from celery import Celery
    6. app = Celery('proj',
    7. broker='redis://192.168.188.129:9379/0',
    8. #broker='amqp://',
    9. backend='amqp://',
    10. include=['proj.tasks'])
    11. # Optional configuration, see the application user guide.
    12. app.conf.update(
    13. result_expires=3600,
    14. task_routes = {
    15. 'proj.tasks.add': {'queue': 'addtask'},
    16. 'proj.tasks.mul': {'queue': 'multask'},
    17. },
    18. )
    19. if __name__ == '__main__':
    20. app.start()

    tasks.py的代码如下

    1. #!/usr/bin/env python
    2. #-*- encoding: utf-8 -*-
    3. #tasks.py
    4. from __future__ import absolute_import, unicode_literals
    5. from .celery import app
    6. @app.task
    7. def add(x, y):
    8. return x + y
    9. @app.task
    10. def mul(x, y):
    11. return x * y
    12. @app.task
    13. def xsum(numbers):
    14. return sum(numbers)

    然后我们分别起两个worker,一个对应队列addtask,另外一个对应队列multask

    1. [hu@server01 celery]$ pwd
    2. /home/hu/celery
    3. [hu@server01 celery]$ celery -A proj worker --loglevel=info -Q addtask
    4. [hu@server01 celery]$ celery -A proj worker --loglevel=info -Q multask

    然后试着用下

    1. >>> from proj import tasks
    2. >>> tasks.add.apply_async((4,49),queue='addtask')
    3. <AsyncResult: 02793a5e-c1f6-41cd-8160-f20a5f7b652c>
    4. >>> tasks.mul.apply_async((2,7),queue='addtask')
    5. <AsyncResult: 94ef93e2-b2b1-4351-8d4e-9bb8b1e40092>
    6. >>> tasks.mul.apply_async((2,7),queue='multask')
    7. <AsyncResult: b0cdf415-403d-4ac8-9391-1c4ccae55708>

    这样就能够在前面起的两个worker打印的日志上分别看到他们是由指定的不同queue上取任务来工作的。

    1. #queue为 addtask的worker
    2. [2017-04-07 15:46:57,760: INFO/MainProcess] Received task: proj.tasks.add[02793a5e-c1f6-41cd-8160-f20a5f7b652c]
    3. [2017-04-07 15:46:57,808: INFO/PoolWorker-1] Task proj.tasks.add[02793a5e-c1f6-41cd-8160-f20a5f7b652c] succeeded in 0.00718647900067s: 53
    4. [2017-04-07 15:47:17,840: INFO/MainProcess] Received task: proj.tasks.mul[94ef93e2-b2b1-4351-8d4e-9bb8b1e40092]
    5. [2017-04-07 15:47:17,911: INFO/PoolWorker-1] Task proj.tasks.mul[94ef93e2-b2b1-4351-8d4e-9bb8b1e40092] succeeded in 0.0686908099997s: 14
    6. #queue为 multask的worker
    7. [2017-04-07 15:47:26,959: INFO/MainProcess] Received task: proj.tasks.mul[b0cdf415-403d-4ac8-9391-1c4ccae55708]
    8. [2017-04-07 15:47:26,997: INFO/PoolWorker-1] Task proj.tasks.mul[b0cdf415-403d-4ac8-9391-1c4ccae55708] succeeded in 0.0344336039998s: 14