我有以下kafka流配置。
StreamBuilder builder = stream("TopicA", Serdes.String(), new
SpecificAvroSerde<TestObject>())
.filter((key, value) -> value!=null)
.selectKey((key, value) -> value.getSomeProperty())
.groupByKey(Grouped.with(Serdes.Long(), new
SpecificAvroSerde<TestObject>()))
.reduce((oldValue, newValue) -> newValue),
Materialized.as("someStore"));这正如我所期望的那样工作,但我不知道如何处理TestObject的墓碑消息,即使我删除了
.filter((key, value) -> value!=null)我不知道如何处理'selectKey‘,当值到达为null时,我不能发送带有'value.getSomeProperty()’的逻辑删除消息,而值也将为null。
你会如何处理这个问题?
发布于 2019-03-05 14:52:55
您可以使用transform()而不是selectKey(),并将旧的<key,value>对存储在状态存储中。这样,当处理<key,null>时,您可以从存储中获取先前的值,并获取先前提取的新密钥并发送相应的tombstone。
但是,null ()不能处理任何具有reduce键或reduce值的记录(这些记录将被删除)。因此,您需要使用代理值而不是null来将记录放入Reduce函数。如果接收到代理,则Reduce可以返回null。
https://stackoverflow.com/questions/54980027
复制相似问题