我正在探索一个很可能是问题类的特殊情况的问题,但是我不知道问题类或适当的术语,所以我不得不使用临时词汇来描述这个问题。一旦我知道了正确的术语,我就会改头换面。
我有一群单身汉A,B,C。单身人士是:
系统尽可能接受并行处理的任务。
每个任务由一系列操作组成,每个操作都要使用其中的一个单例来执行。不同的任务可以按不同的顺序访问不同的单例,任务可能包含操作的循环。
伪码:
void myTask(in1, in2, ...) {
doWithA(() -> {
// use in1, in2, ...
// inspect and/or update A
// set up outputs to be used as inputs for the next action:
outA1 = ...
outA2 = ...
...
});
doWithB(() -> {
// use outA1, outA2, ...
// inspect and/or update B
// set up outputs to be used as inputs for the next action:
outB1 = ...
outB2 = ...
...
});
// Tasks may touch singletons repeatedly, in any order
doWithA(() -> {
// outB1, outB2, ..., inspect/modify A, set up outputs
outAx1 = ...
outAx2 = ...
...
});
// Tasks may have loops:
while (conditionInC(() -> ...) {
doWithC(() -> ...);
doWithD(() -> ...);
}
// I am aware that a loop like this can cause a livelock.
// That's an aspect for another question, on another day.
}上面有多个像myTask这样的任务。
要执行的任务被包装在闭包中,并被调度到一个ThreadPoolExecutor (或类似的东西)中。
我所考虑的办法:
LockA,LockB,.
每个doWithX仅仅是一个synchronized(X)块。
OutXn是myTask的局部变量。
问题:其中一个是Swing,我无法将EDT移动到我管理的线程中。doWithSwing(){...}编码为SwingUtilities.invokeAndWait(() -> {...},从方法(1)解决Swing问题。
问题:通常认为invokeAndWait容易出现死锁。如果我陷入上述模式的麻烦中,我该如何确定呢?threadA,threadB,.,每个线程都“拥有”单个线程(Swing已经拥有这个,它就是EDT)。
doWithX将块调度为threadX上的一个Runnable。
outXn被设置为Future<...> outXn = new SettableFuture<>(),作业变成outXn.set(...)。
问题:我在JDK中找不到任何类似SettableFuture的东西;我可以找到的创建Future的所有方法都以某种方式绑定到ThreadPool上。也许我看到的是错误的顶层界面,而Future是一只红鲱鱼?用这些方法最好吗?
有没有一种我没有考虑过的更好的方法?
发布于 2020-04-20 14:34:02
我不知道问题类和适当的术语
我可能会将问题类称为并发任务编排。
在确定正确的方法时,有很多事情要考虑。如果你能提供更多的细节,我会尝试用更多的颜色更新我的答案。
没有诸如“您必须访问B才能用C执行X”之类的约束。
这通常是一件好事。死锁的一个常见原因是不同的线程以不同的顺序获取相同的锁。例如,线程1锁定A然后B,而线程2拥有锁B并等待获得A。设计解决方案使这种情况不会发生是非常重要的。
我在JDK中找不到类似
SettableFuture的东西
看看java.util.concurrent.CompletableFuture<T> --这可能就是你在这里想要的。它公开一个阻塞的get()以及许多异步完成回调(如thenAccept(Consumer<? super T>) )。
invokeAndWait通常被认为容易出现死锁。
那得看情况。如果您的调用线程没有包含执行您要提交的Runnable所必需的任何锁,那么您可能还好。也就是说,如果您可以将业务流程建立在异步回调的基础上,则可以使用SwingUtilities.invokeLater(Runnable) --这将在Swing事件循环上提交Runnable的执行,而不会阻塞调用线程。
我可能会避免为每个单例创建一个线程。每个正在运行的线程都会增加一些开销,最好是将线程数量与业务逻辑分离开来。例如,这将允许您根据内核的数量将软件调到不同的物理机器上。
听起来,您需要每个runWithX(...)方法都是原子的。换句话说,一旦一个线程开始访问X,另一个线程就不能这样做,直到第一个线程完成其任务步骤。如果是这样的话,那么为每个单例创建一个锁对象并确保串行(而不是并行)访问是正确的方法。您可以通过包装在runWithX(...)方法中以synchronized代码块提交的闭包的执行来实现这一点。块中的代码也称为临界区段或监视器区域。
另一件要考虑的事情是线程争用和执行顺序。如果两个任务都需要访问X,而任务1在任务2之前提交,那么任务1对X的访问是否必须发生在任务2之前?这样的需求可能会使设计变得相当复杂,我可能会推荐一种与上面描述的不同的方法。
有没有一种我没有考虑过的更好的方法?
如今,有一些框架可以解决这些类型的问题。我特别想到的是反应性流和RxJava。虽然它是一个非常强大的框架,但它也有一个非常陡峭的学习曲线。在组织内采用这种技术之前,应该进行大量的分析和考虑。
更新:
根据您的反馈,我认为CompletableFuture-based方法可能最有意义。
我会创建一个帮助类来编排任务步骤执行:
class TaskHelper
{
private final Object lockA;
private final Object lockB;
private final Object lockC;
private final Executor poolExecutor;
private final Executor swingExecutor;
public TaskHelper()
{
poolExecutor = Executors.newFixedThreadPool( 2 );
swingExecutor = SwingUtilities::invokeLater;
lockA = new Object();
lockB = new Object();
lockC = new Object();
}
public <T> CompletableFuture<T> doWithA( Supplier<T> taskStep )
{
return doWith( lockA, poolExecutor, taskStep );
}
public <T> CompletableFuture<T> doWithB( Supplier<T> taskStep )
{
return doWith( lockB, poolExecutor, taskStep );
}
public <T> CompletableFuture<T> doWithC( Supplier<T> taskStep )
{
return doWith( lockC, swingExecutor, taskStep );
}
private <T> CompletableFuture<T> doWith( Object lock, Executor executor, Supplier<T> taskStep )
{
CompletableFuture<T> future = new CompletableFuture<>();
Runnable serialTaskStep = () -> {
T result;
synchronized ( lock ) {
result = taskStep.get();
}
future.complete( result );
};
executor.execute( serialTaskStep );
return future;
}
}在上面的示例中,withA和withB被安排在共享线程池上,而withC总是在Swing线程上执行。Swing Executor在本质上已经是串行的,所以锁实际上是可选的。
对于创建实际任务,我建议为每个任务创建一个对象。这允许您将回调作为方法引用提供,从而获得更清晰的代码并避免回调:

此示例计算后台线程池上提供的数字的平方,然后在Swing线程上显示结果:
class SampleTask
{
private final TaskHelper helper;
private final String id;
private final int startingValue;
public SampleTask( TaskHelper helper, String id, int startingValue )
{
this.helper = helper;
this.id = id;
this.startingValue = startingValue;
}
private void start()
{
helper.doWithB( () -> {
int square = startingValue * startingValue;
return String.format( "computed-thread: %s computed-square: %d",
Thread.currentThread().getName(), square );
} )
.thenAccept( this::step2 );
}
private void step2( String result )
{
helper.doWithC( () -> {
String message = String.format( "current-thread: %s task: %s result: %s",
Thread.currentThread().getName(), id, result );
JOptionPane.showConfirmDialog( null, message );
return null;
} );
}
}
@Test
public void testConcurrent() throws InterruptedException, ExecutionException
{
TaskHelper helper = new TaskHelper();
new SampleTask( helper, "task1", 5 ).start();
new SampleTask( helper, "task2", 7 ).start();
Thread.sleep( 60000 );
}更新2:
如果您想避免回调,同时又不需要在每个任务中创建一个对象,那么也许您应该认真研究一下反应性流。
查看RxJava:https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava的“入门”页面
作为参考,下面是上面的相同示例在Rx中的外观(为了简单起见,我正在删除任务ID的概念):
@Test
public void testConcurrentRx() throws InterruptedException
{
Scheduler swingScheduler = Schedulers.from( SwingUtilities::invokeLater );
Subject<Integer> inputSubject = PublishSubject.create();
inputSubject
.flatMap( input -> Observable.just( input )
.subscribeOn( Schedulers.computation() )
.map( this::computeSquare ))
.observeOn( swingScheduler )
.subscribe( this::displayResult );
inputSubject.onNext( 5 );
inputSubject.onNext( 7 );
Thread.sleep( 60000 );
}
private String computeSquare( int input )
{
int square = input * input;
return String.format( "computed-thread: %s computed-square: %d",
Thread.currentThread().getName(), square );
}
private void displayResult( String result )
{
String message = String.format( "current-thread: %s result: %s",
Thread.currentThread().getName(), result );
JOptionPane.showConfirmDialog( null, message );
}https://stackoverflow.com/questions/61314504
复制相似问题