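I found myself needing to schedule some simple functions on a first-come, first-served basis, so I wrote a small task scheduler based on multiprocessing, and I thought it would make a good candidate for my first question here.
I am open to all comments about style, conventions, possible bugs, and anything else I have not considered yet. I added some comments on the mechanisms where it seemed most sensible, but this is mostly half a day's worth of code.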

    from multiprocessing import Process
    from multiprocessing import JoinableQueue


    def worker_process(in_queue, out_queue):
        run = True
        while run:
            # in the queue there should be a Job or None.
            # Latter case is the signal that we should stop.
            task = in_queue.get()
            if task:
                task.run()
                in_queue.task_done()
                # when done, we signal and we put the task id in the output queue
                out_queue.put(task.get_id())
            else:
                run = False


    class Scheduler:
        def __init__(self, job_done_cb, cb_args=[], N=2):
            self.n = N
            self.processes = []
            self.job_queue = JoinableQueue()
            self.done_queue = JoinableQueue()
            self.job_done_cb = job_done_cb
            self.cb_args = cb_args
            for i in range(self.n):
                # these workers will get jobs from the queue and give us back the id when done
                p = Process(target=worker_process, args=(self.job_queue, self.done_queue))
                self.processes.append(p)
            # this process will get back the id of each finished job and execute the callback
            self.cb_process = Process(target=self.on_job_done)

        def start(self):
            for p in self.processes:
                p.start()
            self.cb_process.start()

        def stop(self):
            # add the necessary None to the queue and wait for the processes to be done
            # this will make the queue execute the remaining task before finishing
            for i in range(self.n):
                self.job_queue.put(None)
            self.job_queue.close()
            for p in self.processes:
                p.join()
            self.done_queue.put(None)
            self.cb_process.join()

        def add_task(self, task_id, task, arg_list):
            job = Job(task, task_id, arg_list)
            self.job_queue.put(job)

        def wait_for_idle(self):
            self.job_queue.join()

        def on_job_done(self):
            run = True
            while run:
                # again, if the id is not None then we should go on waiting for stuff
                done = self.done_queue.get()
                if done:
                    self.job_done_cb(self, done, *self.cb_args)
                else:
                    run = False


    class Job:
        def __init__(self, task, job_id, arg_list):
            self.task = task
            self.id = job_id
            self.arg_list = arg_list

        def run(self):
            self.task(*self.arg_list)

        def get_id(self):
            return self.id

Posted on 2019-10-02 22:58:04
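
    for i in range(self.n):
        self.job_queue.put(None)

can be written like this

    for _ in range(self.n):
        self.job_queue.put(None)

The same applies anywhere you do not use the loop variable. The _ lets you and other programmers know that this variable is unused and should be ignored.
p should be the more descriptive process. While p in self.processes is obvious, it is good to get into the habit of using descriptive variable names.
This

    def add_task(self, task_id, task, arg_list):

can be (assuming task is a str and arg_list is a list)

    def add_task(self, task_id: int, task: str, arg_list: list) -> None:

Using type hints, you can see and show which parameter types are accepted and which type(s) the function returns.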
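The hinted signature above assumes task is a str. In the question's code, Job.run calls self.task(*self.arg_list), so task looks more like a callable. The following is a minimal sketch of how the hints might read under that assumption. The helper make_job is hypothetical and only stands in for add_task without the queue, and int for the job id is an assumption carried over from the answer.

```python
from typing import Any, Callable, List


class Job:
    """Same shape as the question's Job, with type hints added."""

    def __init__(self, task: Callable[..., Any], job_id: int, arg_list: List[Any]) -> None:
        self.task = task
        self.id = job_id
        self.arg_list = arg_list

    def run(self) -> None:
        # Unpack the stored argument list into the callable.
        self.task(*self.arg_list)

    def get_id(self) -> int:
        return self.id


def make_job(task_id: int, task: Callable[..., Any], arg_list: List[Any]) -> Job:
    """Hypothetical stand-in for Scheduler.add_task, minus the queue."""
    return Job(task, task_id, arg_list)


if __name__ == "__main__":
    results: List[str] = []
    job = make_job(7, results.append, ["done"])
    job.run()
    print(job.get_id(), results)
```

A static checker such as mypy could then flag a call that passes a string where a callable is expected.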
Posted on 2019-10-02 23:35:20
This loop:

    run = True
    while run:
        # ...

should drop the run variable, and the place where it is currently assigned should be a break instead. Make the loop a while True.
In other words,

    while True:
        # in the queue there should be a Job or None.
        # Latter case is the signal that we should stop.
        task = in_queue.get()
        if task:
            task.run()
            in_queue.task_done()
            # when done, we signal and we put the task id in the output queue
            out_queue.put(task.get_id())
        else:
            break

Your parameter N should be n, since it is neither a class nor a constant.
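To see the while True / break loop stop on the None sentinel, here is a small self-contained sketch. It makes several assumptions that are not in the original: it uses threading and queue.Queue instead of multiprocessing so it runs anywhere, EchoJob is a hypothetical stand-in for Job, and run_demo is a hypothetical driver. It also tests "task is None" explicitly and marks the sentinel as done, which goes beyond what the answer suggests.

```python
import queue
import threading


class EchoJob:
    """Hypothetical stand-in for the question's Job class."""

    def __init__(self, job_id):
        self.job_id = job_id

    def run(self):
        pass  # real work would go here

    def get_id(self):
        return self.job_id


def worker(in_queue, out_queue):
    while True:
        task = in_queue.get()
        if task is None:
            # Sentinel received: mark it done (an addition to the
            # original) and leave the loop without a flag variable.
            in_queue.task_done()
            break
        task.run()
        in_queue.task_done()
        out_queue.put(task.get_id())


def run_demo(job_ids):
    """Hypothetical driver: one worker thread, jobs in FIFO order."""
    in_queue, out_queue = queue.Queue(), queue.Queue()
    thread = threading.Thread(target=worker, args=(in_queue, out_queue))
    thread.start()
    for job_id in job_ids:
        in_queue.put(EchoJob(job_id))
    in_queue.put(None)  # ask the worker to stop after the jobs
    thread.join()
    return [out_queue.get() for _ in job_ids]


if __name__ == "__main__":
    print(run_demo([1, 2, 3]))
```

With a single worker the ids come back in submission order. With several workers the order would depend on scheduling.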
It is typical to pass *args and **kwargs through to generic functions (like task), rather than a single argument list.
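One way the *args / **kwargs suggestion might look for the question's Job class is sketched below. This is an illustration rather than the answerer's code: the constructor signature is an assumption, power is a hypothetical example task, and returning the result from run is an addition made so the effect is visible.

```python
class Job:
    """Variant of the question's Job that forwards *args and **kwargs."""

    def __init__(self, task, job_id, *args, **kwargs):
        self.task = task
        self.id = job_id
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # Forward both positional and keyword arguments to the task.
        # Returning the result is an addition; the original run() returns None.
        return self.task(*self.args, **self.kwargs)

    def get_id(self):
        return self.id


def power(base, exponent=2):
    """Hypothetical example task."""
    return base ** exponent


if __name__ == "__main__":
    print(Job(power, 1, 3).run())
    print(Job(power, 2, 3, exponent=3).run())
```

Scheduler.add_task could forward its own *args and **kwargs in the same way, which lets callers pass keyword arguments that a plain arg_list cannot express.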
https://codereview.stackexchange.com/questions/227576