Introduction to Celery

Celery is a full-featured, plug-and-play task queue. It hides the complexity of distributed task processing and is very simple to use. Celery can look large at first, so we will start with a basic overview and then move on to its more advanced features. Celery is well suited to asynchronous processing: time-consuming operations such as sending email, uploading files, or processing images can be executed asynchronously so that users do not have to wait, improving the user experience. Celery's main features are:
Simple: easy to use and maintain, with rich documentation.
Efficient: a single Celery process can handle millions of tasks per minute.
Flexible: almost every part of Celery can be customized and extended.
Easy to integrate into web frameworks.
Celery website: http://www.celeryproject.org/
Celery documentation (English): http://docs.celeryproject.org/en/latest/index.html
Celery documentation (Chinese): http://docs.jinkan.org/docs/celery/
Celery architecture

Celery's architecture consists of three parts: a message broker, task execution units (workers), and a task result store.

Message broker: Celery does not provide a message service itself, but it integrates easily with third-party message brokers such as RabbitMQ and Redis.

Worker: the worker is Celery's task execution unit; workers run concurrently across the nodes of a distributed system.

Task result store: if we want to track task state, Celery needs to save results somewhere. Several backends are available: SQLAlchemy, Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP).
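To make the three roles concrete, here is a toy single-process sketch (not Celery code) in which a `queue.Queue` plays the broker, a background thread plays the worker, and a plain dict plays the result store:

```python
import queue
import threading
import uuid

broker = queue.Queue()   # "message broker": holds pending task messages
result_store = {}        # "result backend": task_id -> result

def worker():
    # consume task messages until a sentinel arrives
    while True:
        task_id, func, args = broker.get()
        if func is None:
            break
        result_store[task_id] = func(*args)

def send_task(func, args):
    # the producer only enqueues a message and gets back an id,
    # exactly like calling .delay() on a Celery task
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id

t = threading.Thread(target=worker)
t.start()
tid = send_task(lambda a, b: a + b, (10, 20))
broker.put((None, None, None))  # sentinel: stop the worker
t.join()
print(result_store[tid])  # 30
```

In real Celery the same flow spans processes and machines: producers put task messages on Redis/RabbitMQ, workers consume them, and results land in the configured backend.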
Use cases

Asynchronous execution: offload time-consuming tasks
Delayed execution: run a task after a delay
Scheduled execution: run periodic tasks
Installing and configuring Celery

```shell
pip install celery
```

Message broker: RabbitMQ / Redis

```python
app = Celery('task_name', broker='xxx', backend='xxx')
```
How the Celery framework works

Create the Celery application object `app`, configuring its broker and backend; this app is what the worker runs.
Register the task functions the worker can handle with the app via the `include` setting.
Configure any periodic tasks via `app.conf.beat_schedule`.
Start the Celery service and run the worker to execute tasks.
Start the beat service, which runs beat and enqueues the scheduled tasks.
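The last two steps are typically done from the command line. Assuming the package layout used below (a `celery_task` package whose `celery.py` defines `app`), the commands look like this with Celery 5.x; adjust the `-A` app path to your own project:

```shell
# start a worker that consumes and executes tasks
celery -A celery_task worker -l info

# start beat, which submits the scheduled tasks to the broker
celery -A celery_task beat -l info
```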
Celery for asynchronous tasks

Package layout:

```
project
├── celery_task
│   ├── __init__.py
│   ├── celery.py
│   └── tasks.py
├── add_task.py
└── get_result.py
```
Usage

celery.py
```python
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'

app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
```
tasks.py
```python
from .celery import app

@app.task
def add(a, b):
    res = a + b
    print('a + b = %s' % res)
    return res

@app.task
def reduce(a, b):
    res = a - b
    print('a - b = %s' % res)
    return res
```
add_task.py
```python
from celery_task.tasks import add, reduce

# .delay() puts the task message on the broker; a worker executes it
result = add.delay(10, 20)
print(result.id)
```
get_result.py
```python
from celery_task.celery import app
from celery.result import AsyncResult

task_id = 'f3e679c8-ac51-41a7-9bad-72e1ea5b6a96'

if __name__ == '__main__':
    # note: 'async' is a reserved word in Python 3.7+, so use another name
    result = AsyncResult(id=task_id, app=app)
    if result.successful():
        print(result.get())
    elif result.failed():
        print('task failed')
    elif result.status == 'PENDING':
        print('task is waiting to be executed')
    elif result.status == 'RETRY':
        print('task is being retried after an error')
    elif result.status == 'STARTED':
        print('task has started executing')
```
Celery for delayed tasks

celery.py
```python
from celery import Celery

broker = 'redis://127.0.0.1:6379/14'
backend = 'redis://127.0.0.1:6379/15'

app = Celery(broker=broker, backend=backend, include=['celery_tasks.tasks'])
```
tasks.py
```python
from .celery import app

@app.task
def add(a, b):
    res = a + b
    print('a + b = %s' % res)
    return res

@app.task
def reduce(a, b):
    res = a - b
    print('a - b = %s' % res)
    return res
```
add_task.py
```python
from celery_tasks.tasks import add
from datetime import datetime, timedelta

result = add.apply_async(args=(20, 40), eta=datetime.utcnow() + timedelta(seconds=10))
print(result)
```
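One detail worth noting about the snippet above: Celery interprets a naive `eta` datetime as UTC, which is why `datetime.utcnow()` is used rather than `datetime.now()`. A timezone-aware datetime avoids the ambiguity entirely, and `countdown` is a simpler alternative for relative delays (the `apply_async` calls are shown as comments since they need a running broker and worker):

```python
from datetime import datetime, timedelta, timezone

# build an unambiguous, timezone-aware eta 10 seconds in the future
eta = datetime.now(timezone.utc) + timedelta(seconds=10)

# with a real task object, either call schedules execution ~10s from now:
# add.apply_async(args=(20, 40), eta=eta)
# add.apply_async(args=(20, 40), countdown=10)
```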
Celery for scheduled (periodic) tasks

celery.py
```python
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 'redis://127.0.0.1/14'
backend = 'redis://127.0.0.1/15'

app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

app.conf.beat_schedule = {
    'add-task': {
        'task': 'celery_task.tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (20, 50),
    },
    'reduce-task': {
        'task': 'celery_task.tasks.reduce',
        'schedule': timedelta(seconds=6),
        'args': (20, 50),
    },
}
```
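`crontab` is imported above but not used; it is the alternative to `timedelta` when a schedule is calendar-based rather than a fixed interval. A sketch of a crontab-style entry for the same `app` (the entry name and time of day here are illustrative, not from the original):

```python
from celery.schedules import crontab

app.conf.beat_schedule['add-task-nightly'] = {
    'task': 'celery_task.tasks.add',
    'schedule': crontab(hour=3, minute=30),   # every day at 03:30
    'args': (20, 50),
}
```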
tasks.py
```python
from .celery import app
import time

@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n + m = %s' % (n + m))
    return n + m

# named reduce to match the 'reduce-task' entry in beat_schedule
@app.task
def reduce(n, m):
    print(n)
    print(m)
    print('n - m = %s' % (n - m))
    return n - m
```
get_result.py
```python
from celery_task.celery import app
from celery.result import AsyncResult

task_id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'

if __name__ == '__main__':
    # note: 'async' is a reserved word in Python 3.7+, so use another name
    result = AsyncResult(id=task_id, app=app)
    if result.successful():
        print(result.get())
    elif result.failed():
        print('task failed')
    elif result.status == 'PENDING':
        print('task is waiting to be executed')
    elif result.status == 'RETRY':
        print('task is being retried after an error')
    elif result.status == 'STARTED':
        print('task has started executing')
```
Using Celery in Django: keeping a cache warm

Create a package named celery_task in the project root, containing two files: celery.py and tasks.py.

celery.py
celery.py
```python
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")

from celery import Celery
from datetime import timedelta

broker = 'redis://127.0.0.1:6379/14'
backend = 'redis://127.0.0.1:6379/15'

app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

app.conf.beat_schedule = {
    'update-banner-cache': {
        'task': 'celery_task.tasks.update_banner_cache',
        'schedule': timedelta(seconds=10),
        'args': (),
    },
}
```
tasks.py
```python
from .celery import app
from home.models import Banner
from home.serializers import BannerModelSerializer
from django.core.cache import cache
from django.conf import settings

@app.task
def update_banner_cache():
    # fetch the banners that should currently be shown, capped at BANNER_COUNT
    banner_query = Banner.objects.filter(is_delete=False, is_show=True).all()[:settings.BANNER_COUNT]
    banner_data = BannerModelSerializer(banner_query, many=True).data
    # serialized image paths are relative; prefix them with the site base URL
    for banner in banner_data:
        banner['image'] = '%s%s' % (settings.BASE_URL, banner.get('image'))
    cache.set('banner_cache', banner_data)
    return True
```
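On the read side, a view can then serve the banners straight from the cache that this task keeps warm every 10 seconds. A hypothetical sketch (the view class and URL are not from the original and assume Django REST framework is installed):

```python
from django.core.cache import cache
from rest_framework.views import APIView
from rest_framework.response import Response

class BannerAPIView(APIView):
    """Serve the banner list refreshed periodically by the beat task."""

    def get(self, request):
        # fall back to an empty list if the task has not populated the cache yet
        return Response(cache.get('banner_cache') or [])
```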