下面是我构建的一个示例KTable,它是一个简单的聚合:
String name = stream
.groupByKey()
.aggregate(
() -> new Aggregate(config),
(key, value, aggregate) -> aggregate.addAndReturn(value),
Materialized
.<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(CustomSerdes.ObjectSerde()))
.filter(((key, value) -> value.isStateChanged()))
.filter((key, value) -> !value.getRecentlyViewed().isEmpty())
.queryableStoreName();我需要做的是将最终的KTable(在应用过滤之后)存储在状态存储中,而不是初始的KTable中。目前,KTable.queryableStoreName()返回null。
我目前的解决方案是应用filter(),然后使用KTable.toStream()转换为流,最后再次存储为KTable,我认为这效率很低。有没有其他的解决方案
发布于 2019-06-03 11:59:03
您可以通过提供可查询的存储名称来强制实现KTable的实体化:
.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));根据您使用的版本,您可能需要指定一些泛型以使其进行编译:
Materialized<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>.as("your-custom-store-name"))https://stackoverflow.com/questions/56391130
复制相似问题