Let's start with this:
✗ ~/.pyenv/versions/3.7.11/bin/python demo_multiprocessing.py
<Process(Process-6, started)> is putting 4 in the list
<Process(Process-2, started)> is putting 0 in the list
<Process(Process-4, started)> is putting 2 in the list
<Process(Process-5, started)> is putting 3 in the list
<Process(Process-3, started)> is putting 1 in the list
<Process(Process-7, started)> is putting 5 in the list
all processes finished, here's what we have in the list:
[4, 0, 2, 3, 1, 5]
✗ ~/.pyenv/versions/3.9.11/bin/python demo_multiprocessing.py
<Process name='Process-4' parent=74535 started> is putting 2 in the list
<Process name='Process-5' parent=74535 started> is putting 3 in the list
<Process name='Process-6' parent=74535 started> is putting 4 in the list
<Process name='Process-2' parent=74535 started> is putting 0 in the list
<Process name='Process-7' parent=74535 started> is putting 5 in the list
<Process name='Process-3' parent=74535 started> is putting 1 in the list
all processes finished, here's what we have in the list:
[]
Here is the code:
from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle


class StreamInterface:
    def write(self, x: int) -> None:
        raise NotImplementedError


class ListStream(StreamInterface):
    def __init__(self, manager: Manager):
        self._manager = manager
        self.shared_list = self._manager.list()

    def write(self, x: int):
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        tmp = list(self.shared_list)
        self.shared_list[:] = []
        return tmp


class NullStream(StreamInterface):
    def write(self, x: int):
        pass


n_processors = 6


def do_work(i):
    print(current_process(), " is putting ", i, " in the list")
    ShiftPlanner.list_stream.write(i)


class ShiftPlanner:
    list_stream: StreamInterface = NullStream()
    pass


if __name__ == "__main__":
    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x
        processes = [
            Process(
                target=do_work,
                args=(i,)
            )
            for i in range(n_processors)
        ]
        shuffle(processes)
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()

What on earth is going on? Hint: the way class variables are handled seems to have changed. If I put a breakpoint in the write method, the child process sees self.list_stream as the default value, NullStream(). But I'd like to know more details: what changed, and what is the most elegant, idiomatic way to fix this? Thanks!
Posted on 2022-07-28 15:31:25
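Changing a class attribute in the main process this way is not reflected in the child processes when the start method is spawn. The code inside the if __name__... clause runs only in the main process, and each spawned child re-imports the main module to rebuild its parent's module-level state instead of inheriting a copy of the parent's memory. So, as far as the children are concerned, the class ShiftPlanner is still exactly as it was originally defined, with list_stream set to NullStream().
A simple workaround is to pass the ListStream object to each child process and let each child set the class attribute in its own memory space. However, doing so requires that you do not store the manager inside ListStream, since arguments to a spawned process are pickled and a manager object generally cannot be pickled.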
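To see the effect in isolation, here is a minimal sketch, separate from the code in the question. The names Settings, report and value_seen_by_child are made up for illustration. It changes a class attribute in the parent only and asks one child per available start method what value it sees. Run as a script, the spawn child should report the default, while a fork child (where fork is available) should report the changed value.

```python
import multiprocessing as mp


class Settings:
    # Class attribute with a default, standing in for ShiftPlanner.list_stream.
    mode = "default"


def report(queue):
    # Runs in the child: send back whatever this process sees on the class.
    queue.put(Settings.mode)


def value_seen_by_child(method):
    """Start one child with the given start method and return what it reports."""
    ctx = mp.get_context(method)
    queue = ctx.Queue()
    child = ctx.Process(target=report, args=(queue,))
    child.start()
    child.join(timeout=30)
    if child.is_alive():
        child.terminate()  # give up on a stuck child
        child.join()
    if child.exitcode != 0:
        return None  # the child failed, so there is nothing to read
    return queue.get(timeout=5)


if __name__ == "__main__":
    # This assignment happens only in the parent process.
    Settings.mode = "changed in parent"
    for method in mp.get_all_start_methods():
        print(method, "->", value_seen_by_child(method))
```

Passing the value as an argument, as in the workaround above, sidesteps the difference because the child no longer relies on state that only the parent has set.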
Example code that works:
from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle


class StreamInterface:
    def write(self, x: int) -> None:
        raise NotImplementedError


class ListStream(StreamInterface):
    def __init__(self, manager: Manager):
        # Keep only the list proxy; the manager itself is not stored.
        self.shared_list = manager.list()

    def write(self, x: int):
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        tmp = list(self.shared_list)
        self.shared_list[:] = []
        return tmp


class NullStream(StreamInterface):
    def write(self, x: int):
        pass


n_processors = 6


def do_work(i, x):
    # Each child sets the class attribute in its own memory space.
    ShiftPlanner.list_stream = x
    print(current_process(), " is putting ", i, " in the list")
    ShiftPlanner.list_stream.write(i)


class ShiftPlanner:
    list_stream: StreamInterface = NullStream()
    pass


if __name__ == "__main__":
    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x
        processes = [
            Process(
                target=do_work,
                args=(i, x)
            )
            for i in range(n_processors)
        ]
        shuffle(processes)
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()

Posted on 2022-07-28 15:31:25
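This issue actually has nothing to do with class variables; it comes from the default start method on macOS changing from fork to spawn:
From the docs: "Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess."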
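To check which start method a given interpreter actually uses, something like the following sketch may help. The helper pick_context is a made-up name for illustration. It returns a context for the preferred method only if the platform offers it (fork is not available on Windows, for instance) and otherwise falls back to the platform default.

```python
import multiprocessing as mp


def pick_context(preferred="fork"):
    """Return a context for `preferred` if this platform offers it, else the default one."""
    if preferred in mp.get_all_start_methods():
        return mp.get_context(preferred)
    return mp.get_context()


if __name__ == "__main__":
    print("default start method:", mp.get_start_method())
    print("available start methods:", mp.get_all_start_methods())
    print("chosen context uses:", pick_context("fork").get_start_method())
```

Running this under both interpreters from the question should show whether the default differs between them on your machine.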
The code can be fixed like this:
from typing import List
from random import shuffle
from multiprocessing import Manager, current_process, get_context


class StreamInterface:
    def write(self, x: int) -> None:
        raise NotImplementedError


class ListStream(StreamInterface):
    def __init__(self, manager: Manager):
        self._manager = manager
        self.shared_list = self._manager.list()

    def write(self, x: int):
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        tmp = list(self.shared_list)
        self.shared_list[:] = []
        return tmp


class NullStream(StreamInterface):
    def write(self, x: int):
        pass


n_processors = 6


def do_work(i):
    print(current_process(), " is putting ", i, " in the list")
    ShiftPlanner.list_stream.write(i)


class ShiftPlanner:
    list_stream: StreamInterface = NullStream()
    pass


if __name__ == "__main__":
    # Explicitly request the fork start method for the worker processes.
    ctx = get_context('fork')
    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x
        processes = [
            ctx.Process(
                target=do_work,
                args=(i,)
            )
            for i in range(n_processors)
        ]
        shuffle(processes)
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()

https://stackoverflow.com/questions/73154925