run.py:

import subprocess
from multiprocessing.pool import ThreadPool

def work(repo, cpuid):
    my_tool_subprocess = subprocess.Popen('./scan.py {} {}'.format(repo, cpuid),
                                          shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        if "scan is done" in myline:
            break

num = 10  # set to the number of workers you want (it defaults to the CPU count of your machine)
tp = ThreadPool(num)
cpuid = 1
for repo in repos:
    tp.apply_async(work, (repo[0], "1-240"))
    print('Running {} at core {}'.format(repo[0], "1-240"))
tp.close()
tp.join()

scan.py:
completed = subprocess.run(['git', 'clone', repo], env=my_env)
# ... a bunch of other subprocess.run() calls ...
# at the end:
print('returncode:', completed.returncode)
print('scan is done')

I expected the number of active processes to be 10 (10 threads), but somehow... that is not the case. It does not seem to wait for "scan is done" (the last statement in scan.py); instead it runs through the whole repos list (the for loop), cloning every repo in it. To repeat: it does not wait for the first through tenth repositories to be cloned and processed (maintaining a moving window of 10 processes), it just keeps going... creating extra processes and cloning more repos.
Does anyone know what is going wrong here?

Posted on 2020-02-01 00:05:42
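One detail worth checking in the code above (a plausible cause, not something stated in the thread): in Python 3, Popen.stdout.readline() returns bytes, so the str-in-bytes test `"scan is done" in myline` raises TypeError inside the worker, and apply_async silently stores that exception instead of raising it, so each worker slot frees up immediately. A minimal sketch of that swallowing behavior, with a stub worker that always raises:

```python
from multiprocessing.pool import ThreadPool

def worker(x):
    # stands in for a worker that hits `"scan is done" in myline`
    # with myline being bytes: str-in-bytes raises TypeError
    raise TypeError('demonstration')

pool = ThreadPool(2)
results = [pool.apply_async(worker, (i,)) for i in range(4)]
pool.close()
pool.join()  # returns immediately: every task "finished" by raising

# The exceptions only surface when you call .get() on each result
errors = 0
for r in results:
    try:
        r.get()
    except TypeError:
        errors += 1
print(errors)  # → 4
```

Calling .get() on each AsyncResult (or passing an error_callback) is how such failures become visible.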
Try refactoring your code like this:

In scan.py, move all module-level code into a function, for example:

def run(repo, cpuid):
    # do whatever scan.py does given a repo path and cpuid
    # instead of printing to stdout, have this return a value

If you still care about scan.py also having a command-line interface, add:
import argparse

def main(argv=None):
    parser = argparse.ArgumentParser()
    # ... implement command-line argument parsing here
    args = parser.parse_args(argv)
    value = run(args.repo, args.cpuid)
    print(value)

if __name__ == '__main__':
    main()

Now in your run.py, do something like this:
import multiprocessing

import scan  # maybe give this a more specialized name

def work(args):
    repo, cpuid = args
    output = scan.run(repo, cpuid)
    for line in output.splitlines():
        ...  # Do whatever you want to do here

def main():
    repos = ...  # You didn't show us where this comes from
    pool = multiprocessing.Pool()  # Or pass however many processes you want
    pool.map(work, [(r[0], '1-240') for r in repos])

if __name__ == '__main__':
    main()

Something like that. The point I am trying to make is that if you decompose your code sensibly, multiprocessing becomes much simpler. Some of the details here are admittedly opinionated, though.
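To see the pattern end to end, here is a self-contained sketch with a stub in place of scan.run() (the stub and the run_all name are my own, not from the answer). It uses ThreadPool, matching the questioner's original choice, so the snippet runs anywhere without pickling concerns; multiprocessing.Pool drives the same flow identically:

```python
from multiprocessing.pool import ThreadPool

def scan_run(repo, cpuid):
    # stub standing in for scan.run(); returns a value instead of printing
    return '{} scanned on cores {}'.format(repo, cpuid)

def work(args):
    repo, cpuid = args
    return scan_run(repo, cpuid)

def run_all(repos, workers=10):
    # map() blocks until every task finishes, and the pool keeps
    # at most `workers` tasks in flight (the "moving window")
    with ThreadPool(workers) as pool:
        return pool.map(work, [(r, '1-240') for r in repos])

results = run_all(['repo_a', 'repo_b', 'repo_c'], workers=2)
print(results)
```

Because work() returns its result rather than printing, run_all() hands back one value per repo in input order, and any exception in a worker propagates out of map() instead of being lost.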
https://stackoverflow.com/questions/60007148