是否可以在Spring Cloud Stream中使用带有@EnableBinding注释的类或带有@StreamListener的方法中的交互式查询(InteractiveQueryService)?我尝试在提供的KStreamMusicSampleApplication类和进程方法中实例化ReadOnlyKeyValueStore,但总是为空。
我的@StreamListener方法正在监听一堆KTables和KStreams,在处理拓扑的过程中,例如过滤,我必须检查来自KStream的键是否已经存在于特定的KTable中。
我试图弄清楚如何扫描传入的KTable以检查密钥是否已经存在,但没有成功。然后我遇到了InteractiveQueryService,它的get()方法可以用来检查状态存储materializedAs中是否存在来自KTable的键。问题是我无法通过流程拓扑(@EnableBinding或@StreamListener)访问它。它只能从这些注释之外访问,例如RestController。
有没有办法扫描传入的KTable以检查键或值是否存在?如果不是,那么我们可以在流程拓扑中访问InteractiveQueryService吗?
发布于 2019-05-01 22:16:04
Spring Cloud Stream中的InteractiveQueryService不能在StreamListener的实际拓扑中使用。正如您所提到的,它应该在您的主拓扑之外使用。但是,使用您描述的用例,您仍然可以使用主流中的状态存储。例如,如果您有一个传入的KStream和一个被物化为状态存储的KTable,那么您可以在KStream上调用process并以这种方式访问状态存储。以下是实现这一点的粗略代码。您需要将其转换为适合您的特定用例,但这里有一个想法。
ReadOnlyKeyValueStore<Object, String> store;
input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");
}
@Override
public void process(Object key, Object value) {
//find the key
store.get(key);
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "my-store");https://stackoverflow.com/questions/55931237
复制相似问题