首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用EmbeddedKafkaRule/ EmbeddedKafka设置Spring测试来修复TopicExistsException间歇错误?

如何使用EmbeddedKafkaRule/ EmbeddedKafka设置Spring测试来修复TopicExistsException间歇错误?
EN

Stack Overflow用户
提问于 2020-01-26 05:12:15
回答 1查看 8.6K关注 0票数 1

我一直有问题,测试我的卡夫卡消费者和制片人。集成测试在TopicExistsException中间歇性地失败。

这就是我当前的测试类- UserEventListenerTest对于其中一个消费者来说的样子:

代码语言:javascript
复制
@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
    "application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Value("\${application.kafka.user-event-topic}")
    private lateinit var userEventTopic: String

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    private lateinit var embeddedKafka: EmbeddedKafkaRule
    private lateinit var sender: KafkaSender<String, UserEvent>
    private lateinit var receiver: KafkaReceiver<String, UserEvent>

    @BeforeAll
    fun setup() {
        embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
        embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
        embeddedKafka.before()

        val producerProps: HashMap<String, Any> = hashMapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
        )
        val senderOptions = SenderOptions.create<String, UserEvent>(producerProps)
        sender = KafkaSender.create(senderOptions)

        val consumerProps: HashMap<String, Any> = hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to kafkaConfigProperties.deserializer,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
            "schema.registry.url" to kafkaConfigProperties.schemaRegistry,
            ConsumerConfig.GROUP_ID_CONFIG to "test-consumer"
        )
        val receiverOptions = ReceiverOptions.create<String, UserEvent>(consumerProps)
            .subscription(Collections.singleton("some-topic-after-UserEvent"))
        receiver = KafkaReceiver.create(receiverOptions)
    }
}

// Some tests
// Not shown as they are irrelevant
...
...
...

UserEventListener类使用来自user-event-topic-UserEventListenerTest的消息并将消息发布到some-topic-after-UserEvent

从设置中可以看到,我有一个测试生成器,它将向user-event-topic-UserEventListenerTest发布一条消息,以便测试UserEventListener是否使用该消息,以及一个将使用来自some-topic-after-UserEvent的消息的测试使用者,以便在处理记录后,UserEventListener是否将消息发布到some-topic-after-UserEvent

KafkaConfigProperties类如下所示。

代码语言:javascript
复制
@Component
@ConfigurationProperties(prefix = "application.kafka")
data class KafkaConfigProperties(
    var bootstrap: String = "",
    var schemaRegistry: String = "",
    var deserializer: String = "",
    var userEventTopic: String = "",
)

application.yml看起来是这样的。

代码语言:javascript
复制
application:
  kafka:
    user-event-topic: "platform.user-events.v1"
    bootstrap: "localhost:9092"
    schema-registry: "http://localhost:8081"
    deserializer: com.project.userservice.config.MockAvroDeserializer

错误日志

代码语言:javascript
复制
com.project.userservice.user.UserEventListenerTest > initializationError FAILED
    kafka.common.KafkaException:
        at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:354)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.lambda$createKafkaTopics$4(EmbeddedKafkaBroker.java:341)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.doWithAdmin(EmbeddedKafkaBroker.java:368)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.createKafkaTopics(EmbeddedKafkaBroker.java:340)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:284)
        at org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:114)
        at com.project.userservice.user.UserEventListenerTest.setup(UserEventListenerTest.kt:62)

        Caused by:
        java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
            at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
            at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
            at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
            at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
            at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:351)
            ... 6 more

            Caused by:
            org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.

我试过的是:

@SpringBootTest(properties = ["application.kafka.bootstrap=localhost:2345"])

  • Use

  • 通过指定引导配置在每个测试中使用不同的引导服务器地址,例如,通过@SpringBootTest覆盖主题配置,从而在每个测试中使用不同的主题名称,就像在前面的项目点

  • @DirtiesContext添加到每个测试类

中的引导服务器覆盖一样。

包版本

1.3.61

  • Spring
  • Kotlin Boot - 2.2.3.RELEASE
  • io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE
  • org.springframework.kafka:spring-kafka-test:2.3.4.RELEASE (仅测试实现)

问题

我有多个使用EmbeddedKafkaRule的测试类,它们的设置大致相同。对于它们中的每一个,我都指定了不同的kafka引导服务器地址和主题名称,但我仍然间歇性地看到TopicAlreadyExists异常。

怎样才能使我的测试类保持一致?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-26 14:27:56

我指定了不同的kafka引导服务器地址和主题名称,但是我仍然间歇性地看到TopicAlreadyExists异常。

这是没有意义的;如果每次都有一个新端口,特别是新的主题名称,那么主题就不可能已经存在了。

一些建议:

由于您使用的是

  1. ,所以不要使用JUnit4 EmbeddedKafkaRule,而是使用EmbeddedKafkaBroker;或者简单地添加@EmbeddedKafka,代理将作为bean添加到Spring应用程序上下文及其由Spring管理的生命周期(使用@DirtiesContext销毁);对于非Spring测试,代理将由JUnit5 EmbeddedKafkaCondition创建(并销毁),并可通过JUnit5使用显式端口访问;让代理使用其默认的随机端口,并使用embeddedKafka.getBrokersAsString()作为引导服务器属性。如果您必须自己管理代理(在@BeforeAll中),请在embeddedKafka.getBrokersAsString()中使用destroy()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59915744

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档