run_continuously()
方法。然后,您可以在调度程序上调用该方法一次,然后在主线程中执行任何您喜欢的操作,而无需定期调用
run_pending()
按下Ctrl+C组合键时出现的错误不是问题,只是抱怨
sleep
在您手动终止时被中断。如果您想根据某些条件自动退出,可以根据循环中的某些逻辑来执行此操作
例如。
while not terminate:
其中terminate是您设置的一个变量,可能是全局变量,可以通过计划任务更改。
这种基于时间表的模型的许多实用性都是用于长时间重复运行的后台任务。假设您想执行更多的代码,或者您有一些代码需要运行一次,并且可能在您进入
while
循环,或者您希望重复运行它,也可以将其添加到计划中。
一些例子
import schedule
import threading
import time
# this is a class which uses inheritance to act as a normal Scheduler,
# but also can run_continuously() in another thread
class ContinuousScheduler(schedule.Scheduler):
def run_continuously(self, interval=1):
"""Continuously run, while executing pending jobs at each elapsed
time interval.
@return cease_continuous_run: threading.Event which can be set to
cease continuous run.
Please note that it is *intended behavior that run_continuously()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you set a continuous run interval
of one hour then your job won't be run 60 times at each interval but
only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
# I've extended this a bit by adding self.jobs is None
# now it will stop running if there are no jobs stored on this schedule
while not cease_continuous_run.is_set() and self.jobs:
# for debugging
# print("ccr_flag: {0}, no. of jobs: {1}".format(cease_continuous_run.is_set(), len(self.jobs)))
self.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
# example using this custom scheduler that can be run in a separate thread
your_schedule = ContinuousScheduler()
your_schedule.every().day.do(print)
# it returns a threading.Event when you start it.
halt_schedule_flag = your_schedule.run_continuously()
# you can now do whatever else you like here while that runs
# if your main script doesn't stop the background thread, it will keep running
# and the main script will have to wait forever for it
# if you want to stop it running, just set the flag using set()
halt_schedule_flag.set()
# I've added another way you can stop the schedule to the class above
# if all the jobs are gone it stops, and you can remove all jobs with clear()
your_schedule.clear()
# the third way to empty the schedule is by using Single Run Jobs only
# single run jobs return schedule.CancelJob
def job_that_executes_once():
# Do some work ...
print("I'm only going to run once!")
return schedule.CancelJob
# using a different schedule for this example to avoid some threading issues
another_schedule = ContinuousScheduler()
another_schedule.every(5).seconds.do(job_that_executes_once)
halt_schedule_flag = another_schedule.run_continuously()
我会注意到您是否真的需要为此使用线程-如果您只是希望程序在完成一次作业后退出,那么您需要做的只是:
while schedule.jobs:
schedule.run_pending()
time.sleep(1)
CancelJob
. 不过,我们已经在repl中进行了测试,希望该示例对修补有用。它和一切都应该与Python 3一起工作。