现在,我正在使用subprocess在后台运行一个长期运行的作业。由于多种原因(PyInstaller + AWS ),我不能再使用子进程了。
是否有一种简单的方法可以实现以下相同的目标?在多进程池(或其他什么地方)中运行一个长时间运行的python函数,并对stdout/stderr进行实时处理?
import subprocess
process = subprocess.Popen(
["python", "long-job.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
)
while True:
out = process.stdout.read(2000).decode()
if not out:
err = process.stderr.read().decode()
else:
err = ""
if (out == "" or err == "") and process.poll() is not None:
break
live_stdout_process(out)谢谢
发布于 2022-10-14 16:16:01
跨越平台是很麻烦的..。首先,windows实现的非阻塞管道并不是用户友好或便携的。
一种选择是让应用程序读取其命令行参数并有条件地执行文件,这样就可以使用子进程,因为您将使用不同的参数来启动自己。
但是为了保持它的多处理性:
输出必须记录到队列,而不是管道。__main__.
runpy来执行文件,因为initializer.
runpy函数应该在多处理子程序下运行,这个子函数必须首先重定向它的stdout和stderr在initializer.
把这一切结合在一起:
import multiprocessing
from multiprocessing import Queue
import sys
import concurrent.futures
import threading
import traceback
import runpy
import time
class StdoutQueueWrapper:
def __init__(self,queue:Queue):
self._queue = queue
def write(self,text):
self._queue.put(text)
def flush(self):
pass
def function_to_run():
# runpy.run_path("long-job.py",run_name="__main__") # run long-job.py
print("hello") # print something
raise ValueError # error out
def initializer(stdout_queue: Queue,stderr_queue: Queue):
sys.stdout = StdoutQueueWrapper(stdout_queue)
sys.stderr = StdoutQueueWrapper(stderr_queue)
def thread_function(child_stdout_queue,child_stderr_queue):
with concurrent.futures.ProcessPoolExecutor(1, initializer=initializer,
initargs=(child_stdout_queue, child_stderr_queue)) as pool:
result = pool.submit(function_to_run)
try:
result.result()
except Exception as e:
child_stderr_queue.put(traceback.format_exc())
if __name__ == "__main__":
child_stdout_queue = multiprocessing.Queue()
child_stderr_queue = multiprocessing.Queue()
child_thread = threading.Thread(target=thread_function,args=(child_stdout_queue,child_stderr_queue),daemon=True)
child_thread.start()
while True:
while not child_stdout_queue.empty():
var = child_stdout_queue.get()
print(var,end='')
while not child_stderr_queue.empty():
var = child_stderr_queue.get()
print(var,end='')
if not child_thread.is_alive():
break
time.sleep(0.01) # check output every 0.01 seconds请注意,作为多进程运行的一个直接后果是,如果子进程遇到分段错误或某个不可恢复的错误,父进程也将死亡,如果期望分段错误,则在子进程下运行您自己似乎是一个更好的选择。
https://stackoverflow.com/questions/74070276
复制相似问题