从两个不同的异步输入事件实时创建统一事件(输出)的最佳方法是什么?
举个例子
"input_event_A" : { "correlation_id" : "abc_123", "payload_event_a": { some event A data } }
"input_event_B" : { "correlation_id" : "abc_123", "payload_event_b": { some event B data } }
因此,当事件B到达时,我需要基于相同的correlation_id合并来自A和B的有效载荷数据
我要找的输出是事件C的输出
"output_event_C" : { "correlation_id" : "abc_123", "payload_event_C": { some event A+B data } }
所面临的挑战是事件A和B是异步的,在事件侦听器中,可以在任何给定的时间捕获A或B,并且由于合并过程只有在事件B被处理时才会触发,因此可能在事件A尚未到达期间,如何解决这个问题?
我目前使用的方法是
我在两个单独的事件存储中异步保存这两个事件。当事件B出现时,我在事件存储中搜索correlation_id以获得A,如果在那里,我将提取数据并合并事件,另一方面,如果数据不在事件存储A中,我将无限地重试,直到事件A中的数据可用为止。这个重试过程可以创建竞争条件和使用大量资源,因此我正在寻找一个更好的方法来处理这个整合。
注意:这2个输入事件出现在JMS上,我有一个基于Java的监听器来处理这些事件。
发布于 2022-03-31 11:11:05
CEP引擎将是最好的。
但是在这种情况下,像番石榴缓存这样的东西就足够了,因为唯一需要的就是全局映射。
class Example {
class MessageListener1 {
void onMessage(Message msg) {
Event e = toEvent(msg);
fireOn(e);
}
}
class MessageListener2 {
void onMessage(Message msg) {
Event e = toEvent(msg);
fireOn(e);
}
}
Cache<String, Event> eventStorage = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.HOURS)
.build();
void fireOn(Event e1) {
Event e2 = eventStorage.get(e1.correlId(), () -> e1);
if (e1.equals(e2)) {
// nothing to do yet
return;
}
// we have two corelated events
Event e3 = doSomething(e1, e2);
}
}番石榴缓存很好,因为到期将负责释放内存,而带有加载器的get 方法负责继承竞争条件。
https://stackoverflow.com/questions/71604734
复制相似问题