I have the following code. It uses a Python module called `decorator`.
from multiprocessing import Pool
from random import randint
import traceback
import decorator
import time

def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0)  # seconds

    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try:
                return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: ' % (tb)
        raise
    return tryIt

The decorator module helps me decorate the functions that call into my data warehouse. That way I don't have to handle connection drops and other connection-related issues in every function: the decorator lets me reset the connection and retry after a timeout. I decorate every function that reads from the data warehouse this way, so I get retries for free.
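For readers without the third-party `decorator` package, here is a minimal Python 3 sketch of the same retry idea using only `functools` from the standard library. Note that a bare `raise` after the loop (as in the code above) works in Python 2 but fails in Python 3, where `sys.exc_info()` is cleared once the `except` block exits, so this sketch stores the last exception and re-raises it explicitly. The names (`test_retry`, `always_fails`) mirror the question; `always_fails` is an illustrative stand-in for a warehouse call.

```python
import functools
import time

def test_retry(number_of_retry_attempts=1, timeout=2.0):
    """Retry decorator: call func up to N times, sleeping between attempts."""
    def wrap(func):
        @functools.wraps(func)
        def try_it(*args, **kwargs):
            last_exc = None
            for _ in range(number_of_retry_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    last_exc = exc
                    if timeout is not None:
                        time.sleep(timeout)
            # All attempts failed: re-raise the last exception explicitly
            # (a bare `raise` here would fail under Python 3).
            raise last_exc
        return try_it
    return wrap

calls = []

@test_retry(number_of_retry_attempts=3, timeout=0.0)
def always_fails():
    calls.append(1)  # record each attempt
    raise ValueError('connection dropped')

try:
    always_fails()
except ValueError as e:
    print('caught after %d attempts: %s' % (len(calls), e))
```

Running this prints `caught after 3 attempts: connection dropped`: the wrapped function is attempted exactly `number_of_retry_attempts` times before the exception finally propagates to the caller.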
I have the following methods.
def process_generator(data):
    # Process the generated data
    pass

def generator():
    data = data_warhouse_fetch_method()  # This is the actual method which needs retry
    yield data

@test_retry(number_of_retry_attempts=2, timeout=1.0)
def data_warhouse_fetch_method():
    # Fetch the data from the data warehouse
    pass

I tried to run my code with the multiprocessing module like this:
try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator, generator())
except Exception as exception:
    print 'Do some post processing stuff'
    tb = traceback.format_exc()
    print tb

Everything works when all the calls succeed, and also when a call recovers within the retry count. But once the retry count is exceeded, the exception raised in test_retry is not caught in the main process: the worker process dies, and the processes forked by the main process are left orphaned. Maybe I'm doing something wrong here. I'm looking for help with the following: propagating the exception to the parent process so that I can handle it and let my children die gracefully, and also how to tell the child processes to exit gracefully. Thanks in advance for your help.
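One detail worth noting about the snippet above: `pool.imap_unordered()` returns immediately, so a `try/except` around that call alone can never catch a worker's exception. An exception raised inside a worker is pickled back to the parent and re-raised only when the corresponding result is consumed from the iterator. A small self-contained sketch (the names `square` and `run` are illustrative, not from the question):

```python
import multiprocessing

def square(x):
    # Simulate a worker that fails on one particular input.
    if x == 3:
        raise ValueError('bad input: %r' % x)
    return x * x

def run():
    """Consume imap_unordered results; a worker's exception is re-raised
    in the parent only at iteration time, not at submission time."""
    pool = multiprocessing.Pool(processes=2)
    results, caught = [], None
    try:
        for r in pool.imap_unordered(square, range(5)):
            results.append(r)
    except ValueError as e:
        caught = e
        pool.terminate()  # stop the remaining workers
    else:
        pool.close()
    finally:
        pool.join()       # reap the children so nothing is orphaned
    return results, caught

if __name__ == '__main__':
    results, caught = run()
    print('results:', sorted(results), 'caught:', caught)
```

The key points: iterate the result (that is where worker exceptions surface in the parent), and on failure call `terminate()` followed by `join()` so the children are shut down and reaped instead of being orphaned.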
Edit: added more code to explain the problem.
def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0)  # seconds

    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try:
                return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: ' % (tb)
        raise
    return tryIt
@test_retry(number_of_retry_attempts=2, timeout=1.0)
def bad_method():
    sample_list = []
    return sample_list[0]  # This will result in an exception

def process_generator(number):
    if isinstance(number, int):
        return number + 1
    else:
        raise

def generator():
    for i in range(20):
        if i % 10 == 0:
            yield bad_method()
        else:
            yield i

try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator, generator())
    pool.close()
    #pool.join()
    for r in result:
        print r
except Exception, e:  # Hoping this will catch the generator's exception. But it does not.
    print 'got exception: %r, terminating the pool' % (e,)
    pool.terminate()
    print 'pool is terminated'
finally:
    print 'joining pool processes'
    pool.join()
    print 'join complete'
print 'the end'

The actual problem boils down to this: if the generator throws an exception, I am unable to catch it in the except clause wrapping the pool.imap_unordered() call. After the exception is thrown, the main process is stuck and the child processes wait forever. I'm not sure what I'm doing wrong here.
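A likely explanation for the hang: the input iterable passed to `imap_unordered` is consumed by the pool's internal task-feeder thread, not by the main thread, so an exception raised by the generator kills that thread without ever reaching the surrounding `try/except`. One workaround is to keep the generator itself exception-free and pass failures along as data, letting the worker (or the parent) decide what to do with them. A sketch under that assumption, reusing the question's `bad_method` shape; the `FetchError` sentinel class and `safe_generator`/`run` names are illustrative:

```python
import multiprocessing

class FetchError(object):
    """Sentinel carrying a failure description instead of raising inside
    the generator (where the pool's feeder thread would swallow it)."""
    def __init__(self, message):
        self.message = message

def bad_method():
    sample_list = []
    return sample_list[0]  # IndexError, as in the question

def safe_generator():
    for i in range(20):
        if i % 10 == 0:
            try:
                yield bad_method()
            except Exception as e:
                yield FetchError(str(e))  # pass the failure along as data
        else:
            yield i

def process(item):
    # Raising in the worker is fine: the exception is pickled back to
    # the parent and re-raised when the result is consumed.
    if isinstance(item, FetchError):
        raise ValueError('upstream fetch failed: %s' % item.message)
    return item + 1

def run():
    pool = multiprocessing.Pool(processes=2)
    results, error = [], None
    try:
        for r in pool.imap_unordered(process, safe_generator()):
            results.append(r)
    except ValueError as e:
        error = e
        pool.terminate()  # tell the children to stop
    else:
        pool.close()
    finally:
        pool.join()       # reap them either way
    return results, error

if __name__ == '__main__':
    print(run())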
Posted on 2015-12-18 10:43:03
I don't fully understand the code shared here, since I'm not an expert, and this question is almost a year old. But my requirement was the same as the one explained in this topic, and I managed to find a solution:
import multiprocessing
import time

def dummy(flag):
    try:
        if flag:
            print('Sleeping for 2 secs')
            time.sleep(2)  # So that it can be terminated
        else:
            raise Exception('Exception from ', flag)  # To simulate termination
        return flag  # To check that the sleeping thread never returns this
    except Exception as e:
        print('Exception inside dummy', e)
        raise e
    finally:
        print('Entered finally', flag)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    args_list = [(1,), (0,)]
    # Call dummy for each tuple inside args_list.
    # Use error_callback to terminate the pool.
    results = pool.starmap_async(dummy, args_list,
                                 error_callback=lambda e, mp_pool=pool: mp_pool.terminate())
    pool.close()
    pool.join()
    try:
        # Try to see the results.
        # If there was an exception in any process, results.get() throws that exception.
        for result in results.get():
            # Never executed because of the exception
            print('Printing result ', result)
    except Exception as e:
        print('Exception inside main', e)
    print('Reached the end')

This produces the following output:
Sleeping for 2 secs
Exception inside dummy ('Exception from ', 0)
Entered finally 0
Exception inside main ('Exception from ', 0)
Reached the end

This is pretty much the first time I've answered a question, so I apologize in advance if I've broken any rules or made any mistakes.
I had tried a few other approaches first, without success. Honestly, though, terminating all the processes in a pool once one of them throws an exception turned out not to be that difficult.
https://stackoverflow.com/questions/26921134