As a programming novice, I am still exploring the concepts of multithreading and multiprocessing.
I wrote a small script that reads a file, copies files into multiple temporary folders, and then runs a series of commands on each folder (building the label, packaging, and so on).
There are about 500 folders, and they are processed sequentially. How can I use multiprocessing here so that, say, 100 folders are processed in parallel at a time (or more)? Also, is it possible to track these processes and fail the build if even one subprocess fails?
I have read several articles on multiprocessing, but I still cannot wrap my head around it.
Any guidance would be a great help. Thank you.
folder1
-- war file
-- metadata
folder2
-- war file
-- metadata
....
....
folder500
-- war file
-- metadata

Code snippet:
import re, shutil, os
from pathlib import Path

target = "/home/work"
file_path = target + "/file.txt"
dict = {}
count = 1

def commands_to_run_on_each_folder(filetype, tmp_folder):
    target_folder = tmp_folder + '/tmp' + str(count)
    os.system(<1st command to build the label>)
    os.system(<2nd command to build the package>)
    <multiple file manipulations, where `filetype` is used and get the required file with right extension>
    <curl command to upload it to the Nexus>

# Read the text file and assemble it in a dictionary.
with open(file_path, 'r') as f:
    lines = f.read().splitlines()
    for i, line in enumerate(lines):
        match = re.match(r".*.war", line)
        if match:
            j = i-1 if i > 1 else 0
            for k in range(j, i):
                dict[match.string] = lines[k]

# Iterate the dictionary and copy the folders to the temporary folders.
for key, value in dict.items():
    os.mkdir(target + '/tmp' + str(count))
    shutil.copy(key, target + '/tmp' + str(count))
    shutil.copy(value, target + '/tmp' + str(count))
    commands_to_run_on_each_folder("war", target)
    count += 1

OS: Ubuntu 18.04, Memory: 22 GB, container
Posted on 2021-09-21 21:18:01
This is easy with concurrent.futures. I have modified your script to be:
#!/usr/bin/env python3
import itertools
import concurrent.futures
import logging
import pathlib
import re
import shutil

logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)s:%(processName)s:%(message)s",
)

def worker(path1, path2, src, target, logger):
    logger.debug("Create dir %s", target)
    target.mkdir(exist_ok=True)
    logger.debug("Copy files")
    shutil.copy(src / path1, target / path1)
    shutil.copy(src / path2, target / path2)
    logger.debug("Additional commands to run on %s", target)
    # TODO: Add actions here
    # commands_to_run_on_each_folder(...)

def main():
    # Read the text file and assemble it in a dictionary.
    tasks = {}
    with open("file.txt", 'r') as f:
        lines = f.read().splitlines()
        for i, line in enumerate(lines):
            match = re.match(r".*.war", line)
            if match:
                j = i-1 if i > 1 else 0
                for k in range(j, i):
                    tasks[match.string] = lines[k]

    logger = logging.getLogger()
    # src: The directory where this script is
    src = pathlib.Path(__file__).parent
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for taskid, (path2, path1) in enumerate(tasks.items(), 1):
            target = pathlib.Path(f"/tmp/dir{taskid}")
            # Calls `worker` function with parameters path1, path2, ...
            # concurrently
            executor.submit(worker, path1, path2, src, target, logger)

if __name__ == "__main__":
    main()

Here is a sample output:
DEBUG:ForkProcess-1:Create dir /tmp/dir1
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir1
DEBUG:ForkProcess-1:Create dir /tmp/dir2
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir2
DEBUG:ForkProcess-1:Create dir /tmp/dir3
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir3
DEBUG:ForkProcess-1:Create dir /tmp/dir4
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir4

Notes
- Once everything works, consider raising the logging level (for example to logging.WARN) to cut down the output.
- pathlib is used instead of os.path, which makes joining paths cleaner.
- logging is used instead of print because logging works better than print in a multi-process environment.
- The submit call does not wait: even if the worker function takes a long time to run, submit returns immediately. Thanks to the with construct, the executor waits for all concurrent tasks to finish before exiting. This is what you want.

Posted on 2021-09-21 14:06:59
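On the question of failing the build when any subprocess fails: with the same concurrent.futures API you can keep the Future objects that submit returns and inspect their results. A minimal sketch follows; `deploy` is a hypothetical stand-in for commands_to_run_on_each_folder, and ThreadPoolExecutor is used only so the sketch is self-contained (ProcessPoolExecutor exposes the identical interface, and max_workers controls how many run at once, e.g. 100):

```python
import concurrent.futures

def deploy(folder):
    """Hypothetical worker standing in for commands_to_run_on_each_folder.
    Raises to simulate one failed build."""
    if folder == "folder3":
        raise RuntimeError(f"build failed in {folder}")
    return folder

folders = [f"folder{i}" for i in range(1, 6)]
failed = []
# max_workers caps how many folders are processed in parallel at a time.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(deploy, f): f for f in folders}
    for fut in concurrent.futures.as_completed(futures):
        try:
            fut.result()  # re-raises any exception from the worker
        except Exception:
            failed.append(futures[fut])

build_ok = not failed  # fail the build if even one task failed
```

Calling result() re-raises whatever the worker raised, so you can exit non-zero (and thereby fail a CI build) whenever the failed list is non-empty.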
This is not a good target for multiprocessing, but it is a great one for GNU parallel.
The builds happen in the background anyway: python is just invoking system commands. You could of course make several background os.system calls from python in parallel, but this script would be better run under a find | parallel paradigm.
What I would do is rewrite the script to handle just one folder. Then I would run:
find /path/to/root/folder -type d | parallel --bar -I{} python3 script.py {}

Since you are using Ubuntu, you already have find and parallel. Note that this is bash run in a shell, not python.
The argument against doing this in python is that the shell tools already give you job control and progress tracking: parallel runs one job per core by default, and you can cap or raise the number of concurrent jobs with --jobs N.
On the other hand, if you do want to do this in python, it is possible.
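For completeness, a sketch of the shell route (the root path and script name are placeholders; --halt now,fail=1 tells parallel to abort the remaining jobs as soon as one fails, which covers the "fail the build if one subprocess fails" requirement):

```shell
# Process each folder with at most 100 concurrent jobs; abort all on first failure.
find /path/to/root/folder -mindepth 1 -maxdepth 1 -type d \
  | parallel --bar --jobs 100 --halt now,fail=1 python3 script.py {}
```

parallel itself exits non-zero when any job fails, so the pipeline's exit status can fail the surrounding build directly.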
Note that current wisdom recommends using subprocess rather than os.system.
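To illustrate that recommendation, here is a minimal sketch, with `echo` as a stand-in for the real build command: subprocess.run with check=True turns a non-zero exit status into an exception, whereas os.system merely returns a code that is easy to ignore.

```python
import subprocess

# check=True raises CalledProcessError on a non-zero exit status,
# so a failed build command cannot go unnoticed.
# `echo` is a placeholder for your actual build command.
result = subprocess.run(
    ["echo", "building label"],
    capture_output=True,
    text=True,
    check=True,
)
output = result.stdout.strip()  # captured stdout of the command
```

Wrapping each build step this way also gives you the command's stdout/stderr for logging, which os.system discards.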
https://stackoverflow.com/questions/69270271