首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >可查询过滤的KTable

可查询过滤的KTable
EN

Stack Overflow用户
提问于 2019-05-31 16:06:56
回答 1查看 588关注 0票数 1

下面是我构建的一个示例KTable,它是一个简单的聚合:

代码语言:javascript
复制
String name = stream
    .groupByKey()
    .aggregate(
        () -> new Aggregate(config),
        (key, value, aggregate) -> aggregate.addAndReturn(value),
        Materialized
            .<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
            .withCachingEnabled()
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomSerdes.ObjectSerde()))
    .filter(((key, value) -> value.isStateChanged()))
    .filter((key, value) -> !value.getRecentlyViewed().isEmpty())
    .queryableStoreName();

我需要做的是将最终的KTable(在应用过滤之后)存储在状态存储中,而不是初始的KTable中。目前,KTable.queryableStoreName()返回null

我目前的解决方案是应用filter(),然后使用KTable.toStream()转换为流,最后再次存储为KTable,我认为这效率很低。有没有其他的解决方案

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-03 11:59:03

您可以通过提供可查询的存储名称来强制实现KTable的实体化:

代码语言:javascript
复制
.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));

根据您使用的版本,您可能需要指定一些泛型以使其进行编译:

代码语言:javascript
复制
Materialized<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>.as("your-custom-store-name"))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56391130

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档