首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Stream中是否可能有多个@StreamListener?

在Stream中是否可能有多个@StreamListener?
EN

Stack Overflow用户
提问于 2019-03-13 07:56:19
回答 4查看 3.8K关注 0票数 4

我用春云条纹Kstream。我测试一个主题&一个@StreamListner。没事的。

我为两个KStream输入修改我的代码。(两个@StreamListener)但是,弹簧云错误..。

代码语言:javascript
复制
***************************
APPLICATION FAILED TO START
***************************

Description:

The bean 'stream-builder-process', defined in null, could not be registered. A bean with that name has already been defined in null and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true


Process finished with exit code 1

第一听众

代码语言:javascript
复制
    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC1_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic1Source {

      @StreamListener
      public void process(@Input(TOPIC1_IN) KStream<String, GenericRecord> logs) {

        logs
            .foreach((key, value) -> {
              System.out.println("Test Topic1 : " + value);
            });
      }
    }

只有第一个侦听器是可以的。

第二听者

代码语言:javascript
复制
    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC2_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic2Source {

      @StreamListener
      public void process(@Input(TOPIC2_IN) KStream<String, GenericRecord> logs) {

        logs
            .foreach((key, value) -> {
              System.out.println("Test Topic2 : " + value);
            });
      }
    }

但这是错误

application.properties

代码语言:javascript
复制
spring.application.name=kafka-streams-test
spring.kafka.bootstrap-servers=my brokers

# defaults
spring.cloud.stream.kafka.streams.binder.brokers=my brokers
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=my server


# topic1
spring.cloud.stream.bindings.topic1In.destination=topic1
spring.cloud.stream.bindings.topic1In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic1In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde




# topic2
spring.cloud.stream.bindings.topic2In.destination=topic2
spring.cloud.stream.bindings.topic2In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic2In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2019-03-25 07:10:18

我找到了错误的原因。因为我定义了两个相同的方法名'process‘。

票数 4
EN

Stack Overflow用户

发布于 2019-03-13 15:58:10

您需要为两个输入提供单独的应用程序id。参见this问题和答案。

票数 0
EN

Stack Overflow用户

发布于 2019-03-14 07:10:34

我修改pom.xml版本。起作用了。但是这个版本不使用应用程序id的属性。

代码语言:javascript
复制
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!--<version>2.1.3.RELEASE</version>-->
        <version>2.0.1.BUILD-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <properties>
        <java.version>1.8</java.version>
        <!--<spring-cloud.version>Greenwich.SR1</spring-cloud.version>-->
        <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
    </properties>

春-启动-启动-父版本

  • 2.1.3.防止=>错误
  • 2.0.1. Working快照=>工作

spring-cloud.version

  • Greenwich.SR1 =>误差
  • Working快照=>工作

我不明白为什么最新版本不起作用。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55136863

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档