找回密码
 会员注册
查看: 32|回复: 0

PythonCelery:高效处理异步任务的必备神器

[复制链接]

2万

主题

0

回帖

7万

积分

超级版主

积分
73180
发表于 2024-9-4 18:46:35 | 显示全部楼层 |阅读模式
更多Python学习内容:ipengtao.comCelery是一个简单、灵活且可靠的分布式系统,用于处理大量消息并提供实时操作。它是用Python编写的,并且可以在Web开发、数据处理和自动化任务中广泛应用。Celery支持任务的异步执行、定时执行和任务重试,能够与各种消息代理(如RabbitMQ和Redis)和结果存储后端无缝集成。本文将详细介绍Celery库的安装、主要功能、基本操作、高级功能及其实践应用,并提供丰富的示例代码。安装Celery可以通过pip进行安装。确保Python环境已激活,然后在终端或命令提示符中运行以下命令:pip install celery如果打算使用Redis作为消息代理,还需要安装redis:pip install redis主要功能异步任务执行:支持任务异步分发和执行。定时任务:可以定时或周期性地执行任务。任务重试:支持任务失败后的自动重试机制。任务结果存储:可以存储任务的执行结果并进行检索。扩展性:支持自定义中间件和扩展,适用于复杂的分布式系统。基本操作配置Celery首先,需要配置Celery应用。以下示例展示了如何配置一个简单的Celery应用,并使用Redis作为消息代理:from celery import Celeryapp = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')app.conf.update(    result_expires=3600,)定义任务Celery任务是通过装饰器定义的。以下示例展示了如何定义一个简单的任务:@app.taskdef add(x, y):    return x + y发送任务可以通过调用任务函数的delay方法将任务发送到消息队列:result = add.delay(4, 6)print(result.id)  # 输出任务ID获取任务结果可以通过任务ID获取任务的执行结果:result = add.delay(4, 6)print(result.get())  # 输出:10高级功能定时任务Celery支持定时任务,可以使用celery.beat模块实现。以下示例展示了如何配置和使用定时任务:from celery.schedules import crontabapp.conf.beat_schedule = {    'add-every-30-seconds': {        'task': 'myapp.add',        'schedule': 30.0,        'args': (16, 16)    },    'multiply-at-midnight': {        'task': 'myapp.multiply',        'schedule': crontab(hour=0, minute=0),        'args': (4, 4),    },}任务重试Celery支持任务失败后的自动重试,可以通过任务的retry方法实现。以下示例展示了如何配置任务重试:@app.task(bind=True, max_retries=3)def fragile_task(self):    try:        # 执行任务        pass    except Exception as exc:        raise self.retry(exc=exc, countdown=5)链接任务Celery支持将多个任务链接在一起依次执行。以下示例展示了如何链接任务:from celery import chainresult = chain(add.s(4, 4) | add.s(8) | add.s(16))()print(result.get())  # 输出:32分组任务Celery支持将多个任务分组并行执行。以下示例展示了如何分组任务:from celery import groupresult = group(add.s(i, i) for i in range(10))()print(result.get())  # 输出:[0,2,4,6,8,10,12,14,16,18]使用中间件Celery支持自定义中间件,用于在任务执行前后进行处理。以下示例展示了如何使用中间件记录任务日志:from celery.signals import before_task_publish, after_task_publish@before_task_publish.connectdef task_sent_handler(sender=None, body=None, **kwargs):    print(f'Task {sender} is sent with body {body}')@after_task_publish.connectdef task_sent_handler(sender=None, body=None, **kwargs):    print(f'Task {sender} has been sent')实践应用Web应用中的异步任务在Web应用中,可以使用Celery处理耗时的任务,例如发送电子邮件:# tasks.pyfrom celery import Celeryapp = Celery('emailapp', broker='redis://localhost:6379/0')@app.taskdef send_email(to_address, subject, body):    # 发送电子邮件的逻辑    print(f"Sending email to {to_address} with subject '{subject}'")# views.pyfrom flask import Flask, request, jsonifyfrom tasks import send_emailapp = Flask(__name__)@app.route('/send_email', methods=['POST'])def send_email_endpoint():    data = request.json    send_email.delay(data['to_address'], data['subject'], data['body'])    return jsonify({"status": "Email sent"}), 200if __name__ == '__main__':    app.run()数据处理任务使用Celery进行大规模数据处理任务:# tasks.pyfrom celery import Celeryimport pandas as pdapp = Celery('dataapp', broker='redis://localhost:6379/0')@app.taskdef process_data(file_path):    df = pd.read_csv(file_path)    summary = df.describe()    output_path = file_path.replace('.csv', '_summary.csv')    summary.to_csv(output_path)    return output_path# main.pyfrom tasks import process_dataresult = process_data.delay('data.csv')print(f"Summary file saved at: {result.get()}")分布式爬虫使用Celery构建分布式爬虫系统:# tasks.pyfrom celery import Celeryimport requestsapp = Celery('crawlerapp', broker='redis://localhost:6379/0')@app.taskdef fetch_url(url):    response = requests.get(url)    return response.text# main.pyfrom tasks import fetch_urlurls = ['https://example.com', 'https://example.org', 'https://example.net']result = group(fetch_url.s(url) for url in urls)()print(result.get())总结Celery库为Python开发者提供了一个强大且灵活的工具,用于管理异步任务队列和分布式任务执行。通过其简洁的API和丰富的功能,用户可以轻松地定义、调度和管理各种异步任务。无论是在Web开发、数据处理还是自动化任务中,Celery都能提供强大的支持和便利。如果你觉得文章还不错,请大家 点赞、分享、留言 ,因为这将是我持续输出更多优质文章的最强动力!更多Python学习内容:ipengtao.com如果想要系统学习Python、Python问题咨询,或者考虑做一些工作以外的副业,都可以扫描二维码添加微信,围观朋友圈一起交流学习。我们还为大家准备了Python资料和副业项目合集,感兴趣的小伙伴快来找我领取一起交流学习哦!往期推荐Python中的iter()函数:迭代器的生成工具Python中的isinstance()函数:类型检查的利器Python中的sorted()函数:排序的利器Python中的hash()函数:哈希值的奥秘Python中的slice()函数:切片的利器Python的tuple()函数:创建不可变序列点击下方“阅读原文”查看更多
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-12 18:15 , Processed in 0.639807 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表