django集成celery添加异步任务

django集成celery添加异步任务

django集成celery

一.安装

pip install celery[redis](windows不适用,4.1.0版本定时任务有bug)
pip install redis==2.10.6 msgpack celery==3.1.20(windows采用方案)
pip install django-celery-3.2.2

二.应用

1.添加celery配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# settings.py增加如下内容

INSTALLED_APPS.append('djcelery',)

from .celeryconfig import *
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://@127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://@127.0.0.1:6379/2'

# settings同一目录下添加celeryconfig.py
# celeryconfig.py
from datetime import timedelta

import djcelery
from celery.schedules import crontab

djcelery.setup_loader()

CELERY_IMPORTS = (
'app.tasks',
)
CELERY_TASK_SERIALIZER = "msgpack"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务执行结果超时时间
CELERY_ACCEPT_CONTENT = ['json','msgpack']
CELERY_TIMEZONE = 'Asia/Shanghai'


# 有些情况可以防止死锁
CELERY_FORCE_EXECV =True

# 设置并发的worker数
CELERY_CONCURRENCY = 4

# 允许重试
CELERY_ACKS_LATE = True

# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERY_MAX_TASKS_PER_CHILD = 100

# 单个任务运行超时时间
CELERY_TASK_TIME_LIMIT = 12 * 30

# 自定义任务队列
CELERY_QUEUES = {
# 定时任务队列
'beat_task':{
'exchange':'beat_task',
'exchange_type':'direct',
'binding_key':'beat_task'
},
# 普通异步任务队列
'work_queue':{
'exchange':'work_queue',
'exchange_type':'direct',
'binding_key':'work_queue'
},
}
CELERY_DEFAULT_QUEUE = 'work_queue'
# 定时任务
CELERYBEAT_SCHEDULE = {
'task1':{
'task':'mytask',
'schedule':timedelta(seconds=10),# 每十秒执行一次
'args':(),
'options':{
'queue':'beat_task'
}
},
'task2': {
'task': 'mytask2',
'schedule': crontab(hour=17,minute=51), #16:50执行
'options':{
'queue':'beat_task'
}
}
}
2.添加任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 运行python manage.py startapp app,app目录下添加tasks.py
# tasks.py
from celery.task import Task

# 定时任务
class MyTask(Task):
name = "mytask"

def run(self,list1=None):
if list1 is None:
list1 = []
for i in range(10000000):
list1.append(i)
return list1[-1]

# 异步任务
class MyTask2(Task):
name = "mytask2"

def run(self,list1=None):
if list1 is None:
list1 = []
for i in range(10000000):
list1.append(i)
return list1[-1]

# view.py
from django.http.response import JsonResponse
from django.shortcuts import render
from .tasks import MyTask2


# Create your views here.

def do(request):
# CourseTask.delay()
# 执行异步任务
print('do request')
MyTask2.apply_async(args=([1,2,3],),queue="work_queue")
return JsonResponse({'result': 'ok'})

# urls.py
from app.views import do
urlpatterns.append(path('run/',do,name="run"))

三.启动

-l参数:指定日志级别(INFO,DEBUG,WARNING,ERROR,默认是WARNING)

1.开启异步任务
1.启动worker:python manage.py celery worker -l INFO --logfile /log/celery.log
2.启动django:python manage.py runserver
3.访问URL:127.0.0.1:8000/run
2.再开启定时任务
celery任务调度Beat:python manage.py celerybeat -l info --logfile /log/beat.log

四.任务监控 flower

安装:pip install flower
启动:python manage.py celery flower –basic_auth=root:123456
访问:127.0.0.1:5555

https://github.com/zhwl934008411/celery-learning

坚持原创技术分享,您的支持将鼓励我继续创作!