我最近发现了一个bug,其中
StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.limit(20)调用Spliterator.ofInt.tryAdvance超过20次。当我将其更改为
StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.sequential()
.limit(20)问题就消失了。这一切为什么要发生?当tryAdvance有副作用时,除了在Spliterator中构建一个之外,有什么方法可以实现对并行流的严格限制吗?(这是为了测试一些返回无限流的方法,但是测试需要到达最终的终点,而不需要复杂的“X毫秒循环”构造。)
发布于 2017-10-04 16:52:33
关于limit和trySplit应该如何交互,似乎有一个根本性的误解。假设不应该有比指定的limit更多的trySplit调用是完全错误的。
trySplit的目的是将源数据分成两部分,在最好的情况下分成两半,因为trySplit应该尝试平衡拆分。因此,如果您有一个包含一百万个元素的源数据集,则成功的拆分将生成两个各包含50万个元素的源数据集。这与您可能应用于流的limit(20)完全无关,除非我们事先知道,如果拆分器具有SIZED|SUBSIZED特征,我们可以删除第二个数据集,因为请求的前20个元素只能在前50万个元素中找到。
很容易计算出,在最好的情况下,即使用平衡拆分,我们已经需要15次拆分操作,每次删除上半部分,然后我们才能得到前20个元素之间的拆分,从而允许我们并行处理前20个元素。
这一点很容易演示:
class DebugSpliterator extends Spliterators.AbstractIntSpliterator {
int current, fence;
DebugSpliterator() {
this(0, 1_000_000);
}
DebugSpliterator(int start, int end) {
super(end-start, ORDERED|SIZED|SUBSIZED);
current = start;
fence = end;
}
@Override public boolean tryAdvance(IntConsumer action) {
if(current<fence) {
action.accept(current++);
return true;
}
return false;
}
@Override public OfInt trySplit() {
int mid = (current+fence)>>>1;
System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");
return mid>current? new DebugSpliterator(current, current=mid): null;
}
}StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});在我的机器上,它打印:
trySplit() [0, 500000, 1000000]
trySplit() [0, 250000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [0, 62500, 125000]
trySplit() [0, 31250, 62500]
trySplit() [0, 15625, 31250]
trySplit() [0, 7812, 15625]
trySplit() [0, 3906, 7812]
trySplit() [0, 1953, 3906]
trySplit() [0, 976, 1953]
trySplit() [0, 488, 976]
trySplit() [0, 244, 488]
trySplit() [0, 122, 244]
trySplit() [0, 61, 122]
trySplit() [0, 30, 61]
trySplit() [0, 15, 30]
trySplit() [15, 22, 30]
trySplit() [15, 18, 22]
trySplit() [15, 16, 18]
trySplit() [16, 17, 18]
trySplit() [0, 7, 15]
trySplit() [18, 20, 22]
trySplit() [18, 19, 20]
trySplit() [7, 11, 15]
trySplit() [0, 3, 7]
trySplit() [3, 5, 7]
trySplit() [3, 4, 5]
trySplit() [7, 9, 11]
trySplit() [4, 4, 5]
trySplit() [9, 10, 11]
trySplit() [11, 13, 15]
trySplit() [0, 1, 3]
trySplit() [13, 14, 15]
trySplit() [7, 8, 9]
trySplit() [1, 2, 3]
trySplit() [8, 8, 9]
trySplit() [5, 6, 7]
trySplit() [14, 14, 15]
trySplit() [17, 17, 18]
trySplit() [11, 12, 13]
trySplit() [12, 12, 13]
trySplit() [2, 2, 3]
trySplit() [10, 10, 11]
trySplit() [6, 6, 7]当然,这远远超过20次拆分尝试,但完全合理,因为数据集必须拆分,直到我们拥有所需目标范围内的子范围,才能并行处理它。
我们可以通过删除导致此执行策略的元信息来强制执行不同的行为:
StreamSupport.stream(new DebugSpliterator(), true)
.filter(x -> true)
.limit(20)
.forEach(x -> {});由于Stream API不了解谓词的行为,因此管道失去了它的SIZED特征,导致
trySplit() [0, 500000, 1000000]
trySplit() [500000, 750000, 1000000]
trySplit() [500000, 625000, 750000]
trySplit() [625000, 687500, 750000]
trySplit() [625000, 656250, 687500]
trySplit() [656250, 671875, 687500]
trySplit() [0, 250000, 500000]
trySplit() [750000, 875000, 1000000]
trySplit() [250000, 375000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [250000, 312500, 375000]
trySplit() [312500, 343750, 375000]
trySplit() [125000, 187500, 250000]
trySplit() [875000, 937500, 1000000]
trySplit() [375000, 437500, 500000]
trySplit() [125000, 156250, 187500]
trySplit() [250000, 281250, 312500]
trySplit() [750000, 812500, 875000]
trySplit() [281250, 296875, 312500]
trySplit() [156250, 171875, 187500]
trySplit() [437500, 468750, 500000]
trySplit() [0, 62500, 125000]
trySplit() [875000, 906250, 937500]
trySplit() [62500, 93750, 125000]
trySplit() [812500, 843750, 875000]
trySplit() [906250, 921875, 937500]
trySplit() [0, 31250, 62500]
trySplit() [31250, 46875, 62500]
trySplit() [46875, 54687, 62500]
trySplit() [54687, 58593, 62500]
trySplit() [58593, 60546, 62500]
trySplit() [60546, 61523, 62500]
trySplit() [61523, 62011, 62500]
trySplit() [62011, 62255, 62500]这显示了较少的trySplit调用,但没有改进;查看数字显示,现在处理了结果元素范围之外的范围(如果我们使用我们的知识,所有元素都将通过filer),更糟糕的是,结果元素的范围完全由单个拆分器覆盖,导致结果元素根本没有并行处理,所有其他线程都在处理后来被删除的元素。
当然,我们可以很容易地对任务进行最优拆分。
int mid = (current+fence)>>>1;至
int mid = fence>20? 20: (current+fence)>>>1;所以
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});结果:
trySplit() [0, 20, 1000000]
trySplit() [0, 10, 20]
trySplit() [10, 15, 20]
trySplit() [10, 12, 15]
trySplit() [12, 13, 15]
trySplit() [0, 5, 10]
trySplit() [15, 17, 20]
trySplit() [5, 7, 10]
trySplit() [0, 2, 5]
trySplit() [17, 18, 20]
trySplit() [2, 3, 5]
trySplit() [5, 6, 7]
trySplit() [15, 16, 17]
trySplit() [6, 6, 7]
trySplit() [16, 16, 17]
trySplit() [0, 1, 2]
trySplit() [7, 8, 10]
trySplit() [8, 9, 10]
trySplit() [1, 1, 2]
trySplit() [3, 4, 5]
trySplit() [9, 9, 10]
trySplit() [18, 19, 20]
trySplit() [10, 11, 12]
trySplit() [13, 14, 15]
trySplit() [11, 11, 12]
trySplit() [4, 4, 5]
trySplit() [14, 14, 15]但这不是一个通用的拆分器,而是一个在限制不是20的情况下性能很差的拆分器。
如果我们可以将限制合并到拆分器中,或者更广泛地说,合并到流源中,我们就不会有这个问题。因此,您可以调用list.subList(0, Math.min(x, list.size())).stream(),而不是random.ints().limit(x),使用random.ints(x),而不是Stream.generate(generator).limit(x)。您可以使用LongStream.range(0, x).mapToObj( index -> generator.get())或this answer的工厂方法,而不是list.stream().limit(x)。
对于任意的流源/拆分器,对于并行流(即even documented )来说,应用limit是非常昂贵的。嗯,在trySplit中有副作用首先不是一个好主意。
发布于 2017-10-03 16:51:38
我不认为这是一个错误,但仍然是一个非常有趣的想法,即tryAdvance可能会有副作用。
据我所知,当您的trySplit没有拆分成单个元素批处理时,这是完全可能的。
例如,您有一个数组,并且希望(通过trySplit)将其拆分为不少于4个元素的子数组部分。在这种情况下,当你不能再拆分时(例如,你在当前Spliterator中已经达到最少4个元素),当处理开始时- forEachRemaning将被调用;反过来,它将默认为当前Spliterator中的每个元素调用tryAdvance,如默认实现所示:
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}显然,因为你是并行工作的--一旦线程启动了它的工作(读取executing it's forEachRemaning),它就不能再停止了--所以更多的元素将命中tryAdvance。
因此,我真的认为除了将其集成到Spliterator本身之外,没有其他方法可以做到这一点;我认为这应该是可行的:
static class LimitingSpliterator<T> implements Spliterator<T> {
private int limit;
private final Supplier<T> generator;
private LimitingSpliterator(Supplier<T> generator, int limit) {
this.limit = limit;
this.generator = generator;
}
static <T> LimitingSpliterator<T> of(Supplier<T> supplier, int limit) {
return new LimitingSpliterator<>(supplier, limit);
}
@Override
public boolean tryAdvance(final Consumer<? super T> consumer) {
Objects.requireNonNull(consumer);
if (limit > 0) {
--limit;
generator.get();
consumer.accept(generator.get());
return true;
}
return false;
}
@Override
public void forEachRemaining(final Consumer<? super T> consumer) {
while (limit > 0) {
consumer.accept(generator.get());
--limit;
}
}
@Override
public LimitingSpliterator<T> trySplit() {
int half = limit >> 2;
limit = limit - half;
return new LimitingSpliterator<>(generator, half);
}
@Override
public long estimateSize() {
return limit << 2;
}
@Override
public int characteristics() {
return SIZED;
}
}发布于 2018-08-14 04:19:52
对于我的用例,解决方案是使用: PRNG NB:这是用于来自LongStream.range(0, streamSize).unordered().parallel().mapToInt(ignored -> nextInt())的随机数流,这些随机数可能会不断地重新播种。
https://stackoverflow.com/questions/46535831
复制相似问题