在 Flask 项目中使用 Celery(with 工厂模式 or not)

本文隶属于《Flask Web 开发实战》番外系列。这篇文章会介绍如何在 Flask 项目中集成 Celery。

创建 Celery 程序

第一步是创建一个 Celery 程序实例。因为 Flask 程序实例通常会命名为 app,为了避免冲突,我们一般会把 Celery 实例命名为 celery 或 celery_app:

from celery import Celery

celery = Celery(__name__, broker='pyamqp://guest@localhost//')

@celery.task
def add(x, y):
    return x + y

组织和加载配置

大多数教程,包括目前的 Flask 文档里都会介绍用下面的方式加载配置:

celery.conf.update(app.config)  # 这里的 app 是 Flask 程序实例

也就是把 Celery 配置和 Flask 配置写在一起,然后从 Flask 程序实例的配置字典里更新配置。

但问题是,Celery 从 4.0 开始启用新的小写配置名,某些配置被新的名称替换。虽然旧的大写配置仍然支持,但如果你打算使用小写配置名,或是打算在未来进行迁移,这里的配置加载方式就会失效,因为 Flask 在从文件或对象导入配置时只会导入大写形式的配置变量。

因此,我建议将 Celery 配置写在单独的文件里,不要和 Flask 配置混在一起。按照 Celery 文档的示例,你可以在当前目录创建一个 celeryconfig.py 文件(或是其他名字)保存配置:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

然后使用下面的方法加载配置(使用其他模块名,或是在其他路径时,记得修改传入的字符串):

celery.config_from_object('celeryconfig')

如果需要在创建 Celery 实例时传入 broker 和 backend 配置,可以直接写出或是从配置模块中导入:

from celeryconfig import broker_url

celery = Celery(__name__, broker=broker_url)

在 Flask 程序中初始化 Celery

你可以单独创建 Celery 程序,但我们通常会需要为它添加 Flask 程序上下文支持,因为有时候你的 Celery 任务函数会用到依赖于 Flask 程序上下文的某些变量。

下面我们为 Celery 创建了一个工厂函数,在工厂函数中创建 Celery 实例,加载配置,并实现 Flask 程序上下文支持:

from flask import Flask
from celery import Celery

from celeryconfig import broker_url


def make_celery(app):
    celery = Celery(__name__, broker=broker_url)
    celery.config_from_object('celeryconfig')

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

app = Flask(__name__)

celery = make_celery(app)

在定义 Celery 任务的模块里,比如 tasks.py,你可以导入这个 Celery 程序实例:

from app import celery

@celery.task
def add(x, y):
    return x + y

在使用工厂函数的 Flask 程序中初始化 Celery

当 Flask 程序也使用工厂函数创建时,我们可以全局创建 Celery 程序实例,然后在创建 Flask 程序实例的工厂函数里更新 Celery 程序配置并进行上下文设置:

from flask import Flask
from celery import Celery

from celeryconfig import broker_url
 
celery = Celery(__name__, broker=broker_url)
 

def create_app():
    app = Flask(__name__)
 
    register_celery(app)
    return app
 

def register_celery(app):
    celery.config_from_object('celeryconfig')
 
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
 
    celery.Task = ContextTask

同样直接导入 Celery 实例并创建任务:

from app import celery

@celery.task
def add(x, y):
    return x + y

这本来是一个完整的 Celery 入门教程,但因为去年的一次硬盘损坏,对应的示例程序弄丢了,暂时没有毅力重写一遍,所以这篇文章只抽取了其中和 Flask 相关的内容。

因为距离初稿写作的时间已经过去半年多,Celery 的最新版本也已经是 4.3.0,如果文中有什么疏漏,或是有更好的实现方式,欢迎评论指出。

在 Flask 项目中使用 Celery(with 工厂模式 or not)》上有2条评论

撰写评论

电子邮件地址不会被公开,必填项已用 * 标出。