我试图为一个应用程序编写Junit测试,该应用程序使用kafka流将数据从kafka发送到websocket连接。我已经能够在本地运行应用程序,发布到主题并看到套接字连接上返回的数据。然而,我正在努力正确地设置测试环境。
代码测试
当前,我要测试的方法如下:
public Source<JsonNode, ?> getKafkaStream(String groupId, Set<Id> idsBelongingToUser) {
return Consumer.plainSource(getKafkaConsumerSettings(groupId), Subscriptions.topics(this.kafkaTopic))
.map(ConsumerRecord::value)
.filter(event -> idsBelongingToUser.contains(event.getId())
.map(consumerRecord -> objectMapper.convertValue(consumerRecord, JsonNode.class));
}然后,我在这里的流中使用这个源:
public Flow<JsonNode, JsonNode, ?> getDataStream(String groupId, Set<Id> idsBelogingToUser) {
return Flow.fromSinkAndSource(
Sink.ignore(),
this.getKafkaStream(accountOfListener, idsBelogingToUser).runWith(BroadcastHub.of(JsonNode.class), this.materializer)
);
}我的试题
我最近的尝试是用TestcontainersKafkaJunit4Test测试代码,下面是我如何设计它的修改文件。
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings;
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
import akka.stream.testkit.javadsl.TestSink;
import akka.testkit.javadsl.TestKit;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import java.util.concurrent.CompletionStage;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@TestInstance(Lifecycle.PER_CLASS)
public class KafkaTest extends TestcontainersKafkaJunit4Test {
KafkaService toTest;
String topic;
private static final ActorSystem system = ActorSystem.create("TestkitTestcontainersTest");
KafkaTest() {
super(
system,
KafkaTestkitTestcontainersSettings.create(system)
.withSchemaRegistry(true)
.withNumBrokers(1)
.withInternalTopicsReplicationFactor(1)
);
this.setUpAdminClient();
}
@SneakyThrows
public <T> T getResultFromFuture(CompletionStage<T> future) {
await().atMost(5, SECONDS).until(() -> future.toCompletableFuture().isDone());
return future.toCompletableFuture().get();
}
@Test
void test() {
this.topic = createTopic();
toTest = new KafkaService();
JsonNode event1 = createRandomEvent();
JsonNode event2 = createRandomEvent();
String group = "some-key";
//produce some messages
getResultFromFuture(this.produce(this.topic, new StringSerializer(), new JsonSerdes<>(JsonNode.class).serializer(),
Pair.create(group, event1),
Pair.create(group, event2)));
//Here I want to test the source is getting the messages produced above
//One Attempt
toTest.getKafkaStream(group, getSetOfIds())
.runWith(TestSink.probe(system), materializer)
.expectNext(event1)
.expectNext(event2);
}
JsonNode createRandomEvent() { ... }
@AfterAll
void shutdownActorSystem() {
this.cleanUpAdminClient();
TestKit.shutdownActorSystem(system);
}
}我相信设置已经接近完成,因为当我从使用源流到使用消息并使用使用者对象来使用消息时,我可以看到这些事件。我尝试在map(ConsumerRecord::value)函数中打印,并且没有日志,这使我相信它从未被执行过。不幸的是,alpakka kafka没有很多关于如何测试的例子,而且作为kafka和scala的初学者,我很难将现有的示例应用到我的代码中。我使用了以下链接来设置这个链接:alpakka kafka测试文档 akka流测试
我稍微修改了代码,以使其更加通用。如果有任何信息不清楚,我会解释/编辑这个问题。预先感谢您的任何帮助/建议
发布于 2022-03-03 15:54:51
对我有用的方法是在我的测试用例中,我必须将kafka自动偏移配置设置为最早,而它是我用来创建源代码的使用者属性中的最新特性。
https://stackoverflow.com/questions/71044484
复制相似问题