我用春云条纹Kstream。我测试一个主题&一个@StreamListner。没事的。
我为两个KStream输入修改我的代码。(两个@StreamListener)但是,弹簧云错误..。
***************************
APPLICATION FAILED TO START
***************************
Description:
The bean 'stream-builder-process', defined in null, could not be registered. A bean with that name has already been defined in null and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
Process finished with exit code 1第一听众
package com.kstream.spring.cloud.test1;
import static com.kstream.spring.cloud.test1.MyBinding.TOPIC1_IN;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
@Component
public class Topic1Source {
@StreamListener
public void process(@Input(TOPIC1_IN) KStream<String, GenericRecord> logs) {
logs
.foreach((key, value) -> {
System.out.println("Test Topic1 : " + value);
});
}
}只有第一个侦听器是可以的。
第二听者
package com.kstream.spring.cloud.test1;
import static com.kstream.spring.cloud.test1.MyBinding.TOPIC2_IN;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
@Component
public class Topic2Source {
@StreamListener
public void process(@Input(TOPIC2_IN) KStream<String, GenericRecord> logs) {
logs
.foreach((key, value) -> {
System.out.println("Test Topic2 : " + value);
});
}
}但这是错误
application.properties
spring.application.name=kafka-streams-test
spring.kafka.bootstrap-servers=my brokers
# defaults
spring.cloud.stream.kafka.streams.binder.brokers=my brokers
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=my server
# topic1
spring.cloud.stream.bindings.topic1In.destination=topic1
spring.cloud.stream.bindings.topic1In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic1In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
# topic2
spring.cloud.stream.bindings.topic2In.destination=topic2
spring.cloud.stream.bindings.topic2In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic2In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde发布于 2019-03-25 07:10:18
我找到了错误的原因。因为我定义了两个相同的方法名'process‘。
发布于 2019-03-13 15:58:10
您需要为两个输入提供单独的应用程序id。参见this问题和答案。
发布于 2019-03-14 07:10:34
我修改pom.xml版本。起作用了。但是这个版本不使用应用程序id的属性。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<!--<version>2.1.3.RELEASE</version>-->
<version>2.0.1.BUILD-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<!--<spring-cloud.version>Greenwich.SR1</spring-cloud.version>-->
<spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
</properties>春-启动-启动-父版本
spring-cloud.version
我不明白为什么最新版本不起作用。
https://stackoverflow.com/questions/55136863
复制相似问题