之前的博客中简单的介绍了celery的安装配置以及如何在python程序中使用,这里记录一下我使用django结合celery以及rabbitmq提供web服务,同时使用flower进行监控的过程。至于这几样东西是什么、怎么安装这里就不再细说了。

涉及到的关键点如下:

  1. 如何在django中使用celery?
  2. 如何触发定时任务?
  3. 如何为不同的任务绑定不同的消息队列?
  4. 如何重试出错的任务?
  5. 如何发送报错邮件?
  6. 如何使用flower进行监控?

首先,项目依赖:

1
2
3
celery==3.1.20
Django==1.9.2
flower==0.8.4

注意版本,有可能你看到本文的时候相关配置会有所变化。

首先创建一个空的django项目,我这里就叫djce好了:

1
2
3
4
5
6
7
8
tree djce
djce
├── djce
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── manage.py

接下来在settings.py的同级目录中新建celery.py:

1
2
3
4
5
6
7
8
9
10
11
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djce.settings')
app = Celery('djce')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

修改__init__.py添加:

1
2
from __future__ import absolute_import
from .celery import app as celery_app

修改settings.py添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 添加celery配置
BROKER_URL ='amqp://guest@127.0.0.1//'
CELERY_RESULT_BACKEND = 'amqp://guest@127.0.0.1//'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IGNORE_RESULT = True
# 启用报错邮件
CELERY_SEND_TASK_ERROR_EMAILS = True
# 分离2个任务队列
from kombu import Exchange, Queue
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('for_task_a', Exchange('for_task_a'), routing_key='for_task_a'),#这个是主动任务的队列
Queue('for_task_b', Exchange('for_task_b'), routing_key='for_task_b'),#这个是定时任务的队列
)
CELERY_ROUTES = {
'task_a': {'queue': 'for_task_a', 'routing_key': 'for_task_a'},
'task_b': {'queue': 'for_task_b', 'routing_key': 'for_task_b'},
}
# 收件人
ADMINS = (
('xxxx', 'xxxx@xxxx.com'),
)
SERVER_EMAIL = 'xxx@xxx.com'
EMAIL_HOST = 'xxxx'
EMAIL_PORT = 25
EMAIL_HOST_USER = 'xxxx@xx.com'
EMAIL_HOST_PASSWORD = 'asdasd'

在上面的设置中,我们启用了报错邮件并设置了2个队列用于接收不同的消息,记得把配置信息修改成你使用的环境相应配置。接下来我们新建一个app,这里就叫mytask好了,并在mytask中新建tasks.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from celery.utils.log import get_task_logger
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.decorators import task
logger = get_task_logger(__name__)
@task(name="task_a", default_retry_delay=5, max_retries=3, bind=True)
def task_a(self):
try:
print "in task_a"
except Exception as e:
# 隔5S重试,最多3次
logger.info(str(e))
raise self.retry(exc=e)
@periodic_task(
run_every=(crontab(minute='*/1')),
name="task_b",
ignore_result=True, bind=True)
def task_b(self):
#每1分钟执行一次
print "in task b"

在views.py中写一个最简单的逻辑即可:

1
2
3
4
5
6
from django.http import HttpResponse
from .tasks import task_a
class MyTask(View):
def get(self, request, *args, **kwargs):
task_a.delay() #发送消息,触发后台任务
return HttpResponse("django and celery!")

编写相应的urls.py并在settings.py中引入我们的mytask后(很简单,这里就不贴出来了),确定rabbitmq服务成功运行后,首先我们打开第一个终端运行django程序:
python manage.py runserver

然后打开第二个终端,运行worker处理task_a:
celery -A djce worker -l info -Q for_task_a

再然后打开第三个终端,运行worker处理task_b:
celery -A djce worker -l info -Q for_task_b

注意使用-Q参数绑定对应的队列。

再再然后打开第四个终端,用于触发我们的计划任务task_b:
celery -A djce beat -l info

至此,一个没卵用但是说明了整个逻辑的程序就完成了,只要我们访问相应的url,task_a的终端就会有输出,并且每分钟task_b的终端也会有输出。

接下来说说监控的flower,想使用这个,首先我们需要在rabbitmq中启用rabbitmq_management并重启服务

1
2
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

其次我们需要新增一个管理员用户,怎么添加请看rabbitmq的文档。上述工作没问题的话打开第五个终端:
flower -A djce --port=5555 --broker_api=http://rbtuser:passwd@192.168.x.x:15672/api/

其中rbtuser,passwd以及服务器ip根据真实情况修改,这里需要注意的是,必须先启用worker后再启用flower,否则点击worker后会提示找不到对应的worker。

没问题的话,使用浏览器访问5555端口就可以看见flower的界面了。

其它问题:

  1. 如果修改了task的逻辑,必须重启worker才能生效。
  2. 想清空某个队列的话,可以直接访问rabbitmq服务器的15672端口并登录,选择“Queues”后点击“Purge”,注意:“delete”是删除这个队列的意思!
  3. 默认情况下,机器有多少个cpu则worker可以同时处理多少任务,可以在同一台机器上启动多个worker进程。
  4. 需要的终端有点多?请使用supervisor!