我需要迭代一个列表,并为每个项目运行一个耗时的操作,然后将其结果收集到一个映射中,如下所示:
List<String> strings = ['foo', 'bar', 'baz']
Map<String, Object> result = strings.collectEntries { key ->
[key, expensiveOperation(key)]
}所以我的结果是
[foo: <an object>, bar: <another object>, baz: <another object>]因为我需要做的操作很长,而且彼此不依赖,所以我愿意研究一下使用GPars来并行运行循环。
然而,GPars有一个并行遍历集合的collectParallel方法,它收集到一个列表,而不是一个收集到映射的collectEntriesParallel:使用GPars做这件事的正确方法是什么?
发布于 2018-03-02 01:35:48
没有collectEntriesParallel,因为它必须产生与以下内容相同的结果:
collectParallel {}.collectEntries {}作为Tim mentioned in the comment。除了并行地将结果收集到列表中,并最终以顺序方式收集到映射条目之外,很难以确定性的方式创建要映射的值列表(或任何其他可变容器)。考虑下面的顺序示例:
static def expensiveOperation(String key) {
Thread.sleep(1000)
return key.reverse()
}
List<String> strings = ['foo', 'bar', 'baz']
GParsPool.withPool {
def result = strings.inject([:]) { seed, key ->
println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
seed + [(key): expensiveOperation(key.toString())]
}
println result
}在这个例子中,我们使用了Collection.inject(initialValue, closure),它相当于传统的“向左折叠”操作--它从初始值[:]开始,遍历所有的值,并将它们作为键和值添加到初始映射中。在这种情况下,顺序执行大约需要3秒(每个expensiveOperation()休眠1秒)。
控制台输出:
[main] (1519925046610) seed = [:], key = foo
[main] (1519925047773) seed = [foo:oof], key = bar
[main] (1519925048774) seed = [foo:oof, bar:rab], key = baz
[foo:oof, bar:rab, baz:zab]这基本上就是collectEntries()所做的--这是一种简化操作,它的初始值是一个空的映射。
现在让我们看看如果我们尝试并行化它会发生什么--我们将使用injectParallel方法而不是inject:
GParsPool.withPool {
def result = strings.injectParallel([:]) { seed, key ->
println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
seed + [(key): expensiveOperation(key.toString())]
}
println result
}让我们来看看结果是什么:
[ForkJoinPool-1-worker-1] (1519925323803) seed = foo, key = bar
[ForkJoinPool-1-worker-2] (1519925323811) seed = baz, key = [:]
[ForkJoinPool-1-worker-1] (1519925324822) seed = foo[bar:rab], key = baz[[:]:]:[]
foo[bar:rab][baz[[:]:]:[]:][:]:]:[[zab]如你所见,并行版本的inject并不关心顺序(这是预期的),例如,第一线程会将foo作为seed变量接收,而将bar作为键接收。如果对map (或任何可变对象)的缩减是并行执行的,并且没有特定的顺序,那么就会发生这种情况。
解决方案
有两种方法可以并行化进程:
1. collectParallel + collectEntries组合
正如Tim Yates在评论中提到的,您可以并行执行代价高昂的操作,并最终将结果按顺序收集到映射中:
static def expensiveOperation(String key) {
Thread.sleep(1000)
return key.reverse()
}
List<String> strings = ['foo', 'bar', 'baz']
GParsPool.withPool {
def result = strings.collectParallel { [it, expensiveOperation(it)] }.collectEntries { [(it[0]): it[1]] }
println result
}此示例在大约1秒内执行,并生成以下输出:
[foo:oof, bar:rab, baz:zab]2. Java的并行流
或者,您可以使用Java的并行流和Collectors.toMap() reducer函数:
static def expensiveOperation(String key) {
Thread.sleep(1000)
return key.reverse()
}
List<String> strings = ['foo', 'bar', 'baz']
def result = strings.parallelStream()
.collect(Collectors.toMap(Function.identity(), { str -> expensiveOperation(str)}))
println result 此示例也在大约1秒内执行,并生成如下输出:
[bar:rab, foo:oof, baz:zab]希望能有所帮助。
https://stackoverflow.com/questions/49054590
复制相似问题