Notes on Using Celery

A previous post on this blog introduced Celery and the concepts around it, but that article had no concrete usage example, so this post fills the gap.

The Celery version used here is 3.1.19.

I use RabbitMQ as both the broker and the result backend.

First, install RabbitMQ and Celery:

sudo yum install rabbitmq-server
sudo service rabbitmq-server start
pip install celery

The official docs and many articles, to keep things simple, introduce Celery with app = Celery('tasks', broker='amqp://guest@localhost//').

This configuration is simple, but the guest user can only connect from the local machine, so it will not work if we want to connect remotely. We therefore need to create a RabbitMQ user and a virtual host, and set the permissions:

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

Then restart the RabbitMQ service.
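With these credentials in place, a client on another machine can connect by putting the new user, password, and vhost into the broker URL. A minimal sketch (rabbit-host is a placeholder for the address of the machine running rabbitmq-server):

from celery import Celery

# rabbit-host is a placeholder; use the real hostname or IP of the RabbitMQ server
app = Celery('tasks', broker='amqp://myuser:mypassword@rabbit-host:5672/myvhost')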

Next, create a tasks.py:

from celery import Celery
import time

# broker uses the account created above; backend uses the default guest account
app = Celery('tasks',
             broker='amqp://myuser:mypassword@localhost/myvhost',
             backend='amqp://guest@localhost//')

@app.task
def add(x, y):
    time.sleep(10)  # artificial delay, used below to demonstrate blocking calls
    return x + y

To show two different configuration styles, both URLs use amqp, but the broker uses the user we just created while the backend uses the default guest user; in real use, replace both according to your actual setup. The sleep in the task is there for the blocking-call demonstration below. Save the file and start a worker:

celery -A tasks worker --loglevel=info

If it starts successfully, you should see output similar to this:

[2016-01-21 11:50:09,036: WARNING/MainProcess] /usr/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
 -------------- celery@localhost.localdomain v3.1.19 (Cipater)
---- **** -----
--- * ***  * -- Linux-4.3.3-300.fc23.x86_64-x86_64-with-fedora-23-Twenty_Three
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fbe6d0fdad0
- ** ---------- .> transport:   amqp://rbtuser:**@localhost:5672/myvhost
- ** ---------- .> results:     amqp
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
[tasks]
  . task.add
[2016-01-21 11:50:09,052: INFO/MainProcess] Connected to amqp://rbtuser:**@127.0.0.1:5672/myvhost
[2016-01-21 11:50:09,063: INFO/MainProcess] mingle: searching for neighbors
[2016-01-21 11:50:10,071: INFO/MainProcess] mingle: all alone
[2016-01-21 11:50:10,079: WARNING/MainProcess] celery@localhost.localdomain ready.
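A side note on the CDeprecationWarning at the top of that output: starting with Celery 3.2, pickle will no longer be accepted by default, and the warning itself tells you which setting silences it. A minimal sketch, added to tasks.py right after the app is created (list only the serializers you actually use):

# silence the pickle deprecation warning, as the warning text suggests
app.conf.update(
    CELERY_ACCEPT_CONTENT=['pickle', 'json', 'msgpack', 'yaml'],
)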

Then open a Python shell, or create a test .py file in the same directory with the following content:

from tasks import add
import time

if __name__ == "__main__":
    # delay() sends the task to the worker and returns an AsyncResult immediately
    a = add.delay(1, 2)
    print 1
    b = add.delay(1, 888)
    print 2
    c = add.delay(1, 4)
    print 3
    d = add.delay(1, 5)
    print 4
    #print b.get(timeout=1)
    #print d.get(propagate=False)
    #time.sleep(4)
    # ready() just checks task state; it does not wait for the result
    print a.ready()
    print b.ready()
    print c.ready()
    print d.ready()

After running it, the Celery worker's terminal shows the following:

[2016-01-21 12:00:50,423: INFO/MainProcess] Received task: test_celery.add[5b643950-6eae-4938-ba56-5efc3490cfb7]
[2016-01-21 12:00:50,424: INFO/MainProcess] Received task: test_celery.add[697d6b01-a781-4deb-ba80-a683ab8e2713]
[2016-01-21 12:00:50,425: INFO/MainProcess] Received task: test_celery.add[c9207188-1f67-4589-a01c-e1ce9cfdcd02]
[2016-01-21 12:00:50,425: INFO/MainProcess] Received task: test_celery.add[bbc53156-7f97-4ffd-87b1-209ba6689592]
[2016-01-21 12:01:00,444: INFO/MainProcess] Task test_celery.add[697d6b01-a781-4deb-ba80-a683ab8e2713] succeeded in 10.018047123s: 889
[2016-01-21 12:01:00,444: INFO/MainProcess] Task test_celery.add[bbc53156-7f97-4ffd-87b1-209ba6689592] succeeded in 10.018303534s: 6
[2016-01-21 12:01:00,444: INFO/MainProcess] Task test_celery.add[5b643950-6eae-4938-ba56-5efc3490cfb7] succeeded in 10.0187868s: 3
[2016-01-21 12:01:00,447: INFO/MainProcess] Task test_celery.add[c9207188-1f67-4589-a01c-e1ce9cfdcd02] succeeded in 10.021561263s: 5

And the test script itself prints:

1
2
3
4
False
False
False
False

The ready() method checks whether a task has finished. Since add() sleeps for 10 seconds, all four calls print False here.
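If you need the result but do not want to commit to a hard get(), one option is to poll ready() and do useful work between checks. A minimal sketch of that idea (the 0.5-second interval is an arbitrary choice):

import time
from tasks import add

r = add.delay(3, 4)
while not r.ready():   # poll instead of blocking inside get()
    time.sleep(0.5)    # in real code, do useful work here
print r.get()          # the result is already available, so this returns immediately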

Now uncomment the b.get() line and run it again; the output is:

1
2
3
4
Traceback (most recent call last):
  File "abc.py", line 24, in <module>
    print b.get(timeout=1)
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 157, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

The Celery worker's output is unchanged. What get(timeout=1) means is simply: wait up to one second, and raise an error if there is no result yet.
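If you do want to block for a bounded time without crashing the script, you can catch that exception. A minimal sketch:

from celery.exceptions import TimeoutError
from tasks import add

r = add.delay(1, 2)
try:
    print r.get(timeout=1)          # wait at most 1 second for the result
except TimeoutError:
    print "result not ready yet"    # retry later or move on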

Now set timeout to 15, which should be plenty of time for the task to finish. One thing to note: after running this many times, I found that only b is guaranteed to be True; the others are unpredictable. My understanding was that 15 seconds should be enough for all four tasks to complete, but from the results it looks as if the b.get() call blocked the progress of the whole task queue. I will leave this as an open question for now.

As for propagate: if you call get(propagate=False), an exception raised inside the task is not re-raised in the caller, so the program can keep running; you can then read the result's traceback attribute to see what went wrong.
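A minimal sketch of that pattern, deliberately passing arguments that make add() raise a TypeError inside the worker (the bad arguments are just for illustration):

from tasks import add

r = add.delay(1, 'oops')   # 1 + 'oops' raises TypeError inside the worker
r.get(propagate=False)     # blocks until the task finishes, but does not re-raise
if r.failed():
    print r.traceback      # full traceback captured on the worker side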

So: whenever possible, avoid fetching task results in a blocking way inside your program!
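If you need a result later, one non-blocking pattern is to keep the task id and look the result up when it is convenient. A minimal sketch, assuming the same app and add task defined in tasks.py above:

from tasks import app, add

r = add.delay(2, 3)
task_id = r.id                  # store this somewhere (database, session, ...)

# later, possibly in another process:
res = app.AsyncResult(task_id)
if res.ready():
    print res.get()             # does not block; the result is already there
else:
    print "still running"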