我想创建一个类似于以下内容的Elixir代码:
def infinite_loop(created_workers \\ []) do
case next_from_queue do
{:ok, queue_msg} ->
new_worker = Task.async(fn -> crawling(queue_msg) end)
infinite_loop([new_worker | created_workers])
{:error, :empty} ->
created_workers.map(&Task.await/1)
end
end假设:
crawling函数将创建另一个3 Taskcrawling worker可以花费3秒的时间运行。queue可能有数百万条消息我如何才能知道并行过程的极限是什么?我怎么才能不打破它呢?
发布于 2017-12-07 15:02:59
为此,我建议使用Task.async_stream。Task.async_stream允许您并行处理流,同时限制并行运行的任务数量。虽然Erlang 20中进程数的默认限制是262144,但是如果您正在爬行一个站点,您可能需要一个更低的限制。
可以使用Stream.iterate从不断返回新项的函数创建流。
stream =
Stream.iterate(next_from_queue(), fn _ -> next_from_queue() end)
|> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)因为您想在{:error, :empty}停下来,所以我们使用Stream.take_while来停止流。
然后像这样使用Task.async_stream:
stream
|> Task.async_stream(fn {:ok, queue_msg} ->
crawling(queue_msg)
end, max_concurrency: 16)这将以最多16个任务并行运行流。最终结果将是crawling(queue_msg)的所有返回值的列表。
https://stackoverflow.com/questions/47697675
复制相似问题