首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >实时Python流子处理stdout/stderr

实时Python流子处理stdout/stderr
EN

Stack Overflow用户
提问于 2015-03-04 17:16:09
回答 1查看 997关注 0票数 2

我希望生成多个子进程并并行运行它们。我有一个功能,看上去很像这样:

代码语言:javascript
复制
def stream_command(command):
    proc = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    while proc.poll() is None:
        line = proc.stdout.readline()
        sys.stdout.write('[%s]: %s' % (command, line))
    return proc.poll()

然后,我可以使用以下方法并行(大致)运行多个:

代码语言:javascript
复制
def stream_commands(commands):
    threads = []
    for command in commands:
        target = lambda: stream_command(command)
        thread = Thread(target=target)
        thread.start()
        threads.append(thread)
    while True:
        if any(t.is_alive() for t in threads):
            continue
        else:
            break

但是,问题是,在我的stream_command函数中,我阻止了对proc.stdout.readline()的调用。这意味着两件事:首先,如果进程从未写入stdout,则该函数将永远挂起(例如,即使子进程终止)。第二,我不能分别响应进程的stdoutstderr (我必须先阻塞读取一个,然后再阻塞另一个.这不太可能奏效)。我想做的是类似于我用node.js写的东西

代码语言:javascript
复制
def stream_command(command):
    def on_stdout(line):
        sys.stdout.write('[%s]: %s' % (command, line))
    def on_stderr(line):
        sys.stdout.write('[%s (STDERR)]: %s' % (command, line))
    proc = asyncprocess.Popen(shlex.split(command),
            on_stdout=on_stdout,
            on_stderr=on_stderr
    )
    return proc.wait()

当然,asyncprocess是一些虚构的进程模块,它允许我启动子进程并传递stdoutstderr的处理程序函数。

那么,是否有类似于我上面的asyncprocess模块的东西,或者如果没有,有什么简单的方法来异步响应python中子进程的事件呢?

顺便提一下,我应该注意到我使用的是python2.7。似乎有一些东西通过python3的asyncio库,但不幸的是,这里行不通,AFAIK。

EN

回答 1

Stack Overflow用户

发布于 2015-03-04 17:52:33

您可以在每个数据流中使用一个线程来完成此操作。假设您希望stream_commands阻塞,直到所有命令完成为止,您可以这样做:

代码语言:javascript
复制
stdout_lock = threading.Lock()

def pipe_to_stdout(preamble, pipe):
    for line in pipe:
        with stdout_lock:
            sys.stdout.write(preamble + line)

def stream_commands(commands):
    threads = []
    procs = []
    try:
        for command in commands:
            proc = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
            procs.append(proc)
            out_thread = Thread(target=target, args=('[stdout]: ', proc.stdout)
            err_thread = Thread(target=target, args=('[stderr]: ', proc.stderr)
            out_thread.start()
            err_thread.start()
            threads.append(out_thread)
            threads.append(err_thread)
    finally:
        for proc in procs:
            proc.wait()
        for thread in threads:
            thread.join()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28860720

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档