我有这样的代码:
import multiprocessing
import logging
m = [0,1,2,3]
iter_state = 0
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
print "n"
time.sleep(3)
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
itst = multiprocessing.Array('i', 3)
def gen(t):
itst[t] = t
multiprocessing.log_to_stderr(logging.DEBUG)
tm = time.time()
job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))
job1.start()
job2.start()
job3.start()
job3.join()
for i in itst:
print i
tm = time.time() - tm
print tm具有以下输出:
退出:
n
n
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
[INFO/Process-3] child process calling self.run()
[INFO/Process-3] process shutting down
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[INFO/Process-3] process exiting with exitcode 0
0
1
2
9.01742887497
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers所以我们可以看到,并行化实际上根本不起作用。
但是,当time.sleep()调用放置在gen()函数中时,我们会看到以下内容:
import multiprocessing
import logging
m = [0,1,2,3]
iter_state = 0
class gener(object):
def __init__(self, m):
self.m = m
self.c = 0
def __iter__(self):
return self
def next(self):
print "n"
ret = self.m[self.c]
self.c += 1
return ret
tt = gener(m)
itst = multiprocessing.Array('i', 3)
def gen(t):
time.sleep(3)
itst[t] = t
multiprocessing.log_to_stderr(logging.DEBUG)
tm = time.time()
job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))
job1.start()
job2.start()
job3.start()
job3.join()
for i in itst:
print i
tm = time.time() - tm
print tm退出:
n
n
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-2] child process calling self.run()
[INFO/Process-3] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-3] process shutting down
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[INFO/Process-3] process exiting with exitcode 0
[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
0
1
2
3.01985812187
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers现在我们可以看到并行化工作得很好!
但不幸的是,只适用于gen() 函数.。
我的问题:
如何实现next()方法内部的并行化工作?
发布于 2015-06-30 14:20:36
好吧,您只需要在next()中调用gen就可以了。现在,在父进程中调用tt.next(),并将返回值传递给子进程。相反,您应该将整个tt.next方法传递给子对象。但是请注意,通过这样做,您需要使gener.c成为一个进程共享变量,并使用一个进程安全锁来保护您增加它的代码部分。
import multiprocessing
import logging
import time
class gener(object):
def __init__(self, m):
self.m = m
self.c = multiprocessing.Value('i', 0)
def __iter__(self):
return self
def next(self):
print "n"
time.sleep(3)
with self.c.get_lock():
ret = self.m[self.c.value]
self.c.value += 1
return ret
def gen(func):
t = func()
itst[t] = t
if __name__ == "__main__":
m = [0,1,2,3]
iter_state = 0
tt = gener(m)
itst = multiprocessing.Array('i', 3)
multiprocessing.log_to_stderr(logging.DEBUG)
tm = time.time()
jobs = [ multiprocessing.Process(target=gen, args=(tt.next,)) for _ in range(3)]
for j in jobs:
j.start()
for j in jobs:
j.join()
for i in itst:
print i
tm = time.time() - tm
print tm输出:
[INFO/Process-3] child process calling self.run()
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-2] child process calling self.run()
n
n
[INFO/Process-2] process shutting down
[INFO/Process-3] process shutting down
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-3] process exiting with exitcode 0
0
1
2
3.02282786369
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizershttps://stackoverflow.com/questions/31136386
复制相似问题