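The following code occasionally throws an error. If I start just one process, it works fine, but as I keep increasing the number of processes, at around 11 it starts throwing an error.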
```python
import itertools
import sys
from multiprocessing import Manager, Process

try:
    num_workers = int(sys.argv[1])
except:
    num_workers = 1

someval = 10

def do_work(in_queue, x):
    i = 0
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line == None:
            if i > 0:
                work.put(i,)
                # work.put(i)
            return
        else:
            print "value from work " + line.rstrip('\n')
            i = i + 1

if __name__ == "__main__":
    manager = Manager()
    work = manager.Queue(num_workers)
    someval = 20
    print " Number of workers is " + str(num_workers)

    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, someval))
        p.start()
        pool.append(p)

    with open("/home/jay/scripts/a.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    x = 0
    for p in pool:
        p.join()
```

The file /home/jay/script/a.txt has 10 lines.
If I run it with 7 workers, everything works:

```
./x.py 7
Number of workers is 7
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
x is 0
all done
```

With 11 workers it fails:

```
./x.py 11
Number of workers is 11
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
Process Process-11:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "./x.py", line 18, in do_work
    line_no, line = item
TypeError: 'int' object is not iterable
x is 0
all done
```

Posted on 2016-05-17 17:26:06
The offending line is `work.put(i,)` in `do_work`: you are putting an `int` into the queue, and that `int` is then read and unpacked by another worker.
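The failure can be reproduced without any processes. This is a minimal Python 3 sketch: the helper `unpack` is hypothetical and just runs the same statement as `do_work`. A `(line_no, line)` pair from `enumerate()` unpacks fine, while a bare `int` raises the same `TypeError` as in the traceback (Python 3 words the message differently from 2.7).

```python
def unpack(item):
    # Same statement that fails inside do_work
    line_no, line = item
    return line_no, line


# A pair produced by enumerate() over the file: unpacks fine
print(unpack((0, "first line\n")))

try:
    # The int count that a finished worker put back on the shared queue
    unpack(3)
except TypeError as exc:
    print("TypeError:", exc)
```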
Also, I agree with dano that using `multiprocessing.Pool` is easier and shorter:
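```python
import multiprocessing

if __name__ == "__main__":
    pool = multiprocessing.Pool(num_workers)
    with open("/home/jay/scripts/a.txt") as f:
        # do_work must now accept a single line rather than (in_queue, x)
        mapped = pool.map(do_work, f)
```

If you need each worker's `i`, just return it; it will be stored in `mapped`.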
Posted on 2016-05-17 17:26:17
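The problem is that `work.put(i,)` doesn't do what you think it does. You intend to put the one-tuple `(i,)` on the queue, but you are actually just putting `i`. If you change that line to `work.put((i,))`, you'll see the behavior you expect.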
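A quick Python 3 check of the trailing-comma point, using the stdlib `queue.Queue` in place of the manager queue (an assumption made only so the snippet runs without processes; `put()` receives its argument the same way). A trailing comma inside a call's argument list is just allowed syntax and does not build a tuple. Keep in mind that a one-tuple is still not a `(line_no, line)` pair, so a worker that dequeues `(i,)` fails with `ValueError` instead of `TypeError`.

```python
import queue

q = queue.Queue()
i = 5

q.put(i,)    # trailing comma in a call: identical to q.put(i)
q.put((i,))  # explicit parentheses: puts the one-tuple (i,)

first, second = q.get(), q.get()
print(repr(first), repr(second))  # 5 (5,)

try:
    line_no, line = second  # only one value available to unpack
except ValueError as exc:
    print("ValueError:", exc)
```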
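With larger values of `num_workers` there is a race condition: one of your child processes can put `i` on the queue before the `for` loop in the main process has finished loading the `Queue` with the `None` sentinel values from `(None,)*num_workers`. With smaller values of `num_workers`, the `for` loop finishes before any worker puts `i` on the queue.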
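One way to remove the race while keeping `Process` and `Queue` (a sketch, not taken from either answer) is to give the workers a second queue for their counts, so only `(line_no, line)` pairs ever travel on the work queue. It is written for Python 3; `run`, `SENTINEL`, and the in-memory `lines` list are hypothetical stand-ins for the question's main block, its `None` sentinels, and a.txt.

```python
import itertools
import multiprocessing

SENTINEL = None  # one per worker; tells that worker to stop


def do_work(in_queue, out_queue):
    """Consume (line_no, line) pairs; report this worker's count on out_queue."""
    count = 0
    while True:
        line_no, line = in_queue.get()
        if line is SENTINEL:
            # The count goes to a separate queue, so no other worker can
            # dequeue it and try to unpack an int as (line_no, line).
            out_queue.put(count)
            return
        print("value from work " + line.rstrip("\n"))
        count += 1


def run(lines, num_workers):
    """Hypothetical driver mirroring the question's main block."""
    work = multiprocessing.Queue()
    results = multiprocessing.Queue()
    pool = [multiprocessing.Process(target=do_work, args=(work, results))
            for _ in range(num_workers)]
    for p in pool:
        p.start()

    for item in enumerate(itertools.chain(lines, [SENTINEL] * num_workers)):
        work.put(item)

    # Collect every count before join(): a process that has put items on a
    # multiprocessing.Queue may not exit until those items are consumed.
    counts = [results.get() for _ in range(num_workers)]
    for p in pool:
        p.join()
    return sum(counts)


if __name__ == "__main__":
    lines = ["%d\n" % n for n in range(1, 11)]  # stand-in for a.txt
    print("lines processed:", run(lines, 11))  # lines processed: 10
```

Every worker reports a count (even 0), so the main process knows it must read exactly `num_workers` results, however many workers there are relative to lines.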
Also, have you considered using `multiprocessing.Pool` instead of building a pool by hand from `Process` and `Queue`? It would simplify your code considerably.
https://stackoverflow.com/questions/37282387