我正在努力寻找任何关于我可以在哪里使用Spring Cloud Streams的文档,这些文档将Kafka主题放入KTable中。在使用注解在Spring boot中执行此操作的过程中,我已经查找了文档,例如这里的https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_materializing_ktable_as_a_state_store,但没有什么具体的内容。我希望我可以使用KStream创建一个简单的KTable,在我的application.properties中有以下内容:spring.cloud.stream.bindings.process-in-0.destination: my-topic
然后在我的配置中,我希望我可以这样做
@Bean
public Consumer<KStream<String, String>> process() {
return input -> input.toTable(Materialized.as("my-store"))
}请告诉我我遗漏了什么?
发布于 2021-04-22 06:13:50
如果你想做的就是以KTable的形式消费Kafka topic中的数据,那么你可以这样做。
@Bean
public Consumer<KTable<String, String>> process() {
return input -> {
};
}如果要将表物化到命名存储中,则可以将其添加到配置中。
spring.cloud.stream.kafka.streams.bindings.process_in_0.consumer.materializedAs: my-store你也可以做你在问题中已经做过的事情,例如,将它作为一个KStream接收,然后转换为KTable。但是,如果这是您需要做的全部工作,那么您可能更愿意像我在这里建议的那样,首先以KTable的形式接收它。
https://stackoverflow.com/questions/67203688
复制相似问题