我试图同时使用ProcessPoolExecutor使用Dynamodb流触发lambda。这是我收到的错误。
当一堆记录(如1000条记录)落入dynamodb(批处理大小为10)时,流触发lambda。我以前使用过ThreadPoolExecutor,它可以工作,但在批处理的10个进程中,只有5-8条记录被处理,其余的是left.Each记录,大约需要50秒才能完成。在ThreadPoolExecutor中跳过其他记录的是AWS的5分钟限制吗?同时,使用ProcessPoolExecutor也会帮助我解决ThreadPoolExecutor的问题?
[Errno 38] Function not implemented: OSError
Traceback (most recent call last):
File "/var/task/ycf_calculator.py", line 464, in main
with ProcessPoolExecutor(max_workers=25) as executor:
File "/var/lang/lib/python3.6/concurrent/futures/process.py", line 390, in __init__
EXTRA_QUEUED_CALLS)
File "/var/lang/lib/python3.6/multiprocessing/context.py", line 102, in Queue
return Queue(maxsize, ctx=self.get_context())
File "/var/lang/lib/python3.6/multiprocessing/queues.py", line 42,in __init__
self._rlock = ctx.Lock()
File "/var/lang/lib/python3.6/multiprocessing/context.py", line 67, in Lock
return Lock(ctx=self.get_context())
File "/var/lang/lib/python3.6/multiprocessing/synchronize.py", line 163, in __init__
SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
File "/var/lang/lib/python3.6/multiprocessing/synchronize.py", line 60, in __init__
unlink_now)
OSError: [Errno 38] Function not implemented发布于 2018-02-07 15:52:51
您正在使用一次调用来处理一批10条记录。每个记录需要大约50秒的时间来处理。没有并行处理,50 * 10 = 500秒。
Lambda的最大执行时间为300秒。Lambda实际上运行在一个处理器/核心上,因此不支持多CPU之间的多处理。(有关更多见解,请参见this question。)
问题是,你不能用Lambda的方式去做。
我建议将现有的Lambda划分为两个不同的Lambda(批处理程序和worker)。
批处理程序Lambda由您的DynamoDB流触发(批处理大小为10,如您的示例所示)。然后,这个Lambda异步地为流中的10个记录中的每个调用worker Lambda 。(重要的是:只为每个工作人员调用传递一个记录。)
worker Lambda独立地接收和处理一个和唯一一个记录。这就是它所能做的。
总之,
1 dynamoDB stream -> 1 batch invocation -> 10 worker invocations它大大简化了。不需要处理多线程或多进程。
你可以免费得到10个CPU核心!
https://stackoverflow.com/questions/48651601
复制相似问题