我正在尝试为kafka消息设置我的集成测试,并从使用Embedded-Kafka转向使用Testcontainers。给出了用于所有集成测试的docker-组合和基类的以下配置:
卡夫卡-作曲家
version: '3.3'
services:
zookeeper:
image: "wurstmeister/zookeeper"
kafka:
image: "wurstmeister/kafka:2.12-2.2.2"
ports:
- "9092:9092"
depends_on:
- "zookeeper"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_HOST_NAME: "${KAFKA_HOST:-localhost}"
KAFKA_ADVERTISED_PORT: "9092"
KAFKA_CREATE_TOPICS: "recoverer-test:1:1,some-topic"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"@SpringBootTest
@Slf4j
public class IntegrationTest {
private static final DockerComposeContainer kafkaContainer = initializeKafkaContainer();
protected static DockerComposeContainer initializeKafkaContainer() {
log.info(
"Initializing kafka container. Should be called only once. Current value of the kafkaContainer: {}",
kafkaContainer);
try {
var kafkaContainer =
new DockerComposeContainer(new File("src/test/resources/kafka-compose.yml"))
.withExposedService("kafka_1", 9092);
kafkaContainer.start();
var bootstrapServers =
format(
"PLAINTEXT://%s:%s",
kafkaContainer.getServiceHost("kafka_1", 9092),
kafkaContainer.getServicePort("kafka_1", 9092));
System.setProperty("spring.embedded.kafka.brokers", bootstrapServers);
return kafkaContainer;
} catch (Throwable t) {
log.error("Can't initialize the Kafka test container.", t);
throw t;
}
}@DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
class PerformSomethingInboundAdapterTest extends IntegrationTest {
private static final String GROUP_ID = "test-group-id";
private static final TopicPartition PARTITION = new TopicPartition(SOME_TOPIC, 0);
private static final Instant RECEIVED_AT = now();
private static final CustomerNumber CUSTOMER_NUMBER = CustomerNumber.of(600830);
@Autowired private KafkaListenerEndpointRegistry kafkaListenerRegistry;
@Autowired private ConsumerFactory<String, String> consumerFactory;
@Autowired private KafkaTemplate<Object, Object> kafkaTemplate;
@MockBean private ActivateSomethingActivities activateCampaignActivities;
private Consumer<String, String> consumer;
private long initiallyCommittedOffset;
@BeforeEach
void startKafkaListener() {
kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::start);
}
@AfterEach
void stopKafkaListener() {
kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::stop);
}
@Test
void shouldPerformSomething() {
...
}我遇到的问题很少:
@KafkaListener,并为特定的@SpringBootTest显式启用它们,@KafkaListener附带了一个卡夫卡模块。这一款采用了融合卡夫卡码头形象,在配置上非常顽固。例如,您不能设置一些代理属性,也不能告诉容器应该在启动后创建哪个主题。在与这个模块斗争之后,我决定使用带有wurstmeister/kafka映像的对接-合成模块。后一种方法的问题是,当我使用命令行maven运行测试时,会收到错误消息,说明kafka已经在9092端口上运行。在mvn test期间,maven似乎很少启动JVM,因此静态字段kafkaContainer被初始化了几次。为什么会这样?发布于 2020-06-29 14:20:26
当测试完成时,
@DirtiesContext关闭侦听器。不知道第二条。
发布于 2020-06-29 16:43:03
可以通过在不同的测试中使用不同的上下文来克服1
示例:
@ExtendWith(SpringExtension.class)
@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestExecutionListeners({DependencyInjectionTestExecutionListener.class,
FlywayTestExecutionListener.class})
@FlywayTest
@ActiveProfiles({"test"})
public abstract class AbstractDatabaseTest {这是我为集成测试创建的测试,我只想在需要数据库层的地方扩展这个测试,您可以尝试为Kafka测试创建一个类似的测试。
一般来说,@SpringBootTest会引导整个应用程序,这可能需要更长的时间,我个人不喜欢。
2很难说。您可以尝试在initializeKafkaContainer()中打印堆栈跟踪,以查看哪个测试可以做到这一点。或者,如果您尝试使用前面的方法,您可以在抽象类中的静态块中进行初始化,然后对其进行扩展的每个测试都将使用现有的静态容器,并且只初始化一次。
https://stackoverflow.com/questions/62634909
复制相似问题