(编辑:jimmy 日期: 2025/1/19 浏览:2)
在工作场景遇到了这么一个场景,就是需要定期去执行一个缓存接口,用于同步设备配置。首先想到的就是Linux上的crontab,可以定期,或者间隔一段时间去执行任务。但是如果你想要把这个定时任务作为一个模块集成到Python项目中,或者想持久化任务,显然crontab不太适用。Python的APScheduler模块能够很好的解决此类问题,所以专门写这篇文章,从简单入门开始记录关于APScheduler最基础的使用场景,以及解决持久化任务的问题,最后结合其他框架深层次定制定时任务模块这几个点入手。
先简单介绍一下Apscheduler模块包含的四种组件:
大概了解了Apscheduler包含的几种概念,现在先来看一下一个简单的示例:
# -*- coding: utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import time def hello(): print(time.strftime("%c")) if __name__ == "__main__": scheduler = BlockingScheduler() scheduler.add_job(hello, 'interval', seconds=5) scheduler.start()
示例的输出:
Thu Dec 3 16:01:20 2020 Thu Dec 3 16:01:25 2020 Thu Dec 3 16:01:30 2020 Thu Dec 3 16:01:35 2020 Thu Dec 3 16:01:40 2020 ..........
这个简单的示例,我们用上面提到几种组件分析一下运行逻辑:
内置的三种Trigger触发器类型:
常见的Scheduler调度器:
常见的JobStore:
通过上面一个简单的示例了解大概的工作流程,以及各个组件在整个流程中的作用,以下的示例是Flask Web框架结合使用Apscheduler定时器,定时执行任务。
# -*- coding: utf-8 -*- from flask import Flask, Blueprint, request from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.redis import RedisJobStore import time app = Flask(__name__) executors = {"default": ThreadPoolExecutor(5)} default_redis_jobstore = RedisJobStore(db=2, jobs_key="apschedulers.default_jobs", run_times_key="apschedulers.default_run_times", host = '127.0.0.1', port = 6379 ) scheduler = BackgroundScheduler(executors=executors) scheduler.add_jobstore(default_redis_jobstore) scheduler.start() def say_hello(): print(time.strftime("%c")) @app.route("/get_job", methods=['GET']) def get_job(): if scheduler.get_job("say_hello_test"): return "YES" else: return "NO" @app.route("/start_job", methods=["GET"]) def start_job(): if not scheduler.get_job("say_hello_test"): scheduler.add_job(say_hello, "interval", seconds=5, id="say_hello_test") return "Start Scuessfully!" else: return "Started Failed" @app.route("/remove_job", methods=["GET"]) def remove_job(): if scheduler.get_job("say_hello_test"): scheduler.remove_job("say_hello_test") return "Delete Successfully!" else: return "Delete Failed" if __name__ == "__main__": app.run(host="127.0.0.1", port=8787, debug=True)
最后总结一下,首先你要设置一个作业存储器用于在调度程序崩溃重新恢复时,还能够在作业存储器中获取到作业继续执行;然后你需要设置一个执行器,这个根据作业的类型,比如时一个CPU密集型的任务,那就可以用进程池执行器,默认是用线程池执行器;最后创建配置调度器,启动调度,可以在启动前添加作业,也可以在启动后添加,删除,获取作业。(在这里需要明白的一点就是应用程序不会直接去操作作业存储器,作业或者执行器,而是调度器提供适当的接口来处理这些接口。)
ApScheduler是一个不错的定时任务库,能够动态的添加删除,同时也支持不同的触发器类型,这也是它的优势,相反一些如果是静态任务,其实可以用如linux的crontab工具去做定时任务。有关这方面的记录还会持续更新,如果有什么问题,可以提出来,大家一起探讨。
以上就是python Apscheduler的使用方法的详细内容,更多关于python Apscheduler的资料请关注其它相关文章!