我正在将一些串行处理的python作业转换为使用dask或joblib进行多进程处理。可悲的是,我需要在窗户上工作。
当从IPython内部或命令行使用python调用py文件时,一切都运行良好。
在用cython编译可执行文件时,它不再运行良好:一步一步地增加进程(无限进程和大于请求进程的数量),得到startet并阻塞我的系统。
它有点像多处理炸弹 --当然,我使用了if __name__=="__main__:"来拥有控制块--通过在命令行上运行python调用来获得批准。
我的cython调用是cython --embed --verbose --annotate THECODE.PY,我正在使用gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE进行编译,从而生成了一个windows可执行THECODE.exe。
使用运行良好的其他(单一处理)代码。
对于dask和joblib来说,问题似乎是一样的(这可能意味着,dask的工作方式类似于或基于joblib)。
有什么建议吗?
对于那些对mcve感兴趣的人来说:仅仅从多处理炸弹获取第一段代码并使用上面的cython命令编译它,就会导致一个可执行文件丢失您的系统。(我刚试过:-)
我只是在代码示例中添加一行以显示__name__,从而发现了一些有趣的东西。
import multiprocessing
def worker():
"""worker function"""
print('Worker')
return
print("-->" + __name__ + "<--")
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()在使用python运行这段代码时,它显示了
__main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__(其他产出被抑制)。解释如果决定有效。在cython和编译之后运行可执行文件时,如下所示
__main__
__main__
__main__
__main__
__main__
__main__而且越来越多。因此,对模块的工作人员调用不再像导入那样是masqueraded,因此每个工作人员试图以递归的方式启动五个新的调用。
发布于 2017-11-21 10:37:21
我认为基于提交的错误报告的细节,我可以在这里提供最优雅的解决方案
if __name__ == '__main__':
if sys.argv[0][-4:] == '.exe':
setattr(sys, 'frozen', True)
multiprocessing.freeze_support()
YOURMAINROUTINE()freeze_support()-call在windows上是必需的-参见python多处理文档。
如果仅在python中使用该行运行,则已经可以了。
但是,cython显然不知道其中的一些事情(文档告诉它它是用py2exe、PyInstaller和cx_Freeze测试的)。只有在编译时才能使用setattr-call,因此可以通过文件扩展名进行决策。
发布于 2017-11-17 22:53:10
在启动新的python进程时,multiprocessing-module在Windows上使用spawn-method (这种行为也可以通过使用mp.set_start_method('spawn')在Linux上触发)。
命令行参数在新进程中传递给解释器,因此可以建立与父进程的通信,例如:
python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork嵌入式cython模块(或者冻结模块(即用cx_Freeze、py2exe和类似的)模块创建)的问题是,将命令行参数传递给它们更多地对应于
python my_script.py <arguments>也就是说,命令行不是由interpeter自动处理的,而是需要在脚本中处理。
multiprocessing提供了一个名为multiprocessing.freeze_support()的函数,它正确地处理命令行参数,可以按巴斯蒂安的回答中所示的方式使用
if __name__ == '__main__':
# needed for Cython, as it doesn't set `frozen`-attribute
setattr(sys, 'frozen', True)
# parse command line options and execute it if needed
multiprocessing.freeze_support()但是,此解决方案仅适用于Windows,如代码中所示:
def freeze_support(self):
'''Check whether this is a fake forked process in a frozen executable.
If so then run code specified by commandline and exit.
'''
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
from .spawn import freeze_support
freeze_support()有一个错误报告:需要win32以外的支持,可能/不会很快修复.
正如上面的bug报告所解释的那样,仅仅将frozen属性设置为True和直接从multiprocessing.spawn调用freeze_support是不够的,因为信号量跟踪器没有得到正确的处理。
我看到了两个选项:要么用上面的bug报告中的一个尚未发布的补丁来修补您的安装,要么使用下面介绍的自己动手的方法。
这里是这个答案的早期版本,它更“实验性”,但提供了更多的洞察力/细节,并提出了一个解决方案,在某种程度上做自己的风格。
我在linux上,所以我使用mp.set_start_method('spawn')来模拟windows的行为。
在spawn-mode中发生了什么?让我们添加一些sleep,这样我们就可以研究这些过程:
#bomb.py
import multiprocessing as mp
import sys
import time
def worker():
time.sleep(50)
print('Worker')
return
if __name__ == '__main__':
print("Starting...")
time.sleep(20)
mp.set_start_method('spawn') ## use spawn!
jobs = []
for i in range(5):
p = mp.Process(target=worker)
jobs.append(p)
p.start()通过使用pgrep python,我们可以看到最初只有一个python进程,然后是7(!)不同的pid。我们可以通过cat /proc/<pid>/cmdline看到命令行参数。5个新进程具有命令行。
-c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork第一项:
-c "from multiprocessing.semaphore_tracker import main;main(4)"这意味着父进程启动6个新的python解释器实例,每个新启动的解释器通过命令行选项执行从父进程发送的代码,信息通过管道共享。这6个python实例中的一个是跟踪器,它观察整个过程。
好吧,如果是cythonized+embeded会发生什么?与普通的python一样,惟一的区别是启动了bomb-executable而不是python。但是与python解释器不同的是,它不执行/不知道命令行参数,因此main函数一次又一次地运行。
有一个简单的解决方法:让bomb-exe启动python解释器。
...
if __name__ == '__main__':
mp.set_executable(<PATH TO PYTHON>)
....现在,bomb不再是一个多处理炸弹!
但是,我们的目标可能是没有python解释器,所以我们需要让程序知道可能的命令行:
import re
......
if __name__ == '__main__':
if len(sys.argv)==3: # should start in semaphore_tracker mode
nr=list(map(int, re.findall(r'\d+',sys.argv[2])))
sys.argv[1]='--multiprocessing-fork' # this canary is needed for multiprocessing module to work
from multiprocessing.semaphore_tracker import main;main(nr[0])
elif len(sys.argv)>3: # should start in slave mode
fd, pipe=map(int, re.findall(r'\d+',sys.argv[2]))
print("I'm a slave!, fd=%d, pipe=%d"%(fd,pipe))
sys.argv[1]='--multiprocessing-fork' # this canary is needed for multiprocessing module to work
from multiprocessing.spawn import spawn_main;
spawn_main(tracker_fd=fd, pipe_handle=pipe)
else: #main mode
print("Starting...")
mp.set_start_method('spawn')
jobs = []
for i in range(5):
p = mp.Process(target=worker)
jobs.append(p)
p.start()现在,我们的炸弹不需要一个独立的蟒蛇解释器,并停止后,工人完成。请注意以下事项:
bomb的方式并不是非常安全的,但我希望您能得到要点。--multiprocessing-fork只是一只金丝雀,它不做任何事情,它必须在那里,参见这里。注意:修改后的代码也可以与python一起使用,因为在执行"from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork之后,python更改了sys.argv,因此代码不再看到原来的命令行,len(sys.argv)是1。
发布于 2017-11-20 06:36:40
受ead给出的答案(或给出的想法)的启发,我找到了一个非常简单的解决方案--或者让我们把它称为解决办法。
对于我来说,把if子句更改为
if __name__ == '__main__':
if len(sys.argv) == 1:
main()
else:
sys.argv[1] = sys.argv[3]
exec(sys.argv[2])做到了。
这样做的原因是(在我的例子中):当调用原始的..py文件时,工人的__name__被设置为__mp_main__ (但是所有进程都只是普通的..py文件)。
当运行(cython)编译版本时,工人的name是不可用的,但是工作人员会被不同的调用,因此我们可以通过argv中的多个参数来识别它们。在我的例子中,工人的辩论内容是
['MYPROGRAMM.exe',
'-c',
'from multiprocessing.spawn import spawn_main;
spawn_main(parent_pid=9316, pipe_handle =392)',
'--multiprocessing-fork']因此,在argv[2]中,可以找到激活工作人员的代码,并使用上面的命令执行这些代码。
当然,如果您需要编译文件的参数,则需要进行更大的努力,可能需要对调用中的parent_pid进行解析。但就我而言,这简直是言过其实。
https://stackoverflow.com/questions/47325297
复制相似问题