首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java:多队列的Executor服务

Java:多队列的Executor服务
EN

Stack Overflow用户
提问于 2017-04-11 20:04:10
回答 4查看 4.5K关注 0票数 8

要求:

  1. 我将消息分组为不同的类型,例如Type1, Type2 ... Type100
  2. 我想并行地执行不同类型的消息。假设在10个线程中,但是同一类型的所有消息都必须一个一个地执行。执行命令并不重要。
  3. 一旦线程完成了TypeX的所有消息。它应该开始处理另一种类型。

我经历了不同的答案:其中大多数建议执行服务来处理多线程。假设我们创建执行程序服务,就像

代码语言:javascript
复制
ExecutorService executorService = Executors.newFixedThreadPool(10);

但是一旦我们使用executorService.submit(runnableMessage);提交消息

我们无法控制将特定类型的消息分配给特定线程。

解决方案:

创建一个单线程执行程序数组

代码语言:javascript
复制
ExecutorService[] pools = new ExecutorService[10];

最初传递Type1,Type2的消息.Type10,如果任何执行器已经完成执行,那么将Type11分配给它,并一直执行到所有类型都被处理为止。

有什么更好的方法吗?

类似于具有多个队列的执行器服务,可以将每种类型的消息推送到不同的队列中?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-04-14 16:16:09

一个更简单的解决办法可以是:

而不是让每条消息都可以运行。我们可以根据它们的类型创建组消息:

例如,我们为Group1type1的所有消息创建

代码语言:javascript
复制
class MessageGroup implements Runnable {
    String type;
    String List<Message> messageList;

    @Override
    public void run() {
      for(Message message : MessageList) {
         message.process();
      }
    }
} 

我们还可以使用固定线程创建常见的executor服务,例如

代码语言:javascript
复制
ExecutorService executorService = Executors.newFixedThreadPool(10); 

而不是提交单独的消息,我们可以提交一组消息,如

代码语言:javascript
复制
executorService.submit(runnableGroup);

每个组将在同一个线程中顺序执行相同类型的消息。

票数 2
EN

Stack Overflow用户

发布于 2017-04-11 20:43:20

我建议看一看阿克卡。它们提供了一个更适合这个用例的Actor框架。除了定义您自己的ExecutorService接口实现之外,JDK提供的默认实现对调度没有那么大的控制。

创建一个硬编码的ExecutionServices数组将不是非常动态或健壮的,特别是因为每个ExecutionService都有一个线程池。可以用散列映射替换数组,然后将其放在ExecutionService的自定义实现后面,这样做的优点是向调用方隐藏这些细节,但这不会解决拥有这么多线程池的线程浪费问题。

在Akka中,每个Actor都有自己的消息队列。每个Actor有效地在自己的线程中运行,每次从其队列中处理每个消息。Akka将管理多个Actor之间的线程共享。因此,如果您要为每个消息类型创建一个Actor,然后将消息与这些Actors一起排队,那么您将得到一个目标,即每种消息类型一次由最多一个线程处理,同时只由一个线程池进行备份。

该技术的演示:

对Akka的依赖。

代码语言:javascript
复制
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.4.17</version>
    </dependency>

Java 8代码复制并粘贴到Java文件中,然后在IDE中运行主方法。

代码语言:javascript
复制
package com.softwaremosaic.demos.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;


public class ActorDemo {

    public static void main( String[] args ) throws InterruptedException {
        // The following partitioner will spread the requests over
        // multiple actors, which I chose to demonstrate the technique.
        // You will need to change it to one that better maps the the
        // jobs to your use case.   Remember that jobs that get mapped
        // to the same key, will get executed in serial (probably
        // but not necessarily) by the same thread.
        ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );

        for ( int i=0; i<100; i++ ) {
            int id = i;
            exectorService.submit( () -> System.out.println("JOB " + id) );
        }

        exectorService.shutdown();
        exectorService.awaitTermination( 1, TimeUnit.MINUTES );

        System.out.println( "DONE" );
    }

}


class ActorExecutionService extends AbstractExecutorService {

    private final ActorSystem                              actorSystem;
    private final Function<Runnable, String>               partitioner;
    private final ConcurrentHashMap<String,ActorRef>       actors = new ConcurrentHashMap<>();

    public ActorExecutionService( Function<Runnable,String> partitioner ) {
        this.actorSystem = ActorSystem.create("demo");
        this.partitioner = partitioner;
    }


    public void execute( Runnable command ) {
        String partitionKey = partitioner.apply( command );

        ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );

        actorRef.tell( command, actorRef );
    }

    private ActorRef createNewActor( String partitionKey ) {
        return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
    }


    public void shutdown() {
        actorSystem.terminate();
    }

    public List<Runnable> shutdownNow() {
        actorSystem.terminate();

        try {
            awaitTermination( 1, TimeUnit.MINUTES );
        } catch ( InterruptedException e ) {
            throw new RuntimeException( e );
        }

        return Collections.emptyList();
    }

    public boolean isShutdown() {
        return actorSystem.isTerminated();
    }

    public boolean isTerminated() {
        return actorSystem.isTerminated();
    }

    public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
        actorSystem.awaitTermination();

        return actorSystem.isTerminated();
    }
}

 class ExecutionServiceActor extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof Runnable) {
            ((Runnable) message).run();
        } else {
            unhandled(message);
        }
    }
}

注:以上代码将以未定义的顺序打印1-100。由于批处理( Akka这样做以获得额外的性能效益),订单看起来大部分是连续的。然而,您将看到数字的一些随机性,因为不同的线程分散工作。每个作业运行的时间越长,分配给Akka线程池的线程越多,使用的分区键越多,底层CPU内核越多,序列就可能变得越随机。

票数 7
EN

Stack Overflow用户

发布于 2017-04-11 21:16:32

这是我最基本的例子,说明它是什么样的。您可以创建一个地图,其中包含由他们的"Typ“寻址的10个ArrayDeques。也可以启动10 ScheduledExecutors。每个队列最初等待5秒,然后每200秒轮询一次队列。在当前的示例中,输出总是“TypeX: null的当前消息队列”,因为队列都是空的。

但是,您现在可以将消息传递到匹配的队列中。这项服务每200毫秒就能得到一次,你想用它做什么就怎么做。而且,当您使用队列时,在处理消息的方式上也会自动有一个顺序。

代码语言:javascript
复制
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Messages {

    public static void main(String[] args) {

        Map<String, ArrayDeque<String>> messages = new HashMap<String, ArrayDeque<String>>();
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        long initialDelay = 5000;
        long period = 200;

        // create 10 Queues, indexed by the type
        // create 10 executor-services, focused on their message queue
        for(int i=1; i<11; i++) {
            String type = "Type" + i;

            Runnable task = () -> System.out.println(
                     "current message of " + type + ": " + messages.get(type).poll()
            );

            messages.put(type, new ArrayDeque<String>());
            service.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
        }

    }
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43355252

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档