我有一组模拟,每组使用MPI并行运行(它们是CFD模拟)。但是,我想用Python创建一个任务池,并并行运行。我使用的多处理库如下:
import itertools
import multiprocessing
def Run_Cases(params):
run_each_mpi_simulation
a = range(1,len(U)); b = range(len(R))
paramlist = list(itertools.product(a,b))
pool = multiprocessing.Pool()
pool.map(Run_Case,paramlist)因此,基本上,代码创建了任务池(模拟实例),并将它们分配给每个处理器来运行。但是,它没有考虑到每个模拟都需要两个处理器,因为每种情况都是并行(MPI)模拟。这在仿真中造成了显著的性能下降。
在此,我想知道是否可以以某种方式定义分配给Python多处理包中每个任务的处理器数量?
如有任何意见,敬请见谅。
亲切问候阿什坎
编辑/更新:
非常感谢你的回答。
我尝试了您的方法,处理器上的性能和工作负载分布有了很大改进,但似乎有两个问题:
1)虽然一切都在继续,但我得到了以下信息
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 1073, in run
self.function(*self.args, **self.kwargs)
File "/usr/local/lib/python2.7/dist-packages/PyFoam/Execution/FoamThread.py", line 86, in getLinuxMem
me=psutil.Process(thrd.threadPid)
File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 341, in __init__
self._init(pid)
File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 381, in _init
raise NoSuchProcess(pid, None, msg)
NoSuchProcess: psutil.NoSuchProcess no process found with pid 8880我将非常感谢你的评论。
再次感谢阿什坎
编辑/更新:
我相信这个消息是因为进程的数量少于处理器的列表。由于我有两个案例/模拟,每个使用2个处理器,当我使用超线程时,我有8个处理器,因此得到了信息。它使用4个处理器或具有更大的模拟池来解决。
发布于 2018-01-03 11:33:57
multiprocessing.Pool接受要创建的进程数作为其第一个参数。您可以使用multiprocessing.cpu_count()获取逻辑cpu核心的数量,然后在池中创建一半的进程(因此每个进程获得2个进程)。
multiprocessing.Pool(multiprocessing.cpu_count()/2)这假设您的cpu计数为2,对于几乎所有的cpu来说都是正确的.
请注意,此解决方案不考虑SMT (或超线程),因为multiprocessing.cpu_count()计算逻辑核,因此它可能会报告双倍的物理内核。对于大多数cpu密集型任务SMT是一个性能提升,您运行双倍的任务,但以超过一半的速度,如果您有SMT,您需要决定它是否有利于您的模拟。
最后,您还可以设置每个进程的亲缘关系,以便它只能在两个核心上运行,但是没有直接的标准方法来实现这一点,因为多处理不会公开它打开的进程的PID。下面是一些粗略的完整代码,可以设置每个进程的亲缘关系:
import multiprocessing
import psutil
import itertools
cores_per_process = 2
cpu_count = multiprocessing.cpu_count()
manager = multiprocessing.Manager()
pid_list = manager.list() # trick to find pid of all the processes in the pool
cores_list = range(cpu_count)
it = [iter(cores_list)] * cores_per_process # this is a python trick to group items from the same list together into tuples
affinity_pool = manager.list(zip(*it)) # list of affinity
def Run_Case(params):
self_proc = psutil.Process() # get your own process
if self_proc.pid not in pid_list:
pid_list.append(self_proc.pid) # found new pid in pool
self_proc.cpu_affinity(affinity_pool.pop()) # set affinity from the affinity list but also remove it so other processes can't use the same affinity
#run simulation
a = range(1, len(U))
b = range(len(R))
paramlist = list(itertools.product(a, b))
pool = multiprocessing.Pool(cpu_count/cores_per_process) # decide on how many processes you want
pool.map(Run_Case, paramlist)https://stackoverflow.com/questions/48070703
复制相似问题