例如,是否可以将RecursiveAction与一个虚拟线程池(在我尝试一个设计不佳的定制工作之前) --而不是使用叉/连接池结合使用?
发布于 2022-12-01 11:19:03
RecursiveAction是ForkJoinTask的一个子类,顾名思义,文档甚至从字面上说,
抽象基类,用于在
ForkJoinPool中运行的任务。
虽然ForkJoinPool可以是用线程工厂定制,但它不是标准线程工厂,而是用于生成ForkJoinWorkerThread实例的一个特殊的工厂。因为这些线程是Thread的子类,所以不能用虚拟线程工厂来创建它们。
因此,您不能在虚拟线程中使用RecursiveAction。这同样适用于RecursiveTask。但是,值得重新考虑的是,在虚拟线程中使用这些类会给您带来什么好处。
要将你的任务分解成子任务,主要的挑战在于你。这些类提供的是专门用于处理Fork/Join池和与可用平台线程平衡工作负载的特性。当您想在自己的虚拟线程上执行每个子任务时,您不需要这样做。因此,您可以很容易地使用没有内置类的虚拟线程来实现递归任务。
record PseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
return CompletableFuture.runAsync(
new PseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
int mid = (from + to) >>> 1;
if(mid == from) {
// simulate actual processing with potentially blocking operations
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
}
else {
CompletableFuture<Void> sub1 = run(from, mid), sub2 = run(mid, to);
sub1.join();
sub2.join();
}
}
}这个示例只是不关心限制细分或避免阻塞join()调用,而且它在运行时仍然执行得很好,例如,您可能会注意到,如果使用更大的范围,其他递归任务实现中已知的技术在这里也是有用的,因为子任务非常便宜。
例如,您可以将范围的一半提交到另一个线程,并在本地处理另一半,例如
record PseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
return CompletableFuture.runAsync(
new PseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
CompletableFuture<Void> f = null;
for(int from = this.from, mid; ; from = mid) {
mid = (from + to) >>> 1;
if (mid == from) {
// simulate actual processing with potentially blocking operations
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
break;
} else {
CompletableFuture<Void> sub1 = run(from, mid);
if(f == null) f = sub1; else f = CompletableFuture.allOf(f, sub1);
}
}
if(f != null) f.join();
}
}这在运行时产生了显著的不同,例如PseudoTask.run(0, 1_000_000).join();,它在第二个示例中只使用100万个线程,而不是使用200万个线程。但是,当然,这是一个与平台线程不同的层次上的讨论,在平台线程中,这两种方法都不能合理地工作。
另一个即将到来的选项是StructuredTaskScope,它允许生成子任务并等待它们完成。
record PseudoTask(int from, int to) {
public static void run(int from, int to) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
new PseudoTask(from, to).compute(scope);
scope.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
protected Void compute(StructuredTaskScope<Object> scope) {
for(int from = this.from, mid; ; from = mid) {
mid = (from + to) >>> 1;
if (mid == from) {
// simulate actual processing with potentially blocking operations
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
break;
} else {
var sub = new PseudoTask(from, mid);
scope.fork(() -> sub.compute(scope));
}
}
return null;
}
}在这里,任务不等待子任务的完成,而只有根任务等待所有任务的完成。但是这个特性处于孵化器状态,因此,可能需要比虚拟线程特性更长的时间,才能使产品就绪。
https://stackoverflow.com/questions/74487536
复制相似问题