Celery Best Practices

1 minute read

Celery 在实际应用中的最佳实践

Celery是在Python生态系统中最好用的任务队列,通常被用做在后台处理任务。 最新版本为4.0.2 ,使用Celery的社区相当多(包括一些大公司比如Mozilla、Instagram、Yandex 等等) 并且在不断的发展。 因此,在这项技术上投入时间你是可以得到回报。 这篇文章不是关于如何设置Celery,安装和说明都可以在官方网站上找到。我将注重点放在使用Celery的最佳实践。 # Celery 使用技巧

代理和后端使用

多数开发者比较同意在Python和Web开发中,搭配Django,Gunicorn,nginx使用。 尽管Celery默认内置支持了Django的django-celery,应用django-celery 的监控、定期任务、ORM、代理、后端。 但是在产品中使用数据库当做代理和后端是不适合和荒谬的。随着你的WEB程序的发展 以及流量的增加,数据库的负担也会增加并且成为一个需要尽快解决的瓶颈。否则你将开始丢失任务、数据和客户端。在这里最好的实践是使用production-ready 系统比如Redis或RabbitMQ作为代理。他们经过测试,证明了他们的可靠性和一致性。 但这也是一个折中:Redis是快速和响应迅速在内存中存储key-value,但是使用默认设置时不可靠的。RabbitMQ是一个可靠和强大的系统,但稍微难以设置和调整。 谈到Celery的后端,我建议使用memcached或Redis。请不要使用关系数据库。 当我刚开始使用队列时,由下列几个原因我倾向使用数据库作为后端: .. 设置简单 .. django-celery 支持非常好 .. 没有额外的依赖

当前不使用数据库的原因如上所述。我已经在网站上使用memcached和redis 3年了,一切都很完美。

日志

当涉及到错误和异常行为时,日志是查询的关键点。同样适用于后台任务处理系统。 Celery支持标准的Python日志机制(这个非常简单)。默认情况下,它会向stderr输出错误,并将stdout用于其他所有内容。最好的实践运行celery worker使用进程管理比如supervisord。使用配置文件,可以设置文件名和日志的路径。但是手动分析日志使用grep和/这样不是很无聊不是吗。我更喜欢在任务运行时使用Sentry来处理每个可疑行为。 这个是日志文件的配置

CELERYD_HIJACK_ROOT_LOGGER = False
LOGGING = {
    'handlers': {
        'celery_sentry_handler': {
            'level': 'ERROR',
            'class': 'core.log.handlers.CelerySentryHandler'
        }
    },
    'loggers': {
        'celery': {
            'handlers': ['celery_sentry_handler'],
            'level': 'ERROR',
            'propagate': False,
        },
    }
}

CelerySentryHandler 内容

from raven.handlers.logging import SentryHandler
class CelerySentryHandler(SentryHandler):
    def __init__(self):
        super(AviataSentryHandler, self).__init__(settings.RAVEN_CELERY_DSN)

检查下CELERYD_HIJACK_ROOT_LOGGER的值,默认是True。防止任何客户化handler替代celery日志。按照我的方法你不需要设置任何handler在你的任务中。

不要存储执行结果你不要他

我们通常用Celery异步执行任务,并不关心任务执行后的返回值。(比如发送邮件,调整图片尺寸)。默认情况下任务执行结果存储在后台,但是当你不需要返回值,不需要保存,设置CELERY_IGNORE_RESULT为True

超时和Broker设置 TODO

除了后台执行任务,Celery同时支持延迟执行任务(apply_async)

任务队列的划分

当你调用一个任务,他会放在默认的队列里执行,除非你有另外的声明。 当你有大量的不认同任务,应用程序快速发展时,这里有一个非常简单明了的想法。 在这种情况下,每个任务都有自己的优先级,有些任务不如其他任务重要。比如在你的程序中发送邮件的优先级高于调整图片大小。最好将它们分成不同的队列和workers,避免当你只有一个队列时的溢出。而溢出会导致所有任务的执行出现问题。 我通常这么做:

CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('low', Exchange('low'), routing_key='low'),
)
CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'
CELERY_ROUTES = {
    # -- HIGH PRIORITY QUEUE -- #

    'myapp.tasks.check_payment_status': {'queue': 'high'},
    # -- LOW PRIORITY QUEUE -- #

    'myapp.tasks.close_session': {'queue': 'low'},
}

启动celery worker 监听队列

celery worker -E -l INFO -n worker.high -Q high
celery worker -E -l INFO -n worker.normal -Q normal
celery worker -E -l INFO -n worker.low -Q low

我通常有3种队列 .high .normal .low

这个取决于程序的实际情况,以及设计到的领域和架构。你可以增加或减少。 警告:当你为特定的队列设置名称时(比如high),像这样在没有特定队列的情况下运行workers

    celery worker -E -l INFO -n worker.whatever

当你的high队列已满时,workers忙于处理任务。你的待处理任务将由其中一个没有监听该队列的workers执行。因此当设计任务队列分离时请小心处理。

尽量保持任务简单

这里的简单是指不要把你的业务逻辑卸载任务中,比如你想使用Celery发送邮件,那就定义发送代码在方法外面。

from .utils import generate_report, send_email
@app.task(bind=True)
def send_report():
    filename = generate_report()
    send_email(subject, message, attachments=[filename])

这样在单一测试期间,将更加具备灵活性。

使用任务监控系统

最好的世界就是使用Flower进行执行是的监控。 每天使用它。花一些时间来研究这个工具,你会得到回报。

任务参数中不要传递ORM对象

Django ORM 非常好,但是在Celery的任务中,使用 ORM对象作为方法的参数。方法将会变得很糟糕。让我们看下下列示例:

     from .models import Profile
@app.task(bind=True):
def send_notification(profile):
    send_email(profile.user.email, subject, message_body)
    profile.notified = True
    profile.save()
def notify_user():
    profile = Profile.objects.get(id=1)
    check_smthng()
    send_notification.delay(profile)
    profile.activated = True
    profile.save()
 

设置任务超时

为任务中心提供超时总是好的。你可以使用下列方法执行这个操作。 . 使用@app.task 修饰参数soft_time_limit 和 time_limit . 为特定worker设置超时参数CELERYD_TASK_SOFT_TIME_LIMIT, CELERYD_TASK_TIME_LIMIT

设置超时限制非常重要,避免worker “freeze” 和队列“deadlock”

Leave a Comment