
java.lang.NullPointerException when using a Spring Cloud Stream 3 Kafka functional producer with Avro format

Stack Overflow user
Asked 2019-12-11 23:46:47
1 answer · 1K views · 0 followers · score 1

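I am upgrading an existing Spring Cloud Stream application to use the new Spring Cloud Function style of producer. The messages it produces are in Avro format.

Here is my entire setup: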

Code (YAML):
spring:
  cloud:
    stream:
      schema-registry-client:
        endpoint: ${schema-registry.url:http://localhost:8081}
      bindings:
        info-out-0:
          destination: info
          producer:
            useNativeEncoding: true
          contentType: application/*+avro
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          transaction:
            transaction-id-prefix: info-tx-
            producer:
              configuration:
                retries: 2
                acks: all
                key:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                value:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                schema:
                  registry:
                    url: ${spring.cloud.stream.schema-registry-client.endpoint}
                useNativeEncoding: true
    function:
      definition: info
  kafka:
    bootstrap-servers: ${kafka.brokers:localhost:9092}
Code (Kotlin):
@Configuration
class SchemaRegistryConfiguration {

    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }
}
Code (Kotlin):
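@Configuration
class KafkaProducerConfiguration {

    // Processor that InfoProducer pushes outgoing messages into.
    @Bean
    fun infoMonoProcessor(): MonoProcessor<Message<*>> {
        return MonoProcessor.create<Message<*>>()
    }

    // Supplier named by spring.cloud.function.definition; it maps to the info-out-0 binding.
    @Bean
    fun info(): Supplier<Mono<Message<*>>> {
        return Supplier { infoMonoProcessor() }
    }
}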
Code (Kotlin):
@Component
class InfoProducer(@Qualifier("infoMonoProcessor") private val infoProcessor: MonoProcessor<Message<*>>) {

    @Transactional
    fun send(info: Info): Mono<Unit> {
        return Mono.fromCallable {
            val infoReceived = InfoReceived(info)
            val message = MessageBuilder.withPayload(infoReceived)
                    // dateTime is not defined in the snippet as posted
                    .setHeader(KafkaHeaders.TIMESTAMP, dateTime)
                    .build()
            // Hand the message to the processor exposed by the info supplier
            infoProcessor.onNext(message)
        }.subscribeOn(Schedulers.elastic())
    }

}

There is a REST endpoint that receives some info and uses InfoProducer to send it to the output topic:

Code (Kotlin):
@RestController
@RequestMapping("/api/v1/info")
class InfoRestController(private val infoProducer: InfoProducer) {

    @PostMapping
    @ResponseStatus(CREATED)
    fun registerInfo(@RequestBody info: Info): Mono<Unit> {
        return infoProducer.send(info)
    }
}

The problem is that I get this ugly exception:

Stack trace and log output:
java.lang.NullPointerException: null
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:601) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputPublisherIfNecessary$4(BeanFactoryAwareFunctionRegistry.java:640) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at xxx.InfoProducer$send$1.call(InfoProducer.kt:48) ~[classes/:na]
    at xxx.InfoProducer$send$1.call(InfoProducer.kt:25) ~[classes/:na]
    at reactor.core.publisher.MonoCallable.call(MonoCallable.java:91) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:225) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2019-12-11 16:41:43.985 DEBUG 22622 --- [      elastic-2] o.s.http.codec.json.Jackson2JsonEncoder  : [23177c31] Encoding [kotlin.Unit]

In BeanFactoryAwareFunctionRegistry, acceptedOutputMimeTypes is an empty array.

What is wrong here?


1 Answer

Stack Overflow user

Answered 2019-12-12 04:37:38

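Since you are using native encoding together with the Avro serializer provided by Confluent, I don't think the Spring Cloud Schema Registry component is needed here. You can remove SchemaRegistryConfiguration from your setup. You don't need this either: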

Code (YAML):
spring:
  cloud:
    stream:
      schema-registry-client:
        endpoint: ${schema-registry.url:http://localhost:8081}

I would also suggest not using ${spring.cloud.stream.schema-registry-client.endpoint} for the registry URL, and using something more explicit instead:

Code (YAML):
schema:
  registry:
    url: http://localhost:8081
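
Taken together, the two suggestions above might leave the producer configuration looking roughly like the sketch below. It is only an illustration under assumptions that were not confirmed in the thread: the rest of the asker's setup stays unchanged, the schema registry is reachable at http://localhost:8081, and the extra `useNativeEncoding` entry that sat under the Kafka client `configuration` map can be dropped because native encoding is already enabled on the binding.

```yaml
spring:
  cloud:
    stream:
      # schema-registry-client block removed (assumption: nothing else reads it)
      bindings:
        info-out-0:
          destination: info
          producer:
            useNativeEncoding: true   # the Confluent serializer encodes the payload
          contentType: application/*+avro
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          transaction:
            transaction-id-prefix: info-tx-
            producer:
              configuration:
                retries: 2
                acks: all
                key:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                value:
                  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                schema:
                  registry:
                    url: http://localhost:8081   # explicit URL instead of a placeholder
    function:
      definition: info
  kafka:
    bootstrap-servers: ${kafka.brokers:localhost:9092}
```

With this in place, the SchemaRegistryConfiguration class from the question would also be deleted, since the Confluent serializer contacts the registry directly through `schema.registry.url`.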

See also this example (linked in the original answer).

If you still have problems, can you share a reproducible sample?

Score: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/59289402
