首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring合同-没有为主题设置使用者[.]

Spring合同-没有为主题设置使用者[.]
EN

Stack Overflow用户
提问于 2019-12-20 10:43:01
回答 1查看 759关注 0票数 1

我目前正在使用Spring契约开发API兼容性检查。我设置了所有东西,就像文档上说的那样。但我遇到了一个问题-- java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]。该异常将在KafkaStubMessages类中抛出。所以我假设这是与图书馆有关的问题。在我的项目中,我有两个独立的maven项目。他们中的每一个都是消费者和生产者(不同的主题)。我的合同被放在其他存储库中。

所以..。我目前正在研究的场景是:我们有两个模块--模块A和B。模块A向卡夫卡主题t1和t2发送关于主题T1和T2的消息,并接收来自主题t3和T4的消息T3和t4。模块B从T1和T2接收,并发送给T3和T4。

所有使用者测试都通过每个模块。但是生产者测试的结果是主题中提到的错误。

我怀疑这是由存根创建错误造成的。所以没有合适的听众。

我尝试了不同的卡夫卡配置,但我相信情况并非如此。我还检查了spring云合同配置,但是一切看起来都很好。生成带有存根的合适的罐子。不幸的是,谷歌在这个问题上没有多大帮助。

如果你需要任何信息来帮助我,请随时询问。我现在正忙着做这件事,所以我很绝望,真的需要你的帮助。

编辑:添加堆栈跟踪和相关代码片段

堆栈跟踪:

代码语言:javascript
复制
java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]

at org.springframework.cloud.contract.verifier.messaging.kafka.Receiver.receive(KafkaStubMessages.java:110)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:80)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:42)
at com.comarch.fsm.dispatcher.rest.ContractBaseTest.setup(ContractBaseTest.groovy:56)

基类配置:

代码语言:javascript
复制
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers", brokerProperties = ["log.dir=target/embedded-kafka"])
@AutoConfigureStubRunner
abstract class BaseTestConfig extends Specification {
}

我的合同定义:

代码语言:javascript
复制
Pattern customDateTime() {
    Pattern.compile('([0-9]{4})-(1[0-2]|0[1-9])-(0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])Z')
}

Contract.make {
    label("sync")
    input {
        triggeredBy("sync()")
    }
    outputMessage {
        sentTo("testSyncTopic")
        body(
                syncStart: $(customDateTime())
        )
    }
}

ContractBaseTest类:

代码语言:javascript
复制
abstract class ContractBaseTest extends BaseTestConfig {

    @Autowired
    private KafkaService kafkaService;

    def synchronizeData() {
        kafkaService.sendKafkaMessage("testSyncTopic", null, new SyncDto(new Date()));
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-12-21 14:12:08

为什么您的基类有@AutoConfigureStubRunner,它应该有@AutoConfigureMessageVerifier?你似乎把消费者和制片人混为一谈。

请查看卡夫卡制片人的例子,这里是:https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka。出于可再生的原因,我会把它复制到这里。

THE PRODUCER

基类:https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka/src/test/java/com/example/BaseClass.java

代码语言:javascript
复制
package com.example;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
// remove::start[]
import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
import org.springframework.kafka.test.context.EmbeddedKafka;
// remove::end[]
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
// remove::start[]
@AutoConfigureMessageVerifier
@EmbeddedKafka(partitions = 1, topics = {"topic1"})
// remove::end[]
@ActiveProfiles("test")
public abstract class BaseClass {

    @Autowired
    Controller controller;

    public void trigger() {
        this.controller.sendFoo("example");
    }
}

在这里你可以找到控制器

代码语言:javascript
复制
package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import com.common.Foo1;

/**
 * @author Gary Russell
 * @since 2.2.1
 */
@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send("topic1", new Foo1(what));
    }

}

在这里您可以看到生产配置(application.yml)

代码语言:javascript
复制
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
logging.level.org.springframework.cloud.contract: debug

在这里您可以看到测试配置(application-test.yml)

代码语言:javascript
复制
spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      properties:
        "key.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
        "key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
      group-id: groupId

和合同(https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka/src/test/resources/contracts/shouldSendFoo.groovy)

代码语言:javascript
复制
import org.springframework.cloud.contract.spec.Contract

Contract.make {
    label("trigger")
    input {
        triggeredBy("trigger()")
    }
    outputMessage {
        sentTo("topic1")
        body([
                foo: "example"
        ])
    }
}

消费者

现在是消费者时间(https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/master/consumer_kafka)

代码语言:javascript
复制
package com.example;

import org.assertj.core.api.BDDAssertions;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
// remove::start[]
import org.springframework.cloud.contract.stubrunner.StubTrigger;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
import org.springframework.kafka.test.context.EmbeddedKafka;
// remove::end[]
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
// remove::start[]
@AutoConfigureStubRunner(ids = "com.example:beer-api-producer-kafka", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
@EmbeddedKafka(topics = "topic1")
// remove::end[]
@ActiveProfiles("test")
public class ApplicationTests {

    // remove::start[]
    @Autowired
    StubTrigger trigger;
    @Autowired
    Application application;

    @Test
    public void contextLoads() {
        this.trigger.trigger("trigger");

        Awaitility.await().untilAsserted(() -> {
            BDDAssertions.then(this.application.storedFoo).isNotNull();
            BDDAssertions.then(this.application.storedFoo.getFoo()).contains("example");
        });
    }
    // remove::end[]

}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59423909

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档