我有一个以下简单的模块。
defmodule Flow.F1 do
def t1 do
["one", "two", "three"]
|> Flow.from_enumerable()
|> Flow.map(&p1(&1))
|> Flow.partition()
|> Enum.to_list()
end
defp p1(item) do
IO.puts("Processing #{item}")
:timer.sleep(2000)
IO.puts("Processed #{item}")
String.upcase(item)
end
end现在,当运行Flow.F1.t1()时,我得到以下结果:
Processing one
Processed one
Processing two
Processed two
Processing three
Processed three
["TWO", "ONE", "THREE"]处理需要6秒,因为它在每个P1调用中等待2秒,但我希望(从文档中) Elixir.map并行处理项目。所以我们应该看到类似这样的东西:
Processing one
Processing two
Processing three
Processed one
Processed two
Processed three并且应该只需要2秒。有人能解释一下我错过了什么吗?
发布于 2021-01-21 22:20:38
按照docs中的说明,Flow处理500个项目的批次。
我认为您的输入数据不足以将其划分为多个消费者,因此只有一个消费者承担了所有工作。如果您将选项max_demand: 1添加到您的Flow.from_enumerable,它将并行使用。
https://stackoverflow.com/questions/65829241
复制相似问题