首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用selectKey将tombstone事件委托给KTable

如何使用selectKey将tombstone事件委托给KTable
EN

Stack Overflow用户
提问于 2019-03-04 17:16:57
回答 1查看 399关注 0票数 0

我有以下kafka流配置。

代码语言:javascript
复制
StreamBuilder builder = stream("TopicA", Serdes.String(), new 
        SpecificAvroSerde<TestObject>())
    .filter((key, value) -> value!=null)
    .selectKey((key, value) -> value.getSomeProperty())
    .groupByKey(Grouped.with(Serdes.Long(), new 
        SpecificAvroSerde<TestObject>()))
    .reduce((oldValue, newValue) -> newValue), 
        Materialized.as("someStore"));

这正如我所期望的那样工作,但我不知道如何处理TestObject的墓碑消息,即使我删除了

代码语言:javascript
复制
.filter((key, value) -> value!=null)

我不知道如何处理'selectKey‘,当值到达为null时,我不能发送带有'value.getSomeProperty()’的逻辑删除消息,而值也将为null。

你会如何处理这个问题?

EN

回答 1

Stack Overflow用户

发布于 2019-03-05 14:52:55

您可以使用transform()而不是selectKey(),并将旧的<key,value>对存储在状态存储中。这样,当处理<key,null>时,您可以从存储中获取先前的值,并获取先前提取的新密钥并发送相应的tombstone。

但是,null ()不能处理任何具有reduce键或reduce值的记录(这些记录将被删除)。因此,您需要使用代理值而不是null来将记录放入Reduce函数。如果接收到代理,则Reduce可以返回null

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54980027

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档