首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何通过多个线程来执行一系列操作?

如何通过多个线程来执行一系列操作?
EN

Stack Overflow用户
提问于 2020-04-20 03:03:19
回答 1查看 322关注 0票数 1

我正在探索一个很可能是问题类的特殊情况的问题,但是我不知道问题类或适当的术语,所以我不得不使用临时词汇来描述这个问题。一旦我知道了正确的术语,我就会改头换面。

我有一群单身汉ABC。单身人士是:

  • 不相关。没有诸如“您必须访问B才能用C执行X”之类的约束。
  • 不是线安全的。

系统尽可能接受并行处理的任务。

每个任务由一系列操作组成,每个操作都要使用其中的一个单例来执行。不同的任务可以按不同的顺序访问不同的单例,任务可能包含操作的循环。

伪码:

代码语言:javascript
复制
void myTask(in1, in2, ...) {
    doWithA(() -> {
       // use in1, in2, ...
       // inspect and/or update A
       // set up outputs to be used as inputs for the next action:
       outA1 = ...
       outA2 = ...
       ...
    });
    doWithB(() -> {
       // use outA1, outA2, ...
       // inspect and/or update B
       // set up outputs to be used as inputs for the next action:
       outB1 = ...
       outB2 = ...
       ...
    });
    // Tasks may touch singletons repeatedly, in any order
    doWithA(() -> {
       // outB1, outB2, ..., inspect/modify A, set up outputs
       outAx1 = ...
       outAx2 = ...
       ...
    });
    // Tasks may have loops:
    while (conditionInC(() -> ...) {
        doWithC(() -> ...);
        doWithD(() -> ...);
    }
    // I am aware that a loop like this can cause a livelock.
    // That's an aspect for another question, on another day.
}

上面有多个像myTask这样的任务。

要执行的任务被包装在闭包中,并被调度到一个ThreadPoolExecutor (或类似的东西)中。

我所考虑的办法:

  1. 单身汉LockALockB,. 每个doWithX仅仅是一个synchronized(X)块。 OutXnmyTask的局部变量。 问题:其中一个是Swing,我无法将EDT移动到我管理的线程中。
  2. 如上段所述。通过将doWithSwing(){...}编码为SwingUtilities.invokeAndWait(() -> {...},从方法(1)解决Swing问题。 问题:通常认为invokeAndWait容易出现死锁。如果我陷入上述模式的麻烦中,我该如何确定呢?
  3. 有线程threadAthreadB,.,每个线程都“拥有”单个线程(Swing已经拥有这个,它就是EDT)。 doWithX将块调度为threadX上的一个RunnableoutXn被设置为Future<...> outXn = new SettableFuture<>(),作业变成outXn.set(...)。 问题:我在JDK中找不到任何类似SettableFuture的东西;我可以找到的创建Future的所有方法都以某种方式绑定到ThreadPool上。也许我看到的是错误的顶层界面,而Future是一只红鲱鱼?

用这些方法最好吗?

有没有一种我没有考虑过的更好的方法?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-04-20 14:34:02

我不知道问题类和适当的术语

我可能会将问题类称为并发任务编排。

在确定正确的方法时,有很多事情要考虑。如果你能提供更多的细节,我会尝试用更多的颜色更新我的答案。

没有诸如“您必须访问B才能用C执行X”之类的约束。

这通常是一件好事。死锁的一个常见原因是不同的线程以不同的顺序获取相同的锁。例如,线程1锁定A然后B,而线程2拥有锁B并等待获得A。设计解决方案使这种情况不会发生是非常重要的。

我在JDK中找不到类似SettableFuture的东西

看看java.util.concurrent.CompletableFuture<T> --这可能就是你在这里想要的。它公开一个阻塞的get()以及许多异步完成回调(如thenAccept(Consumer<? super T>) )。

invokeAndWait通常被认为容易出现死锁。

那得看情况。如果您的调用线程没有包含执行您要提交的Runnable所必需的任何锁,那么您可能还好。也就是说,如果您可以将业务流程建立在异步回调的基础上,则可以使用SwingUtilities.invokeLater(Runnable) --这将在Swing事件循环上提交Runnable的执行,而不会阻塞调用线程。

我可能会避免为每个单例创建一个线程。每个正在运行的线程都会增加一些开销,最好是将线程数量与业务逻辑分离开来。例如,这将允许您根据内核的数量将软件调到不同的物理机器上。

听起来,您需要每个runWithX(...)方法都是原子的。换句话说,一旦一个线程开始访问X,另一个线程就不能这样做,直到第一个线程完成其任务步骤。如果是这样的话,那么为每个单例创建一个锁对象并确保串行(而不是并行)访问是正确的方法。您可以通过包装在runWithX(...)方法中以synchronized代码块提交的闭包的执行来实现这一点。块中的代码也称为临界区段或监视器区域。

另一件要考虑的事情是线程争用和执行顺序。如果两个任务都需要访问X,而任务1在任务2之前提交,那么任务1对X的访问是否必须发生在任务2之前?这样的需求可能会使设计变得相当复杂,我可能会推荐一种与上面描述的不同的方法。

有没有一种我没有考虑过的更好的方法?

如今,有一些框架可以解决这些类型的问题。我特别想到的是反应性流和RxJava。虽然它是一个非常强大的框架,但它也有一个非常陡峭的学习曲线。在组织内采用这种技术之前,应该进行大量的分析和考虑。

更新:

根据您的反馈,我认为CompletableFuture-based方法可能最有意义。

我会创建一个帮助类来编排任务步骤执行:

代码语言:javascript
复制
class TaskHelper
{
    private final Object lockA;
    private final Object lockB;
    private final Object lockC;

    private final Executor poolExecutor;
    private final Executor swingExecutor;

    public TaskHelper()
    {
        poolExecutor = Executors.newFixedThreadPool( 2 );
        swingExecutor = SwingUtilities::invokeLater;

        lockA = new Object();
        lockB = new Object();
        lockC = new Object();
    }

    public <T> CompletableFuture<T> doWithA( Supplier<T> taskStep )
    {
        return doWith( lockA, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithB( Supplier<T> taskStep )
    {
        return doWith( lockB, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithC( Supplier<T> taskStep )
    {
        return doWith( lockC, swingExecutor, taskStep );
    }

    private <T> CompletableFuture<T> doWith( Object lock, Executor executor, Supplier<T> taskStep )
    {
        CompletableFuture<T> future = new CompletableFuture<>();

        Runnable serialTaskStep = () -> {

            T result;

            synchronized ( lock ) {
                result = taskStep.get();
            }

            future.complete( result );
        };

        executor.execute( serialTaskStep );
        return future;
    }
}

在上面的示例中,withAwithB被安排在共享线程池上,而withC总是在Swing线程上执行。Swing Executor在本质上已经是串行的,所以锁实际上是可选的。

对于创建实际任务,我建议为每个任务创建一个对象。这允许您将回调作为方法引用提供,从而获得更清晰的代码并避免回调:

此示例计算后台线程池上提供的数字的平方,然后在Swing线程上显示结果:

代码语言:javascript
复制
class SampleTask
{
    private final TaskHelper helper;
    private final String id;
    private final int startingValue;

    public SampleTask( TaskHelper helper, String id, int startingValue )
    {
        this.helper = helper;
        this.id = id;
        this.startingValue = startingValue;
    }

    private void start()
    {
        helper.doWithB( () -> {

            int square = startingValue * startingValue;
            return String.format( "computed-thread: %s computed-square: %d",
                    Thread.currentThread().getName(), square );
        } )
        .thenAccept( this::step2 );
    }

    private void step2( String result )
    {
        helper.doWithC( () -> {

            String message = String.format( "current-thread: %s task: %s result: %s",
                    Thread.currentThread().getName(), id, result );

            JOptionPane.showConfirmDialog( null, message );
            return null;
        } );
    }
}

@Test
public void testConcurrent() throws InterruptedException, ExecutionException
{
    TaskHelper helper = new TaskHelper();

    new SampleTask( helper, "task1", 5 ).start();
    new SampleTask( helper, "task2", 7 ).start();

    Thread.sleep( 60000 );
}

更新2:

如果您想避免回调,同时又不需要在每个任务中创建一个对象,那么也许您应该认真研究一下反应性流。

查看RxJava:https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava的“入门”页面

作为参考,下面是上面的相同示例在Rx中的外观(为了简单起见,我正在删除任务ID的概念):

代码语言:javascript
复制
@Test
public void testConcurrentRx() throws InterruptedException
{
    Scheduler swingScheduler = Schedulers.from( SwingUtilities::invokeLater );
    Subject<Integer> inputSubject = PublishSubject.create();

    inputSubject
        .flatMap( input -> Observable.just( input )
                .subscribeOn( Schedulers.computation() )
                .map( this::computeSquare ))
        .observeOn( swingScheduler )
        .subscribe( this::displayResult );

    inputSubject.onNext( 5 );
    inputSubject.onNext( 7 );
    Thread.sleep( 60000 );
}

private String computeSquare( int input )
{
    int square = input * input;
    return String.format( "computed-thread: %s computed-square: %d",
            Thread.currentThread().getName(), square );
}

private void displayResult( String result )
{
    String message = String.format( "current-thread: %s result: %s",
            Thread.currentThread().getName(), result );

    JOptionPane.showConfirmDialog( null, message );
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61314504

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档