首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >维护单独的KTable

维护单独的KTable
EN

Stack Overflow用户
提问于 2020-06-17 19:46:51
回答 1查看 63关注 0票数 0

我有一个主题,其中包含每个会话的用户连接和断开事件。我想使用Kafka stream来处理这个话题,并根据一些条件更新KTable。每条记录都无法更新KTable。因此,我需要处理多个记录,以了解是否需要更新KTable。

例如,按用户然后按sessionid处理流和聚合。如果该用户至少有一个会话If仅具有已连接的事件,则必须将KTable更新为用户在线(如果尚未更新)。

如果用户的所有sessionId都有Disconnected事件,则必须将KTable更新为用户脱机(如果尚未更新)。

我该如何实现这样的逻辑呢?

我们是否可以在所有应用程序实例中实现此KTable,以便每个实例都可以在本地使用此数据?

EN

回答 1

Stack Overflow用户

发布于 2021-01-03 06:54:15

听起来像是一个相当复杂的场景。

也许,在这种情况下最好使用处理器API?KTable基本上只是一个KV- store,使用处理器API,您可以应用复杂的处理来决定是否要更新状态存储。KTable本身不允许您应用复杂的逻辑,但它将应用它接收到的每个更新。

因此,使用DSL时,您需要执行一些预处理,并且如果您想要更新KTable,则仅在这种情况下发送更新记录。如下所示:

代码语言:javascript
复制
KStream stream = builder.stream("input-topic");
// apply your processing and write an update record into `updates` when necessary
KStream updates = stream...
KTable table = updates.toTable();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62428255

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档