首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡Ktable查询

卡夫卡Ktable查询
EN

Stack Overflow用户
提问于 2018-04-18 10:50:34
回答 1查看 4.6K关注 0票数 3

试图通过KTable从“连接-信任”主题中获取记录

代码语言:javascript
复制
    public static void main(String...  args) throws InterruptedException {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test_connect-configs_12");

    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Bytes().getClass());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, Object> ktable = builder.table("connect-configs");
    KafkaStreams streams = new KafkaStreams(builder.build(),config);
    streams.cleanUp();
    streams.start();
    System.out.println(ktable.queryableStoreName());
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    ReadOnlyKeyValueStore<String, Object> view;
    while (true) {
        try {
             System.out.println(ktable.queryableStoreName());
             view = streams.store(ktable.queryableStoreName(), QueryableStoreTypes.keyValueStore());

        } catch (InvalidStateStoreException ignored) {
          // store not yet ready for querying
          Thread.sleep(100);
        }
      }


};

ktable.queryableStoreName()始终为空。为什么哪里没有查询的商店?我看到了像"test_connect-configs_12-connect-configsSTATE-STORE-0000000000-changelog".这样的主题如何读取记录以及如何获取KTable状态的更改事件?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-18 14:57:51

创建KTable时,需要为基础存储指定名称。

代码语言:javascript
复制
builder.table("connect-configs", Materialized.as<...>("my-store-name"));
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49897854

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档