首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >巨蟒中的TaskScheduler

巨蟒中的TaskScheduler
EN

Code Review用户
提问于 2019-09-06 14:29:57
回答 2查看 88关注 0票数 7

我发现需要用先到先得的方式来调度一些简单的功能,所以我开发了一个基于多处理的小型任务调度器,我认为这将是我在这里的第一个问题的一个很好的候选。

我对所有的评论开放,关于风格,约定,可能的错误和其他我现在没有考虑的事情。我为最合理的机制添加了一些评论,但这主要是半天的代码。

代码语言:javascript
复制
from multiprocessing import Process
from multiprocessing import JoinableQueue

def worker_process(in_queue, out_queue):
    run = True
    while run:
        # in the queue there should be a Job or None.
        # Latter case is the signal that we should stop.
        task = in_queue.get()
        if task:
            task.run()
            in_queue.task_done()
            # when done, we signal and we put the task id in the output queue
            out_queue.put(task.get_id())
        else:
            run = False

class Scheduler:
    def __init__(self, job_done_cb, cb_args=[], N=2):
        self.n = N
        self.processes = []
        self.job_queue = JoinableQueue()
        self.done_queue = JoinableQueue()
        self.job_done_cb = job_done_cb
        self.cb_args = cb_args
        for i in range(self.n):
            # these workers will get jobs from the queue and give us back the id when done
            p = Process(target=worker_process, args=(self.job_queue, self.done_queue))
            self.processes.append(p)
        # there process will get back the job of the id done and execute the callback
        self.cb_process = Process(target=self.on_job_done)

    def start(self):
        for p in self.processes:
            p.start()
        self.cb_process.start()

    def stop(self):
        # add the necessary None to the queue and wait for the processes to be done
        # this will make the queue execute the remaining task before finishing
        for i in range(self.n):
            self.job_queue.put(None)
        self.job_queue.close()
        for p in self.processes:
            p.join()
        self.done_queue.put(None)
        self.cb_process.join()

    def add_task(self, task_id, task, arg_list):
        job = Job(task, task_id, arg_list)
        self.job_queue.put(job)

    def wait_for_idle(self):
        self.job_queue.join()

    def on_job_done(self):
        run = True
        while run:
            # again, if the id is not None then we should go on waiting for stuff
            done = self.done_queue.get()
            if done:
                self.job_done_cb(self, done, *self.cb_args)
            else:
                run = False

class Job:
    def __init__(self, task, job_id, arg_list):
        self.task = task
        self.id = job_id
        self.arg_list = arg_list

    def run(self):
        self.task(*self.arg_list)

    def get_id(self):
        return self.id
EN

回答 2

Code Review用户

发布于 2019-10-02 22:58:04

未用循环变量

代码语言:javascript
复制
for i in range(self.n):
    self.job_queue.put(None)

可以这样写

代码语言:javascript
复制
for _ in range(self.n):
    self.job_queue.put(None)

任何不使用循环变量的地方。_让您和其他程序员知道这个变量没有被使用,应该被忽略。

命名

p应该是描述性的process。虽然p in self.processes是显而易见的,但是在描述变量名称的实践中获得它是很好的。

类型提示

代码语言:javascript
复制
def add_task(self, task_id, task, arg_list):

可以(假设taskstrarg_listlist)

代码语言:javascript
复制
def add_task(self, task_id: int, task: str, arg_list: list) -> None:

使用类型提示,您可以看到/显示接受的参数类型,以及函数返回的类型(S)。

票数 2
EN

Code Review用户

发布于 2019-10-02 23:35:20

终止条件

这个循环:

代码语言:javascript
复制
run = True
while run:
    # ...

应该删除run变量,而当前分配它的位置应该是break。使循环成为while True

换句话说,

代码语言:javascript
复制
while True:
    # in the queue there should be a Job or None.
    # Latter case is the signal that we should stop.
    task = in_queue.get()
    if task:
        task.run()
        in_queue.task_done()
        # when done, we signal and we put the task id in the output queue
        out_queue.put(task.get_id())
    else:
        break

变量案例

您的参数N应该是n,因为它既不是类也不是常量。

关键字参数

*args**kwargs传递给泛型函数(如task )是典型的做法。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/227576

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档