我正在尝试在springboot中测试使用@KafkaListener创建的侦听器,但是侦听器总是在localhost:9092上侦听,而不是使用这个embededKafka
我的监听器看起来像这样:
@Component
@Slf4j
class SomeListener {
private final List<String> receivedMessages = new ArrayList<>();
@KafkaListener(topics = "some-ultra-cool-topic")
public void onKafkaMessage(String theMessage) {
log.info("Message received {}", theMessage);
receivedMessages.add(theMessage);
}
Collection<String> getAll() {
return unmodifiableCollection(receivedMessages);
}
}spock测试就像这样:
@SpringBootTest
@EmbeddedKafka
@TestPropertySource(properties = ['spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}', 'spring.kafka.consumer.auto-offset-reset=earliest'])
class SomeListenerTest extends Specification {
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker
@Autowired
SomeListener someListener
void 'should receive message'() {
given:
def sender = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(KafkaTestUtils.producerProps(embeddedKafkaBroker)))
when:
sender.send('some-ultra-cool-topic', 'first message content')
then:
someListener.all.size() == 1
}
}我的application.yaml没有配置bootstraps服务器-所以它是spring-boot的purly默认值。
我可以在日志中看到producer正在向broker发送消息(每次都在不同的随机端口上启动)。但侦听器始终尝试连接到localhost:9092上的代理
我如何配置它来使用这个嵌入式的?
发布于 2020-08-25 17:14:04
感谢@sawim的提示
实际的问题是在测试中。我最终用lib org.awaitility:awaitility做了这个测试。
then:
waitAtMost(5, SECONDS)
.untilAsserted({ ->
assertThat(personFacade.findAll(), hasSize(1))
})第一个示例中的配置工作正常,但是在启动期间,我可以看到kafka-logs尝试连接到localhost:9200 -似乎我们可以忽略它
https://stackoverflow.com/questions/63556389
复制相似问题