我有一个使用Spring、使用spring-cloud-function和spring-cloud-stream-binder-kafka-streams编写的Kafka流处理应用程序。处理几个流的方法被注释为@Bean,因此它应该由spring-cloud-function (而不是使用@StreamListener)来获取。当此方法返回一个BiFunction时,它可以工作。但是当我把它当作普通的Kotlin lambda时,它不会被Spring捕获:应用程序会立即启动,因为它发现没有任何函数可以运行。
从我在文献资料中可以看到的情况来看,这应该是可行的。
下面是有效的声明:
@Bean
fun process():
BiFunction<KStream<String, Foo>, GlobalKTable<String, Bar>, KStream<String, Baz>> =
BiFunction { foo, bar ->
...下面是不起作用的声明:
@Bean
fun process():
(foo: KStream<String, Foo>, bar: GlobalKTable<String, Bar>) -> KStream<String, Baz> =
{ foo, bar ->
...(在这两种情况下,方法的内容是相同的。)
根据文档,我将spring-cloud-function-kotlin模块添加到类路径中,并将其添加到build.gradle.kts中。
implementation("org.springframework.cloud:spring-cloud-function-kotlin")Stream的版本是Hoxton.RC1。
我还需要做些什么来完成这个功能吗?或者在这种情况下我需要使用BiFunction吗?
发布于 2019-11-06 15:32:07
目前(3.0中),Kafka流绑定功能支持需要java.util.function类型。它还不适用于Kotlin中的标准函数(或者我们还没有验证这方面的任何内容)。我们计划将其作为3.1功能进行研究。如果你不介意的话,你能创造一个新的问题这里
https://stackoverflow.com/questions/58732683
复制相似问题