找回密码
 会员注册
查看: 26|回复: 0

Python实现定时任务的三种方案——schedule、APScheduler、Celery

[复制链接]

3

主题

0

回帖

10

积分

新手上路

积分
10
发表于 2024-9-7 15:29:09 | 显示全部楼层 |阅读模式
scheduleschedule是一个轻量级的Python库,用于定期执行任务,即定时任务调度。它提供了一种简单直观的方式来自定义任务执行的时间规则,而无需复杂的线程或进程管理知识。schedule适用于那些需要在后台定期执行某些功能的Python应用程序,比如数据抓取、定时发送邮件、系统维护脚本等场景。pipinstallschedule简单示例importscheduleimporttimedefjob():print("I'mworking...")#每隔1秒执行一次job函数schedule.every(1).seconds.do(job)#每隔10分钟执行一次job函数schedule.every(10).minutes.do(job)#每小时的整点执行job函数schedule.every().hour.do(job)#每天的14:30分执行job函数schedule.every().day.at("14:30").do(job)#随机地在每5到10分钟之间选择一个时间点执行job函数schedule.every(5).to(10).minutes.do(job)#每周一执行job函数schedule.every().monday.do(job)#每周三的13:15分执行job函数schedule.every().wednesday.at("13:15").do(job)#每个小时的第17分钟执行job函数schedule.every().minute.at(":17").do(job)whileTrue:schedule.run_pending()time.sleep(1)装饰器:通过@repeat()装饰静态方法importtimefromscheduleimportevery,repeat,run_pending@repeat(every().second)defjob():print('working...')whileTrue:run_pending()time.sleep(1)传递参数importscheduledefgreet(name):print('Hello',name)schedule.every(2).seconds.do(greet,name='Alice')schedule.every(4).seconds.do(greet,name='Bob')whileTrue:schedule.run_pending()@repeat(every().second,'World')@repeat(every().minute,'Mars')defhello(planet):print('Hello',planet)whileTrue:run_pending()取消任务importschedulei=0defsome_task():globalii+=1print(i)ifi==10:schedule.cancel_job(job)print('canceljob')exit(0)job=schedule.every().second.do(some_task)whileTrue:schedule.run_pending()运行一次任务importtimeimportscheduledefjob_that_executes_once():print('Hello')returnschedule.CancelJobschedule.every().minute.at(':34').do(job_that_executes_once)whileTrue:schedule.run_pending()time.sleep(1)根据标签检索任务#检索所有任务:schedule.get_jobs()importscheduledefgreet(name):print('Hello{}'.format(name))schedule.every().day.do(greet,'Andrea').tag('daily-tasks','friend')schedule.every().hour.do(greet,'John').tag('hourly-tasks','friend')schedule.every().hour.do(greet,'Monica').tag('hourly-tasks','customer')schedule.every().day.do(greet,'Derek').tag('daily-tasks','guest')friends=schedule.get_jobs('friend')print(friends)根据标签取消任务#取消所有任务:schedule.clear()importscheduledefgreet(name):print('Hello{}'.format(name))ifname=='Cancel':schedule.clear('second-tasks')print('cancelsecond-tasks')schedule.every().second.do(greet,'Andrea').tag('second-tasks','friend')schedule.every().second.do(greet,'John').tag('second-tasks','friend')schedule.every().hour.do(greet,'Monica').tag('hourly-tasks','customer')schedule.every(5).seconds.do(greet,'Cancel').tag('daily-tasks','guest')whileTrue:schedule.run_pending()运行任务到某时间importschedulefromdatetimeimportdatetime,timedelta,timedefjob():print('working...')schedule.every().second.until('23:59').do(job)#今天23:59停止schedule.every().second.until('2030-01-0118:30').do(job)#2030-01-0118:30停止schedule.every().second.until(timedelta(hours=8)).do(job)#8小时后停止schedule.every().second.until(time(23,59,59)).do(job)#今天23:59:59停止schedule.every().second.until(datetime(2030,1,1,18,30,0)).do(job)#2030-01-0118:30停止whileTrue:schedule.run_pending()马上运行所有任务importscheduledefjob():print('working...')defjob1():print('Hello...')schedule.every().monday.at('12:40').do(job)schedule.every().tuesday.at('16:40').do(job1)schedule.run_all()schedule.run_all(delay_seconds=3)#任务间延迟3秒并发运行:使用Python内置队列实现:importthreadingimporttimeimportscheduledefjob1():print("I'mrunningonthread%s"%threading.current_thread())defjob2():print("I'mrunningonthread%s"%threading.current_thread())defjob3():print("I'mrunningonthread%s"%threading.current_thread())defrun_threaded(job_func):job_thread=threading.Thread(target=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded,job1)schedule.every(10).seconds.do(run_threaded,job2)schedule.every(10).seconds.do(run_threaded,job3)whileTrue:schedule.run_pending()time.sleep(1)APSchedulerAPScheduler是一个功能强大的Python库,用于在后台调度作业,支持多种类型的触发器(如定时、间隔、日期和cron表达式),以及持久化作业存储和分布式执行。相较于schedule库,APScheduler提供了更多的灵活性和企业级特性,适用于更复杂的调度需求。APScheduler四个组成部分1.调度器(Scheduler)作用:调度器是APScheduler的核心,负责管理和驱动整个任务调度流程。它根据配置的时间规则(触发器)来决定何时执行哪些任务,并管理作业的添加、修改、删除以及执行。2.触发器(Triggers)作用:触发器定义了任务执行的时间规则,即决定任务何时触发执行。APScheduler支持几种类型的触发器,包括:IntervalTrigger:按照固定的时间间隔执行任务,如每隔5分钟执行一次。DateTrigger:在指定的日期和时间点执行一次任务。CronTrigger:类似于UnixCron表达式,提供非常灵活的时间规则,可以精确到秒、分钟、小时、日、月、周几等。3.作业储存器(JobStores)作用:作业储存器用于持久化作业的状态信息,确保任务调度的可靠性。即使程序重启,也能恢复作业的执行状态。APScheduler支持多种作业储存方式,包括内存、SQL数据库(如SQLite、MySQL)、Redis等。4.执行器(Executors)作用:执行器负责实际执行作业,它决定了任务在哪个线程或进程中运行。APScheduler提供了多种执行器,例如:ThreadPoolExecutor:使用线程池来执行任务。ProcessPoolExecutor:使用进程池来执行任务,适用于CPU密集型任务。GeventExecutor或TornadoExecutor:为异步IO框架(如Gevent或Tornado)设计的执行器,适用于I/O密集型任务。简单使用 pipinstallapschedulerfromapscheduler.schedulers.blockingimportBlockingSchedulerfromdatetimeimportdatetime#输出时间defjob():print(datetime.now().strftime("%Y-%m-%d%H:%M:%S"))#BlockingSchedulersched=BlockingScheduler()#创建调度器sched.add_job(job,'interval',seconds=1,id='my_job_id')sched.start()add_job部分源码参数说明:id:指定作业的唯一IDname:指定作业的名字trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间,满足时将会执行executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的执行器,执行job指定的函数max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此jobcoalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行func:Job执行的函数args:Job执行函数需要的位置参数kwargs:Job执行函数需要的关键字参数调度器、执行器、触发器Scheduler调度器调度器是管理定时任务的当使用BlockingScheduler时作为独立进程使用,会阻塞主线程当使用BackgroundScheduler时会在后台运行,不会发生阻塞fromapscheduler.schedulers.blockingimportBlockingSchedulerfromapscheduler.schedulers.backgroundimportBackgroundSchedulerscheduler=BackgroundScheduler()scheduler.start()#此处程序不会发生阻塞scheduler=BlockingScheduler()scheduler.start()#此处程序会发生阻塞executors执行器执行器控制执行方式在定时任务该执行时,ThreadPoolExecutor以线程方式执行任务,ProcessPoolExecutor以进程方式执行任务#方式1:线程fromapscheduler.executors.poolimportThreadPoolExecutorexecutors={'default':ThreadPoolExecutor(20)#最多20个线程同时执行}scheduler=BackgroundScheduler(executors=executors)#方式2:进程fromapscheduler.executors.poolimportProcessPoolExecutorexecutors={'default'rocessPoolExecutor(3)#最多3个进程同时运行}scheduler=BackgroundScheduler(executors=executors)Trigger触发器触发器控制的是什么时候会执行任务目前APScheduler支持触发器:指定时间的DateTrigger指定间隔时间的IntervalTrigger像Linux的crontab一样的CronTrigger触发器参数:date(作业只执行一次)fromdatetimeimportdate#在2024年6月11日00:00:00执行sched.add_job(my_job,'date',run_date=date(2024,06,11))#在2024年5月31日16:30:05sched.add_job(my_job,'date',run_date=datetime(2024,05,31,16,30,5))sched.add_job(my_job,'date',run_date='2009-11-0616:30:05')#立即执行sched.add_job(my_job,'date')sched.start()触发器参数:intervalinterval间隔调度weeks(int)–间隔几周days(int)–间隔几天hours(int)–间隔几小时minutes(int)–间隔几分钟seconds(int)–间隔多少秒start_date(datetime|str)–开始日期end_date(datetime|str)–结束日期timezone(datetime.tzinfo|str)–时区fromdatetimeimportdatetime#每两小时执行一次sched.add_job(job_function,'interval',hours=2)#在2024年5月31日09:30:00到2024年6月15日的时间内,每两小时执行一次sched.add_job(job_function,'interval',hours=2,start_date='2010-05-3109:30:00',end_date='2024-06-1511:00:00')触发器参数:croncron调度(int|str)表示参数既可以是int类型,也可以是str类型(datetime|str)表示参数既可以是datetime类型,也可以是str类型year(int|str)–4-digityear-(表示四位数的年份,如2008年)month(int|str)–month(1-12)-(表示取值范围为1-12月)day(int|str)–dayofthe(1-31)-(表示取值范围为1-31日)week(int|str)–ISOweek(1-53)-(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))day_of_week(int|str)–numberornameofweekday(0-6ormon,tue,wed,thu,fri,sat,sun)–(表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)hour(int|str)–hour(0-23)–(表示取值范围为0-23时)minute(int|str)–minute(0-59)–(表示取值范围为0-59分)second(int|str)–second(0-59)–(表示取值范围为0-59秒)start_date(datetime|str)–earliestpossibledate/timetotriggeron(inclusive)–(表示开始时间)end_date(datetime|str)–latestpossibledate/timetotriggeron(inclusive)–(表示结束时间)timezone(datetime.tzinfo|str)–timezonetouseforthedate/timecalculations(defaultstoschedulertimezone)-(表示时区取值)CronTrigger可用的表达式:#6-8,11-12月第三个周五00:00,01:00,02:00,03:00运行sched.add_job(job_function,'cron',month='6-8,11-12',day='3rdfri',hour='0-3')#每周一到周五运行直到2024-05-3000:00:00sched.add_job(job_function,'cron',day_of_week='mon-fri',hour=5,minute=30,end_date='2024-05-30' 任务管理添加、移除、暂停、恢复、停止#方式1:通过对象job=scheduler.add_job(myfunc,'interval',minutes=2)#添加任务job.remove()#移除任务job.pause()#暂停任务job.resume()#恢复任务#方式2:通过任务idscheduler.add_job(myfunc,'interval',minutes=2,id='my_job_id')#添加任务scheduler.remove_job('my_job_id')#移除任务scheduler.pause_job('my_job_id')#暂停任务scheduler.resume_job('my_job_id')#恢复任务scheduler.shutdown()#停止任务#修改任务#方式1:通过对象job.modify(max_instances=6,name='Alternatename')#方式2:通过任务idscheduler.reschedule_job('my_job_id',trigger='cron',minute='*/5')CeleryCelery是一个基于Python的分布式任务队列和作业调度系统,专为处理大量实时的异步任务和定时任务而设计。它能够有效地将耗时的操作异步处理,从而提高应用程序的响应速度和吞吐量。Celery架构包含三个核心组件,分别是消息中间件(Broker)、任务执行单元(Worker)和任务结果存储(ResultBackend)。Celery的三大组成1.消息中间件(Broker)消息中间件是Celery的神经系统,它负责接收任务生产者产生的消息并将这些消息分发给任务执行单元(Worker)。Celery不直接提供消息服务,而是需要与第三方消息中间件集成,常用的包括:RabbitMQ:一个高可用的消息队列服务,支持AMQP协议,广泛应用于企业级系统。Redis:一个高性能的键值存储系统,也可以作为简单的消息队列使用。AmazonSQS、GooglePub/Sub等云服务商提供的消息队列服务。2.任务执行单元(Worker)Worker是任务的实际执行者,它们监听消息中间件上的任务队列,当有新的任务到达时,Worker会取出任务并执行。Worker可以部署在单个或多个服务器上,实现任务的分布式处理。Worker能够并行处理任务,提高系统处理能力。用户可以配置Worker的数量和运行Worker的机器,以达到最佳的性能和资源利用。3.任务结果存储(ResultBackend)ResultBackend用于存储任务执行的结果,以便任务的发起方(生产者)可以查询任务执行的状态和获取结果。Celery支持多种存储结果的方式,包括:RabbitMQ/AMQP:直接使用消息中间件存储结果,但功能有限。Redis:常用的选择,快速且灵活,适合存储短期结果。DjangoORM、SQLAlchemy:将结果存储在关系数据库中,适合需要持久化存储的场景。Memcached、MongoDB等其他数据存储服务。安装#celery安装pipinstallcelery#celery监控flowerpipinstallflowerpipinstallredispipinstalleventlet简单任务单任务目录结构#celery_task.pyimportceleryimporttimebackend='redis://127.0.0.1:6379/1'broker='redis://127.0.0.1:6379/2'cel=celery.Celery('test',backend=backend,broker=broker)@cel.taskdefsend_email(name):print("向%s发送邮件..."%name)time.sleep(5)print("向%s发送邮件完成"%name)return"ok"#product.pyfromcelery_taskimportsend_emailresult=send_email.delay("yuan")print(result.id)result2=send_email.delay("alex")print(result2.id) 接下来这一步很关键#启动命令,进入该文件的目录下启动celery--app=celery_taskworker-Peventlet-lINFO命令启动后,到product.py文件下运行代码,效果如下:多任务目录结构#celery.pyfromceleryimportCelerycel=Celery('celery_demo',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',#包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['celery_tasks.task01','celery_tasks.task02'])#时区cel.conf.timezone='Asia/Shanghai'#是否使用UTCcel.conf.enable_utc=False#task01importtimefromcelery_tasks.celeryimportcel@cel.taskdefsend_email(res):time.sleep(5)return"完成向%s发送邮件任务"%res#task02importtimefromcelery_tasks.celeryimportcel@cel.taskdefsend_msg(res):time.sleep(5)return"完成向%s发送短信任务"%res#product.pyfromcelery_tasks.task01importsend_emailfromcelery_tasks.task02importsend_msg#立即告知celery去执行test_celery任务,并传入一个参数result=send_email.delay('yuan')print(result.id)result=send_msg.delay('yuan')print(result.id)进入终端,在celery_tasks所在目录运行celery--app=celery_tasksworker-Peventlet-lINFO 定时任务单任务目录结构#celery_task.pyimportceleryimporttimebackend='redis://127.0.0.1:6379/1'#结果存储broker='redis://127.0.0.1:6379/2'#消息中间件cel=celery.Celery('test',backend=backend,broker=broker)@cel.taskdefsend_email(name):print("向%s发送邮件..."%name)time.sleep(5)print("向%s发送邮件完成"%name)return"ok"#product.pyfromcelery_taskimportsend_emailfromdatetimeimportdatetime#方式一v1=datetime(2020,3,11,16,19,00)print(v1)v2=datetime.utcfromtimestamp(v1.timestamp())print(v2)result=send_email.apply_async(args=["python",],eta=v2)print(result.id)#方式二ctime=datetime.now()#默认用utc时间utc_ctime=datetime.utcfromtimestamp(ctime.timestamp())fromdatetimeimporttimedeltatime_delay=timedelta(seconds=10)task_time=utc_ctime+time_delay#当前时间加10秒#使用apply_async并设定时间result=send_email.apply_async(args=["Golang"],eta=task_time)print(result.id)在终端执行命令 celery--app=celery_taskworker-Peventlet-lINFOcelery-Acelery_taskbeat#CeleryBeat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列 执行完命令之后还要去运行product.py的代码 多任务目录结构只需修改celery.py代码#celery.pyfromdatetimeimporttimedeltafromceleryimportCeleryfromcelery.schedulesimportcrontabcel=Celery('tasks',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',include=['celery_tasks.task01','celery_tasks.task02',])cel.conf.timezone='Asia/Shanghai'cel.conf.enable_utc=Falsecel.conf.beat_schedule={#名字随意命名'add-every-10-seconds':{#执行tasks1下的test_celery函数'task':'celery_tasks.task01.send_email',#每隔2秒执行一次#'schedule':1.0,#'schedule':crontab(minute="*/1"),'schedule':timedelta(seconds=6),#传递参数'args''你好',)},#'add-every-12-seconds':{#'task':'celery_tasks.task01.send_email',#每年4月11号,8点42分执行#'schedule':crontab(minute=42,hour=8,day_of_month=11,month_of_year=4),#'args''你好',)#},}在终端执行命令(不用执行product.py):#分两个终端执行celery--app=celery_tasksworker-Peventlet-lINFOcelery-Acelery_tasksbeat#CeleryBeat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列任务定时执行:
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-11 05:59 , Processed in 0.485145 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表