我有一个主题,其中包含每个会话的用户连接和断开事件。我想使用Kafka stream来处理这个话题,并根据一些条件更新KTable。每条记录都无法更新KTable。因此,我需要处理多个记录,以了解是否需要更新KTable。
例如,按用户然后按sessionid处理流和聚合。如果该用户至少有一个会话If仅具有已连接的事件,则必须将KTable更新为用户在线(如果尚未更新)。
如果用户的所有sessionId都有Disconnected事件,则必须将KTable更新为用户脱机(如果尚未更新)。
我该如何实现这样的逻辑呢?
我们是否可以在所有应用程序实例中实现此KTable,以便每个实例都可以在本地使用此数据?
发布于 2021-01-03 06:54:15
听起来像是一个相当复杂的场景。
也许,在这种情况下最好使用处理器API?KTable基本上只是一个KV- store,使用处理器API,您可以应用复杂的处理来决定是否要更新状态存储。KTable本身不允许您应用复杂的逻辑,但它将应用它接收到的每个更新。
因此,使用DSL时,您需要执行一些预处理,并且如果您想要更新KTable,则仅在这种情况下发送更新记录。如下所示:
KStream stream = builder.stream("input-topic");
// apply your processing and write an update record into `updates` when necessary
KStream updates = stream...
KTable table = updates.toTable();https://stackoverflow.com/questions/62428255
复制相似问题