首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在一个时间段内启动/停止Python函数(例如。从上午10点到晚上12点30分)?

如何在一个时间段内启动/停止Python函数(例如。从上午10点到晚上12点30分)?
EN

Stack Overflow用户
提问于 2016-12-22 18:54:51
回答 1查看 7.6K关注 0票数 5

我正在尝试创建一个函数(例如def startTime()),它执行另一个函数,如def runFunc(),它每天使用python在上午10点开始,晚上12:30自动停止(或脚本结束)。

示例:startTime(start_time, stop_time,runFunc)

有人能帮我吗?

我试着安排startTime从上午10点到下午12点30分。

代码语言:javascript
复制
import threading
import schedule
import time

def runFunc(interval, innerFunc, iterations = 0):
   if iterations != 1:
      threading.Timer (interval,runFunc, [interval, innerFunc , 0 ]).start ()
   innerFunc ()

def A():
     print "Hello World- A"
def B():
     print "Hello World- B"

我试过但没成功:

代码语言:javascript
复制
def startTime(job):
      schedule.every().day.at("10:00").do(job)
      while True:
           schedule.run_pending()

startTime(runFunc(60,A))
startTime(runFunc(300,B))

( runFunc(60,A)运行良好,但无法安排runFunc从上午10点到下午12点30分

另一种方式

代码语言:javascript
复制
from datetime import datetime, time
now = datetime.now()
now_time = now.time()
now_time
if time(5,27) <= now.time() <= time(5,28):
    runFunc(10,A)

runFunc确实停止了,但是它在结束后继续执行。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-12-23 11:12:01

整个故事是有点复杂,这在很大程度上取决于你真正想用你的脚本。例如,这段代码可以正常工作:

代码语言:javascript
复制
import threading
import schedule
import time
import datetime
import sys
def test():
    print('{} This is a test'.format(datetime.datetime.now())) #this works ok

def exit():
    print('{} Now the system will exit '.format(datetime.datetime.now())) #this works ok
    sys.exit()

schedule.every().day.at("09:57").do(test)
schedule.every().day.at('09:58').do(exit)

while True:
    schedule.run_pending()
    time.sleep(1)

您将在终端中看到“测试消息”,一分钟后您将看到“退出消息”,它实际上终止了脚本。

但是,如果您在上面的函数测试中应用了一些循环,比如:

代码语言:javascript
复制
def test():
    while True: 
        print "This is a test"
        time.sleep(5)

那么脚本就不会退出。实际上,函数退出甚至不会被调用,因为Python被函数测试中的while循环所困,并且将继续进行下去。

计划文档指出,计划作业是按顺序调用的,因此,如果前面的作业没有完成,那么下一个作业实际上不会开始。

我怀疑你的目的是让一种功能在10:00持续运行,而你想强迫在12:30停止这个功能。如果不是这样的话,你的主要功能会在他完成任务后立即退出,你就不需要一个时间框架了。

在这种情况下,为了绕过Python & Schedule的序列化方式,您需要使用线程。

结合来自“如何并行执行作业”部分的计划文档的信息和其他溢出(如如何停止正在运行的线程 )的答案的信息,这个示例在我的pc上与Python2.7一起工作得很好:

代码语言:javascript
复制
import threading
    import schedule
    import time
    import datetime
    import sys

def doit(stop_event, arg):
    while not stop_event.wait(1): 
        #By wait(1) you repeat the loop every 1 sec. 
        #Applying wait(0) , loops run immediatelly until to be stopped by  stop_event
        print ("working on %s" % arg)
    print("Stopping as you wish.")


def startit():
    global pill2kill
    global t
    pill2kill = threading.Event()
    t = threading.Thread(target=doit, args=(pill2kill, "task"))
    t.start()

def stopit():
    global pill2kill
    global t
    pill2kill.set()
    t.join()

#startit() #Manual call for Testing 
#time.sleep(5) #Wait 5 seconds
#stopit() #Manual call for Testing

schedule.every().day.at("12:48").do(startit)
schedule.every().day.at('12:49').do(stopit)

#schedule.every().day.at("12:50").do(startit) #Uncomment this to recall it for testing
#schedule.every().day.at('12:51').do(stopit) #Unocmment this to recall it for testing

while 1:
    schedule.run_pending()
    time.sleep(1)

您还可以查看,以防符合您的需要。

PS:顺便提一句,通过快速查看Python的源代码,整个故事似乎是通过捕获整个脚本并不断地将date.now()与设置为运行作业的日期进行比较的。这个逻辑可以通过几个默认命令和一个无限主循环来连续地比较日期(就像Schedule那样)。

这个职位有一些很好的代码片段来创建您自己的cron作业,但是仅仅为了测试这个简化的脚本也可以在没有外部库的情况下正常工作,在datetime.now处于所需的开始/停止框架内时调用函数测试。

代码语言:javascript
复制
from datetime import datetime
import time

def test():
    global hasrun
    print('{} This is a test'.format(datetime.now()))
    time.sleep(5)
    hasrun=True

year,month,day,hour,minute=2016,12,23,15,55 
hasrun=False
now=datetime.now()

print "Now the time is :", now
jobstart=datetime(year,month,day,hour,minute)
jobstop=datetime(year,month, day,hour,minute+1)
print "Job will run at: ", jobstart
print "Job will finish at: ", jobstop
#print datetime.now() - jobstart
while True:
    while ((datetime.now() > jobstart) and (datetime.now() < jobstop )): 
        test()
    else:
        print('{} Please Wait...'.format(datetime.now()))
        if hasrun:
#           day=day+1
            minute=minute+2 #Just for Testing
            jobstart=datetime(year,month,day,hour,minute)
            jobstop=datetime(year,month, day,hour,minute+1)
            print "the job will run again ", jobstart
            print "and will finish at ", jobstop
            hasrun=False
        time.sleep(5)
票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41289947

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档