首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >并行和顺序事件处理

并行和顺序事件处理
EN

Stack Overflow用户
提问于 2020-05-14 02:42:48
回答 2查看 130关注 0票数 0

我正在设计一个java模块来处理其他外部系统发送的事件。系统接收到的所有事件都将经过映射,然后在ThreadPoolExecutor中排队以进行进一步处理。

相当简单。

现在这里的问题是,如果我收到一个具有相同键的事件,比如"SystemID + EventEntityID",我必须按顺序处理它们。由于事件频率是中到高,我不能按顺序处理所有的事件。这是毫无疑问的。

因此,当我收到一个事件时,在我把它推入队列之前,我需要确保任何具有相同键的事件既不在处理中也不在队列中。另外,如果我发现它在处理中或在队列中,我必须将它放在某个地方,稍后当现有的一个完成处理时,再推送到队列中。

我不确定这个解决方案的好的设计方法是什么。任何帮助都是非常有帮助的。

EN

回答 2

Stack Overflow用户

发布于 2020-05-14 04:50:22

据我所知,事件的流程是:映射到队列-> executed的接收->。现在可以有许多解决方案,具有不同的效果。因为我实际上并不了解您的系统,所以我所介绍的可能并不是最合适的,只是实现的基本方向。

要实现您所需的功能,我们实际需要修改的只是队列。所以我们将创建一个队列的实现来支持这一点。

通常,当我遇到一个键可以匹配多个值的问题时,我会使用一个Map<K, Collection<V>>,其中K是键类型,V是值类型。当然,这不是一个队列,但它将被用作队列实现的一部分。

当添加新事件时,我们可以将其放入与键匹配的map中的集合中。从那里它可以排队。

优先级队列是必需的,因此我们将使用一种PriorityQueue (实现可能会改变)。

我们需要注意几件事:

  • 将新事件接收到映射中。这是相当简单的,可以通过放入地图的集合值来完成。
  • 从地图转移到队列。我们需要将集合中的第一个值从每个键中提取到优先级队列中。但是,这只能在队列与用户之间的最后一个值为processed.
  • Poll时才能完成。只需从优先级队列中检索第一个值。

将新事件接收到地图中

这是在我们的push方法中完成的,它将接收一个事件,并且非常简单:

代码语言:javascript
复制
Collection<V> collection = map.get(key);
if (collection == null) {
    collection = (create collection)
    map.put(key, collection);
}
collection.add(value);

从映射转移到队列

这是最有问题的,因为我们需要找到一种方法来知道每种类型的事件是否在队列中,以及最后一个事件是什么时候被处理的。我的第一个想法是使用代理事件值。该值将包含该数据,如下所示:

代码语言:javascript
复制
interface StoredEventType {
    // when a new event of this type was received
    void registerNewEvent(Event e);
    // move from map to queue
    void pollNextEventInto(Queue<StoredEvent> q);
}
interface StoredEvent {
    // when the execution was done for this event, this will be called.
    boolean isDone();
    // the execution should call this to update that it is done.
    void onDone();
}

将发生的情况是,只有在调用isDone时,才会在调用pollNextEventInto时将新事件放入队列中。要使其正常工作,执行任务必须在最后调用onDone

什么时候调用pollNextEventInto?当我们的队列实现被轮询时,它可能会被调用。这是唯一能保证在executor运行时会继续发生的事情。

从队列到用户轮询

这也很简单。首先对所有存储的密钥执行pollNextEventInto,以确保更新优先级队列。然后简单地从优先级队列中轮询。

实施摘要

这不是一个完美的想法。必须确保它的并发性,而抽象并不是最好的。但这些都给出了一个方向。还要注意大部分工作是如何在poll中完成的。这是因为poll在内部用于执行,而push更多地是外部的,以某种方式与其他系统交互。

如果你想对此进行更多的头脑风暴,我很乐意。

编辑

我有机会坐下来检查这个实现。它工作得很好。不是最干净的代码...尽管需要一些努力才能做到这一点。

事件队列实现应该支持阻塞线程的poll,线程从中轮询并执行。

在运行时,我依靠Future来检查事件处理是否完成。它对异步工作非常有用。

票数 0
EN

Stack Overflow用户

发布于 2020-05-14 05:37:07

我认为您需要一组actor,以映射的形式组织,每个actor通过一个键进行访问。具有不同密钥的参与者并行工作。每个具有给定关键字的参与者都会按顺序处理事件。

代码语言:javascript
复制
Map<Key,Actor> actors = new ActorMap();

void receiveEvent(Event e) {
   Actor actor = actors.get(e.key);
   actor.onNext(e);
}


class ActorMap extends ConcurrentHashMap<Key,Actor> {
   public Actor get(K key) {
       return computeIfAbsent(key, (key)->new Actor(key));
   }
}

作为一个角色库,您可以使用Akka、my DF4J,甚至https://github.com/akaigoro/CodeSamples/blob/master/src/main/java/actor/simpleactor/SimpleActor.java

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61782436

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档