我正在设计一个java模块来处理其他外部系统发送的事件。系统接收到的所有事件都将经过映射,然后在ThreadPoolExecutor中排队以进行进一步处理。
相当简单。
现在这里的问题是,如果我收到一个具有相同键的事件,比如"SystemID + EventEntityID",我必须按顺序处理它们。由于事件频率是中到高,我不能按顺序处理所有的事件。这是毫无疑问的。
因此,当我收到一个事件时,在我把它推入队列之前,我需要确保任何具有相同键的事件既不在处理中也不在队列中。另外,如果我发现它在处理中或在队列中,我必须将它放在某个地方,稍后当现有的一个完成处理时,再推送到队列中。
我不确定这个解决方案的好的设计方法是什么。任何帮助都是非常有帮助的。
发布于 2020-05-14 04:50:22
据我所知,事件的流程是:映射到队列-> executed的接收->。现在可以有许多解决方案,具有不同的效果。因为我实际上并不了解您的系统,所以我所介绍的可能并不是最合适的,只是实现的基本方向。
要实现您所需的功能,我们实际需要修改的只是队列。所以我们将创建一个队列的实现来支持这一点。
通常,当我遇到一个键可以匹配多个值的问题时,我会使用一个Map<K, Collection<V>>,其中K是键类型,V是值类型。当然,这不是一个队列,但它将被用作队列实现的一部分。
当添加新事件时,我们可以将其放入与键匹配的map中的集合中。从那里它可以排队。
优先级队列是必需的,因此我们将使用一种PriorityQueue (实现可能会改变)。
我们需要注意几件事:
将新事件接收到地图中
这是在我们的push方法中完成的,它将接收一个事件,并且非常简单:
Collection<V> collection = map.get(key);
if (collection == null) {
collection = (create collection)
map.put(key, collection);
}
collection.add(value);从映射转移到队列
这是最有问题的,因为我们需要找到一种方法来知道每种类型的事件是否在队列中,以及最后一个事件是什么时候被处理的。我的第一个想法是使用代理事件值。该值将包含该数据,如下所示:
interface StoredEventType {
// when a new event of this type was received
void registerNewEvent(Event e);
// move from map to queue
void pollNextEventInto(Queue<StoredEvent> q);
}
interface StoredEvent {
// when the execution was done for this event, this will be called.
boolean isDone();
// the execution should call this to update that it is done.
void onDone();
}将发生的情况是,只有在调用isDone时,才会在调用pollNextEventInto时将新事件放入队列中。要使其正常工作,执行任务必须在最后调用onDone。
什么时候调用pollNextEventInto?当我们的队列实现被轮询时,它可能会被调用。这是唯一能保证在executor运行时会继续发生的事情。
从队列到用户轮询
这也很简单。首先对所有存储的密钥执行pollNextEventInto,以确保更新优先级队列。然后简单地从优先级队列中轮询。
实施摘要
这不是一个完美的想法。必须确保它的并发性,而抽象并不是最好的。但这些都给出了一个方向。还要注意大部分工作是如何在poll中完成的。这是因为poll在内部用于执行,而push更多地是外部的,以某种方式与其他系统交互。
如果你想对此进行更多的头脑风暴,我很乐意。
编辑
我有机会坐下来检查这个实现。它工作得很好。不是最干净的代码...尽管需要一些努力才能做到这一点。
事件队列实现应该支持阻塞线程的poll,线程从中轮询并执行。
在运行时,我依靠Future来检查事件处理是否完成。它对异步工作非常有用。
发布于 2020-05-14 05:37:07
我认为您需要一组actor,以映射的形式组织,每个actor通过一个键进行访问。具有不同密钥的参与者并行工作。每个具有给定关键字的参与者都会按顺序处理事件。
Map<Key,Actor> actors = new ActorMap();
void receiveEvent(Event e) {
Actor actor = actors.get(e.key);
actor.onNext(e);
}
class ActorMap extends ConcurrentHashMap<Key,Actor> {
public Actor get(K key) {
return computeIfAbsent(key, (key)->new Actor(key));
}
}作为一个角色库,您可以使用Akka、my DF4J,甚至https://github.com/akaigoro/CodeSamples/blob/master/src/main/java/actor/simpleactor/SimpleActor.java
https://stackoverflow.com/questions/61782436
复制相似问题