要求:
Type1, Type2 ... Type100。TypeX的所有消息。它应该开始处理另一种类型。我经历了不同的答案:其中大多数建议执行服务来处理多线程。假设我们创建执行程序服务,就像
ExecutorService executorService = Executors.newFixedThreadPool(10);但是一旦我们使用executorService.submit(runnableMessage);提交消息
我们无法控制将特定类型的消息分配给特定线程。
解决方案:
创建一个单线程执行程序数组
ExecutorService[] pools = new ExecutorService[10];最初传递Type1,Type2的消息.Type10,如果任何执行器已经完成执行,那么将Type11分配给它,并一直执行到所有类型都被处理为止。
有什么更好的方法吗?
类似于具有多个队列的执行器服务,可以将每种类型的消息推送到不同的队列中?
发布于 2017-04-14 16:16:09
一个更简单的解决办法可以是:
而不是让每条消息都可以运行。我们可以根据它们的类型创建组消息:
例如,我们为Group1为type1的所有消息创建
class MessageGroup implements Runnable {
String type;
String List<Message> messageList;
@Override
public void run() {
for(Message message : MessageList) {
message.process();
}
}
} 我们还可以使用固定线程创建常见的executor服务,例如
ExecutorService executorService = Executors.newFixedThreadPool(10); 而不是提交单独的消息,我们可以提交一组消息,如
executorService.submit(runnableGroup);每个组将在同一个线程中顺序执行相同类型的消息。
发布于 2017-04-11 20:43:20
我建议看一看阿克卡。它们提供了一个更适合这个用例的Actor框架。除了定义您自己的ExecutorService接口实现之外,JDK提供的默认实现对调度没有那么大的控制。
创建一个硬编码的ExecutionServices数组将不是非常动态或健壮的,特别是因为每个ExecutionService都有一个线程池。可以用散列映射替换数组,然后将其放在ExecutionService的自定义实现后面,这样做的优点是向调用方隐藏这些细节,但这不会解决拥有这么多线程池的线程浪费问题。
在Akka中,每个Actor都有自己的消息队列。每个Actor有效地在自己的线程中运行,每次从其队列中处理每个消息。Akka将管理多个Actor之间的线程共享。因此,如果您要为每个消息类型创建一个Actor,然后将消息与这些Actors一起排队,那么您将得到一个目标,即每种消息类型一次由最多一个线程处理,同时只由一个线程池进行备份。
该技术的演示:
对Akka的依赖。
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.17</version>
</dependency>Java 8代码复制并粘贴到Java文件中,然后在IDE中运行主方法。
package com.softwaremosaic.demos.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class ActorDemo {
public static void main( String[] args ) throws InterruptedException {
// The following partitioner will spread the requests over
// multiple actors, which I chose to demonstrate the technique.
// You will need to change it to one that better maps the the
// jobs to your use case. Remember that jobs that get mapped
// to the same key, will get executed in serial (probably
// but not necessarily) by the same thread.
ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );
for ( int i=0; i<100; i++ ) {
int id = i;
exectorService.submit( () -> System.out.println("JOB " + id) );
}
exectorService.shutdown();
exectorService.awaitTermination( 1, TimeUnit.MINUTES );
System.out.println( "DONE" );
}
}
class ActorExecutionService extends AbstractExecutorService {
private final ActorSystem actorSystem;
private final Function<Runnable, String> partitioner;
private final ConcurrentHashMap<String,ActorRef> actors = new ConcurrentHashMap<>();
public ActorExecutionService( Function<Runnable,String> partitioner ) {
this.actorSystem = ActorSystem.create("demo");
this.partitioner = partitioner;
}
public void execute( Runnable command ) {
String partitionKey = partitioner.apply( command );
ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );
actorRef.tell( command, actorRef );
}
private ActorRef createNewActor( String partitionKey ) {
return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
}
public void shutdown() {
actorSystem.terminate();
}
public List<Runnable> shutdownNow() {
actorSystem.terminate();
try {
awaitTermination( 1, TimeUnit.MINUTES );
} catch ( InterruptedException e ) {
throw new RuntimeException( e );
}
return Collections.emptyList();
}
public boolean isShutdown() {
return actorSystem.isTerminated();
}
public boolean isTerminated() {
return actorSystem.isTerminated();
}
public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
actorSystem.awaitTermination();
return actorSystem.isTerminated();
}
}
class ExecutionServiceActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof Runnable) {
((Runnable) message).run();
} else {
unhandled(message);
}
}
}注:以上代码将以未定义的顺序打印1-100。由于批处理( Akka这样做以获得额外的性能效益),订单看起来大部分是连续的。然而,您将看到数字的一些随机性,因为不同的线程分散工作。每个作业运行的时间越长,分配给Akka线程池的线程越多,使用的分区键越多,底层CPU内核越多,序列就可能变得越随机。
发布于 2017-04-11 21:16:32
这是我最基本的例子,说明它是什么样的。您可以创建一个地图,其中包含由他们的"Typ“寻址的10个ArrayDeques。也可以启动10 ScheduledExecutors。每个队列最初等待5秒,然后每200秒轮询一次队列。在当前的示例中,输出总是“TypeX: null的当前消息队列”,因为队列都是空的。
但是,您现在可以将消息传递到匹配的队列中。这项服务每200毫秒就能得到一次,你想用它做什么就怎么做。而且,当您使用队列时,在处理消息的方式上也会自动有一个顺序。
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Messages {
public static void main(String[] args) {
Map<String, ArrayDeque<String>> messages = new HashMap<String, ArrayDeque<String>>();
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
long initialDelay = 5000;
long period = 200;
// create 10 Queues, indexed by the type
// create 10 executor-services, focused on their message queue
for(int i=1; i<11; i++) {
String type = "Type" + i;
Runnable task = () -> System.out.println(
"current message of " + type + ": " + messages.get(type).poll()
);
messages.put(type, new ArrayDeque<String>());
service.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
}
}
}https://stackoverflow.com/questions/43355252
复制相似问题