假设我有一个Java IntStream,是否有可能将其转换为具有累积和的IntStream?例如,从4,2,6开始的流,.应改为4,6,12,.
更普遍地说,应该如何实现有状态流操作?我觉得这是可能的:
myIntStream.map(new Function<Integer, Integer> {
int sum = 0;
Integer apply(Integer value){
return sum += value;
}
);有一个明显的限制,即这只在顺序流上工作。但是,Stream.map显然需要一个无状态映射函数。我错过了一个Stream.statefulMap或Stream.cumulative操作是对的,还是缺少了Java的点?
例如,将其与Haskell进行比较,在Haskell中,scanl1函数精确地解决了这个示例:
scanl1 (+) [1 2 3 4] = [1 3 6 10]发布于 2015-02-06 01:12:14
你可以用原子序数来做这件事。例如:
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
public class Accumulator {
public static LongStream toCumulativeSumStream(IntStream ints){
AtomicLong sum = new AtomicLong(0);
return ints.sequential().mapToLong(sum::addAndGet);
}
public static void main(String[] args){
LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
sums.forEachOrdered(System.out::println);
}
}这一产出如下:
1
3
6
10我使用了一个Long来存储这些总和,因为完全有可能两个ints加起来超过了Integer.MAX_VALUE,而long有较少的溢出概率。
发布于 2015-02-06 00:11:12
可以通过一个收集器创建一个新的流:
class Accumulator {
public static void accept(List<Integer> list, Integer value) {
list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
}
public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
int total = list1.get(list1.size() - 1);
list2.stream().map(n -> n + total).forEach(list1::add);
return list1;
}
}这被用作:
myIntStream.parallel()
.collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
.stream();希望您可以看到,这个收集器的重要属性是,即使流是并行的,因为合并了Accumulator实例,它也会调整总数。
这显然不如映射操作有效,因为它收集整个流,然后生成一个新流。但这不仅仅是一个实现细节:它是一个必要的功能,因为流可能会被并发处理。
我已经用IntStream.range(0, 10000).parallel()测试过它,它的功能是正确的。
https://stackoverflow.com/questions/28355684
复制相似问题