首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python -多处理队列在子进程被杀死后没有返回正确的顺序。

Python -多处理队列在子进程被杀死后没有返回正确的顺序。
EN

Stack Overflow用户
提问于 2015-06-22 11:46:38
回答 2查看 1.8K关注 0票数 1

我正在迭代一个规则列表(每个规则都是一个大的布尔表达式)。我打算使用Pyeda库来解决这个表达式。步骤是: 1.解析规则,2.转换为BDD表单,3.解决规则。步骤对于我所面临的问题并不重要,它被包装在函数do_big_job下,这个函数需要rule来解决,global Queue (q)借用的是`multiprocessing.Manager‘,而不是一般队列。

我必须超时处理太长时间('time_in‘秒)的规则。do_threading函数接受全局q (Queue)、要在子进程(do_big_job)中运行的函数以及要传递给do_big_jobtimeout_in以控制子进程执行的参数(规则)。

让我感到奇怪的是,当有一个超时和一个子进程因为运行时间太长而被终止时,结果就会出现混乱,即队列中返回的值与传递的规则不匹配,并且属于其他早期规则。

我在这里做错什么了?还有其他方法来做我想做的事吗?

另外,我还有一个问题,当我以线性的方式执行,而不是使用多重处理时,处理每条规则所花费的时间要比在单独的过程中对每条规则的处理时间要长得多。对此有什么解释?

代码语言:javascript
复制
def do_threading(q,function,argument, timeout_in=1):

    # Start function as a process
    p = Process(target=function, args=(argument,q,))
    p.start()
    p.join(.1)

    if p.is_alive():
        # Wait for 'timeout_in' seconds or until process finishes
        p.join(timeout_in)

        # If thread is still active
        if p.is_alive():
            print("running... let's kill it...")

            # Terminate
            p.terminate()
            p.join()
            return False
    return True

def do_big_job(rule, q):
    # Do something with passed argument
    print("Child: ", rule)

    # heavy computation using Pyeda library
    f = expr2bdd(expr(rule))
    count = f.satisfy_count()
    solution=[]
    for i in f.satisfy_all():
        solution.append(i)

    # Putting result in the queue for exchange
    q.put([solution,count])


def main()

    manager = multiprocessing.Manager()
    q = manager.Queue()   # Initializing Queue for data exchange between processes

    solved_parts={}
    timed_out_parts={}

    for rule in rules:   # Iterating over rules and creating process for each rule
        each_rule={}

        #Creating new processes to carry out heavy computation and passing the Queue 'q' for data exchange
        processed = do_threading( q, do_big_job, rule, timeout_in=1) 

        if processed:
            r = q.get()  # Getting result from the queue

            each_rule["solution"] = r[0]
            each_rule["solution_count"] = r[1]
            each_rule["count_unique_var"]=count_unique_variables(rule)

        else:

            each_rule["solution"] = "None"
            each_rule["solution_count"] = "None"
            each_rule["count_unique_var"]=count_unique_variables(rule)

        # Putting results in 2 types of lists
        if each_rule["solution"]=="None":
            timed_out_parts[part_num]=each_rule.copy()
        else:
            solved_parts[part_num]=each_rule.copy()

main()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-06-24 10:38:21

由于各种原因,我不得不对您的代码做了很多更改。

有些名称没有像part_num那样定义。

我没有使用实际的Pyeda图书馆。多处理的解决方案是通用的,工作流程中实际发生的事情与处理进程之间的数据流无关。

我也没有试图猜测从哪里导入expr

因此,有些函数是模拟的,但它们与理解并行计算无关。

因此,将对模拟进行注释,虚拟输入数据也是如此。

代码的主要问题是,您希望启动工作人员并在一个循环中收集结果。当您使用线程或多处理时,请忘记它,因为工作人员返回的数据顺序基本上是未定义的。因此,员工有责任提供明确的信息,说明它正在根据哪条规则与结果一起工作。

另一个很大的区别是,我实际上在一开始就启动了所有的工人,这使得计算实际上是并行的。然后我会收集新的结果。每当队列被删除时,我就检查是否所有的工作人员都已经返回了他们的出口代码,这是一个明确的信息,任何有趣的事情都不会发生。

主流程不负责安排工人的时间安排。工作人员在SIGALRM超时后终止自己。我这样做是因为主进程没有关于工作进程何时进入Python代码入口点的可靠信息。

最后一件事是,我根据timed_out_parts中缺少的结果填充solved_parts

代码语言:javascript
复制
from multiprocessing import Process, Manager
from multiprocessing.queues import Empty as QueueEmpty
from signal import alarm

# Following imports are only needed to mock some function
from time import sleep
from collections import namedtuple
import random

# Mock for `expr()`
def expr(rule):
    return rule

# Mock for `expr2bdd()` - sleeps randomly simulating heavy computation
def expr2bdd(expression):
    sleep(random.randint(0, 9))
    satisfied = [n for n in xrange(random.randint(0, 5))]
    def satisfy_count():
        return len(satisfied)
    def satisfy_all():
        return satisfied
    Evaluation = namedtuple('Evaluation', ('satisfy_count', 'satisfy_all'))
    return Evaluation(satisfy_count=satisfy_count, satisfy_all=satisfy_all)

# Mock for `count_unique_variables()`
def count_unique_variables(arg):
    return random.randint(0, 9)

# This function is executed in separate process - does the actual computation
def evaluate_rule(queue, part_num, rule, timeout):
    alarm(timeout)
    print 'Part: {}, Rule: {}'.format(part_num, rule)
    evaluation = expr2bdd(expr(rule))
    count = evaluation.satisfy_count()
    solution=[]
    for i in evaluation.satisfy_all():
        solution.append(i)
    queue.put([part_num, solution, count])

# Main function which starts workers and collects results
def evaluate_rules(rules, timeout=5):
    manager = Manager()
    queue = manager.Queue()
    solved_parts = {}
    processes = []
    for part_num, rule in enumerate(rules):
        process = Process(target=evaluate_rule, args=(queue, part_num, rule, timeout))
        process.start()
        processes.append(process)
    while True:
        try:
            result = queue.get_nowait()
        except QueueEmpty:
            if all((process.exitcode is not None for process in processes)):
                break
        solved_parts[result[0]] = {
            'solution': result[1],
            'solution_count': result[2],
            'count_unique_var': count_unique_variables(rule)
        }
    timed_out_parts = {
        part_num: {
            'solution': None,
            'solution_count': None,
            'count_unique_var': count_unique_variables(rule)
        }
        for part_num, rule in enumerate(rules) if part_num not in solved_parts
    }
    return solved_parts, timed_out_parts

# Initialize `random generator` - only for mocks
random.seed()

# Dummy rules
rules = [i for i in xrange(50)]

# Fun starts here
solved_parts, timed_out_parts = evaluate_rules(rules)

# You definitely want to do something more clever with the results than just printing them
print solved_parts
print timed_out_parts

至于你的第二个问题:这是没有黄金答案的。线性和并行处理时间的差异取决于工作人员实际做了什么。

票数 1
EN

Stack Overflow用户

发布于 2015-06-22 21:11:18

如果不能控制处理每个规则的所有代码,那么单独的Process是实现超时的可靠解决方案。

您可以将规则添加到结果中,以避免担心订单。为了避免损坏公共队列,您可以为每个进程使用一个单独的管道(未经测试):

代码语言:javascript
复制
#!/usr/bin/env python3
from itertools import islice
from multiprocessing import Process, Pipe, cpu_count, freeze_support
from multiprocessing.connection import wait

def do_big_job(rule, conn):
    with conn:
        # compute solution, count for the rule..
        # send the result to the parent process
        conn.send((rule, solution, count))

def main():
    jobs = {} # mapping: connection -> process
    max_workers = cpu_count() # max number of concurrent jobs
    rules = iter(rules) # make an iterator

    for rule in islice(rules, max_workers): # start initial jobs
        add_job(jobs, rule)

    while jobs:
        ready = wait(jobs, timeout)
        if not ready: # timeout and no results are ready
           rotate_job(jobs, rules) # remove old job, add a new one

        for conn in ready: # get results
            try:
                rule, solution, count = conn.recv()
            except EOFError:
                rotate_job(jobs, rules, conn)        
            else:
                print(rule, solution, count)

if __name__ == '__main__':
    freeze_support()
    main()

add_job()中,rotate_job()实现了一个进程池,该池限制并发进程的数量,并允许杀死它的辅助进程:

代码语言:javascript
复制
def add_job(jobs, rule): #XXX create class for the Pool
    r, w = Pipe(duplex=False)
    with w:
        p = Process(target=do_big_job, args=[rule, w], daemon=True)
        p.start() 
    jobs[r] = p

def rotate_job(jobs, rules, conn=None): 
    if conn is None:
       for conn in jobs:
           break

    # start next job
    for rule in rules:
        add_job(jobs, rule)
        break

    # remove timeouted job
    process = jobs.pop(conn)
    if process.is_alive():
        process.terminate()
    process.join() #NOTE: it may hang if `.terminate()` is ignored

池实现为每个作业创建一个新的Process

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30979211

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档