Python 消息队列 Celery

名字 官网 备注
celery https://github.com/celery/celery Celery 5.x >= CPython 3.5
huey https://github.com/coleifer/huey 轻量化,支持2.*
dramatiq https://github.com/Bogdanp/dramatiq >=3.5 https://dramatiq.io

设计

任务长度应该尽量小

长耗时任务还带来更新耗时更长的问题。当需要更新程序的时候,由于还有任务在执行,只能使用kill -9 **** 来强制结束进程(带来不可恢复的任务终止,在某些情况下是不可采取的操作)。

不可避免的长耗时任务处理方法

长耗时任务虽然不能说不好但应该尽量避免。
长耗时任务应该被分割开到独立的队列,并启动只处理指定队列的worker。

绑定Queue的task

1
2
3
 @app.task(queue='add')
def add(x, y):
return 2*x + 2*y

启动只处理指定队列的worker

1
celery -A tasks worker -Q add

并发模式

ISSUE

ROOT模式上不能启动

Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!

If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

单元测试

实例

1
2
3
4
5
6
7
8
9
10
11
12
# file "tasks.py"

from celery import Celery
from time import sleep

BROKER_URL = ‘amqp://guest@localhost:5672//’
REDIS = ‘redis://localhost/0
app = Celery(‘test’, broker=BROKER_URL, backend=REDIS)

@app.task(name=”add”)
def add(x, y):
return x+y

本地调试 Celery 任务逻辑

1
2
3
4
5
6
7
# file "tests.py"

from tasks import add, app
import unittest

def test_local_run():
assert add.run(x=3, y=5) == 8

本地调试 Celery 任务执行状态

1
2
3
4
5
6
7
8
9
10
11
12
13
# file "tests.py"

from tasks import add, app
import unittest

class TestAddTask(unittest.TestCase):
def setUp(self):
self.task = add.apply_async(args=[3, 5])
self.results = self.task.get()
def test_task_state(self):
self.assertEqual(self.task.state, ‘SUCCESS’)
def test_addition(self):
self.assertEqual(self.results, 8)

启动 Celery celery -A tasks worker — loglevel=INFO
启动 单元测试

1
2
3
4
5
➜ python -m unittest discover
..
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Ran 2 tests in 5.353s
OK

调试远程 worker 的任务执行状态

1
2
3
4
5
6
7
8
9
10
11
12
13
# file "tests.py"

from tasks import add, app
import unittest

class TestAddTask(unittest.TestCase):
def setUp(self):
self.task = add.send_task(‘add’, args=[3, 5])
self.results = self.task.get()
def test_task_state(self):
self.assertEqual(self.task.state, ‘SUCCESS’)
def test_addition(self):
self.assertEqual(self.results, 8)
0%