首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在spring-cloud-stream中使用kafka过程拓扑中的交互式查询?

如何在spring-cloud-stream中使用kafka过程拓扑中的交互式查询?
EN

Stack Overflow用户
提问于 2019-05-01 11:22:42
回答 1查看 1.1K关注 0票数 1

是否可以在Spring Cloud Stream中使用带有@EnableBinding注释的类或带有@StreamListener的方法中的交互式查询(InteractiveQueryService)?我尝试在提供的KStreamMusicSampleApplication类和进程方法中实例化ReadOnlyKeyValueStore,但总是为空。

我的@StreamListener方法正在监听一堆KTables和KStreams,在处理拓扑的过程中,例如过滤,我必须检查来自KStream的键是否已经存在于特定的KTable中。

我试图弄清楚如何扫描传入的KTable以检查密钥是否已经存在,但没有成功。然后我遇到了InteractiveQueryService,它的get()方法可以用来检查状态存储materializedAs中是否存在来自KTable的键。问题是我无法通过流程拓扑(@EnableBinding或@StreamListener)访问它。它只能从这些注释之外访问,例如RestController。

有没有办法扫描传入的KTable以检查键或值是否存在?如果不是,那么我们可以在流程拓扑中访问InteractiveQueryService吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-05-01 22:16:04

Spring Cloud Stream中的InteractiveQueryService不能在StreamListener的实际拓扑中使用。正如您所提到的,它应该在您的主拓扑之外使用。但是,使用您描述的用例,您仍然可以使用主流中的状态存储。例如,如果您有一个传入的KStream和一个被物化为状态存储的KTable,那么您可以在KStream上调用process并以这种方式访问状态存储。以下是实现这一点的粗略代码。您需要将其转换为适合您的特定用例,但这里有一个想法。

代码语言:javascript
复制
ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55931237

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档