首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >实时多目标监测

实时多目标监测
EN

Stack Overflow用户
提问于 2022-10-14 13:49:28
回答 1查看 58关注 0票数 0

现在,我正在使用subprocess在后台运行一个长期运行的作业。由于多种原因(PyInstaller + AWS ),我不能再使用子进程了。

是否有一种简单的方法可以实现以下相同的目标?在多进程池(或其他什么地方)中运行一个长时间运行的python函数,并对stdout/stderr进行实时处理?

代码语言:javascript
复制
import subprocess

process = subprocess.Popen(
    ["python", "long-job.py"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    shell=True,
)

while True:
    out = process.stdout.read(2000).decode()
    if not out:
        err = process.stderr.read().decode()
    else:
        err = ""
    if (out == "" or err == "") and process.poll() is not None:
        break

    live_stdout_process(out)

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-14 16:16:01

跨越平台是很麻烦的..。首先,windows实现的非阻塞管道并不是用户友好或便携的。

一种选择是让应用程序读取其命令行参数并有条件地执行文件,这样就可以使用子进程,因为您将使用不同的参数来启动自己。

但是为了保持它的多处理性:

输出必须记录到队列,而不是管道。__main__.

  • this

  • 您需要子程序来执行python文件,这可以使用runpy来执行文件,因为initializer.

  • when runpy函数应该在多处理子程序下运行,这个子函数必须首先重定向它的stdout和stderr在initializer.

  • when中发生错误,您的主应用程序必须捕获它.但是,如果它忙于读取输出,它将无法等待错误,因此子线程必须启动多进程并等待错误。

  • 主进程必须创建队列,启动子线程并读取输出。

把这一切结合在一起:

代码语言:javascript
复制
import multiprocessing
from multiprocessing import Queue
import sys
import concurrent.futures
import threading
import traceback
import runpy
import time

class StdoutQueueWrapper:
    def __init__(self,queue:Queue):
        self._queue = queue
    def write(self,text):
        self._queue.put(text)
    def flush(self):
        pass

def function_to_run():
    # runpy.run_path("long-job.py",run_name="__main__")  # run long-job.py
    print("hello")  # print something
    raise ValueError  # error out

def initializer(stdout_queue: Queue,stderr_queue: Queue):
    sys.stdout = StdoutQueueWrapper(stdout_queue)
    sys.stderr = StdoutQueueWrapper(stderr_queue)

def thread_function(child_stdout_queue,child_stderr_queue):
    with concurrent.futures.ProcessPoolExecutor(1, initializer=initializer,
                                                initargs=(child_stdout_queue, child_stderr_queue)) as pool:
        result = pool.submit(function_to_run)
        try:
            result.result()
        except Exception as e:
            child_stderr_queue.put(traceback.format_exc())


if __name__ == "__main__":
    child_stdout_queue = multiprocessing.Queue()
    child_stderr_queue = multiprocessing.Queue()

    child_thread = threading.Thread(target=thread_function,args=(child_stdout_queue,child_stderr_queue),daemon=True)
    child_thread.start()

    while True:
        while not child_stdout_queue.empty():
            var = child_stdout_queue.get()
            print(var,end='')
        while not child_stderr_queue.empty():
            var = child_stderr_queue.get()
            print(var,end='')
        if not child_thread.is_alive():
            break
        time.sleep(0.01)  # check output every 0.01 seconds

请注意,作为多进程运行的一个直接后果是,如果子进程遇到分段错误或某个不可恢复的错误,父进程也将死亡,如果期望分段错误,则在子进程下运行您自己似乎是一个更好的选择。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74070276

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档