大家好,我是一个初学者,正在研究隐式和显式并发的不同变体,我发现自从Java 8并行流被引入以来,它允许并行并发,但我实际上并不理解编译器是如何处理这一点的!流是否被分解为单独的线程并并发运行?另外,如果这会导致任何错误或问题,比如使用显式并发时会出现的问题,该怎么办?
发布于 2016-01-19 21:12:10
自从引入了Java8并行流之后,我就看到了
,它允许并行并发,但我实际上并不理解编译器是如何处理这个问题的!流是否被分解为单独的线程并并发运行?
我不是100%确定,但是AFAIK并行流使用JavaSE7中引入的fork/join framework。
通过研究这个框架,您可能会更好地理解并行流是如何在内部工作的。
发布于 2016-01-19 22:07:18
自从引入了Java8并行流之后,我就看到了
,它允许并行并发,但我实际上并不理解编译器是如何处理这个问题的!
javac编译器不知道。parallelStream()是库中的一个方法,编译器不知道它做了什么。
是否将数据流分解为单独的线程并并发运行?
Streams库尝试将工作分解为任务(每个CPU大约2个),并使用ForkJoinPool.commonPool()执行这些任务
如果这会导致任何错误或问题,比如使用显式并发时会出现的问题,该怎么办?
如果抛出一个异常,它会被捕获并重新抛出,就好像它是在当前线程中抛出的一样。
你可以试试
IntStream.range(0, 16).parallel()
.forEach(i -> System.out.println(Thread.currentThread()));
IntStream.range(0, 16).parallel()
.forEach(i -> {
if (Thread.currentThread().getName().endsWith("-2"))
throw new RuntimeException();
Thread.yield();
});打印
Thread[main,5,main]
Thread[main,5,main]
Thread[main,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-11,5,main]
Thread[ForkJoinPool.commonPool-worker-13,5,main]
Thread[ForkJoinPool.commonPool-worker-9,5,main]
Thread[ForkJoinPool.commonPool-worker-13,5,main]
Thread[ForkJoinPool.commonPool-worker-9,5,main]
Thread[ForkJoinPool.commonPool-worker-13,5,main]
Thread[ForkJoinPool.commonPool-worker-8,5,main]
Thread[ForkJoinPool.commonPool-worker-9,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
Thread[ForkJoinPool.commonPool-worker-4,5,main]
Thread[ForkJoinPool.commonPool-worker-4,5,main]
Thread[main,5,main]
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.IntPipeline.forEach(IntPipeline.java:404)
at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:560)
at B.main(B.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.RuntimeException
at B.lambda$main$1(B.java:21)
at B$$Lambda$2/1452126962.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)https://stackoverflow.com/questions/34877596
复制相似问题