本文隶属于《Flask Web 开发实战》番外系列。这篇文章会介绍如何在 Flask 项目中集成 Celery。
创建 Celery 程序
第一步是创建一个 Celery 程序实例。因为 Flask 程序实例通常会命名为 app,为了避免冲突,我们一般会把 Celery 实例命名为 celery 或 celery_app:
from celery import Celery
celery = Celery(__name__, broker='pyamqp://guest@localhost//')
@celery.task
def add(x, y):
return x + y
组织和加载配置
大多数教程,包括目前的 Flask 文档里都会介绍用下面的方式加载配置:
celery.conf.update(app.config) # 这里的 app 是 Flask 程序实例
也就是把 Celery 配置和 Flask 配置写在一起,然后从 Flask 程序实例的配置字典里更新配置。
但问题是,Celery 从 4.0 开始启用新的小写配置名,某些配置被新的名称替换。虽然旧的大写配置仍然支持,但如果你打算使用小写配置名,或是打算在未来进行迁移,这里的配置加载方式就会失效,因为 Flask 在从文件或对象导入配置时只会导入大写形式的配置变量。
因此,我建议将 Celery 配置写在单独的文件里,不要和 Flask 配置混在一起。按照 Celery 文档的示例,你可以在当前目录创建一个 celeryconfig.py 文件(或是其他名字)保存配置:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
然后使用下面的方法加载配置(使用其他模块名,或是在其他路径时,记得修改传入的字符串):
celery.config_from_object('celeryconfig')
如果需要在创建 Celery 实例时传入 broker 和 backend 配置,可以直接写出或是从配置模块中导入:
from celeryconfig import broker_url
celery = Celery(__name__, broker=broker_url)
在 Flask 程序中初始化 Celery
你可以单独创建 Celery 程序,但我们通常会需要为它添加 Flask 程序上下文支持,因为有时候你的 Celery 任务函数会用到依赖于 Flask 程序上下文的某些变量。
下面我们为 Celery 创建了一个工厂函数,在工厂函数中创建 Celery 实例,加载配置,并实现 Flask 程序上下文支持:
from flask import Flask
from celery import Celery
from celeryconfig import broker_url
def make_celery(app):
celery = Celery(__name__, broker=broker_url)
celery.config_from_object('celeryconfig')
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
celery = make_celery(app)
在定义 Celery 任务的模块里,比如 tasks.py,你可以导入这个 Celery 程序实例:
from app import celery
@celery.task
def add(x, y):
return x + y
在使用工厂函数的 Flask 程序中初始化 Celery
当 Flask 程序也使用工厂函数创建时,我们可以全局创建 Celery 程序实例,然后在创建 Flask 程序实例的工厂函数里更新 Celery 程序配置并进行上下文设置:
from flask import Flask
from celery import Celery
from celeryconfig import broker_url
celery = Celery(__name__, broker=broker_url)
def create_app():
app = Flask(__name__)
register_celery(app)
return app
def register_celery(app):
celery.config_from_object('celeryconfig')
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
同样直接导入 Celery 实例并创建任务:
from app import celery
@celery.task
def add(x, y):
return x + y
这本来是一个完整的 Celery 入门教程,但因为去年的一次硬盘损坏,对应的示例程序弄丢了,暂时没有毅力重写一遍,所以这篇文章只抽取了其中和 Flask 相关的内容。
因为距离初稿写作的时间已经过去半年多,Celery 的最新版本也已经是 4.3.0,如果文中有什么疏漏,或是有更好的实现方式,欢迎评论指出。
‘No application found. Either work inside a view function or push’
RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.
可以运行,但是上下文没联上,请辉大神指导下