Celery入门

Celery入门

Celery基础

Celery是一个专注于实时处理和任务调度的分布式任务队列。
Celery特性:方便查看定时任务的执行情况(包含执行结果,执行状态,执行时间等)、任务支持后台管理(安装flower组件)、可选多进程Eventlet和Gevent三种模式执行任务、支持多种消息代理和后端存储

Celery包含的组件:

  • Producer:任务生产者,可用task装饰器或其他方法将函数转化为celery任务
  • Broker:消息代理(或称消息中间件),接收Celery Beat发送过来的任务,存储到任务队列再有序发给worker执行,生产环境推荐使用Rabbit或Redis存储任务队列
  • Celery Beat:任务调度器,负责发布异步或定时任务给Broker
  • Celery Workr:任务执行者,
  • Result Backend:任务处理完保存执行状态和结果,生产环境推荐使用redis存储
    当worker运行完结果之后就会把他返回给生产者的唯一id作为键,
    将结果作为值传递给你设置的worker(backend测试环境为redis).

Celery序列化:
在Broker和消费者之间传输的是序列化的数据(类似前后端分离传递json),支持pickle/json/yaml/msgpack方案,推荐使用msgpack

Celery的使用

1.安装

pip install celeryredis
pip install redis==2.10.6 msgpack celery==3.1.20(windows采用方案)

2.采用celery包启动

  • 准备:新建python包celery_learn、app.py文件
    celery_learn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# __init__.py
from celery import Celery

app = Celery("mytask")
app.config_from_object('celery_learn.celery_config')

# celery_config.py
# -*- coding: utf-8 -*-
from datetime import timedelta
from celery.schedules import crontab
BROKER_URL = "redis://127.0.0.1:6379/4"
BACKEND_URL = "redis://127.0.0.1:6379/5"

# CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/5"

CELERY_TIMEZONE = 'Asia/Shanghai'

# 导入指定的任务模块
CELERY_IMPORTS = (
'celery_learn.task1',
'celery_learn.task2',
)
# 定时任务
CELERYBEAT_SCHEDULE = {
'task1': {
'task': 'celery_learn.task1.func',
'schedule': timedelta(seconds=10),
# 'args': ()
},
'task2': {
'task': 'celery_learn.task2.add',
'schedule': crontab(hour=11, minute=48),
'args': (4, 5)
}
}


# task1.py
from celery_learn import app

@app.task
def func(list1=None):
if list1 is None:
list1 = []
for i in range(10000000):
list1.append(i)
return list1[-1]

# task2.py
import time
from celery_learn import app

@app.task
def add(x,y):
time.sleep(3)
return x+y
app.py
1
2
3
4
5
6
7
8
9
10
11

from celery_learn import task1
from celery_learn import task2


if __name__ == '__main__':
task1.func.delay()
# 或者用apply_async()
# task1.func.apply_async()
task2.add.delay(2,3)
print("end.....")

3.启动

  • -A参数:指定任务启动的文件名
  • -l参数:指定日志级别(INFO,DEBUG,WARNING,ERROR,默认是WARNING)
    1.开启异步任务
  • 启动worker:celery worker -A celery_learn -l INFO
  • 开启任务:python app.py
    2.开启定时任务
  • celery任务调度Beat(用作定时任务):celery beat -A celery_learn -l INFO
  • 启动worker:celery worker -A celery_learn -l INFO –logfile celery_learn/log/celery.log

一起启动beat和worker:celery -B -A celery_learn worker -l INFO(windows不适用)
https://github.com/zhwl934008411/celery-learning

坚持原创技术分享,您的支持将鼓励我继续创作!