首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Alpakka Kafka Java测试

Alpakka Kafka Java测试
EN

Stack Overflow用户
提问于 2022-02-09 05:38:31
回答 1查看 97关注 0票数 0

我试图为一个应用程序编写Junit测试,该应用程序使用kafka流将数据从kafka发送到websocket连接。我已经能够在本地运行应用程序,发布到主题并看到套接字连接上返回的数据。然而,我正在努力正确地设置测试环境。

代码测试

当前,我要测试的方法如下:

代码语言:javascript
复制
    public Source<JsonNode, ?> getKafkaStream(String groupId, Set<Id> idsBelongingToUser) {
        return Consumer.plainSource(getKafkaConsumerSettings(groupId), Subscriptions.topics(this.kafkaTopic))
            .map(ConsumerRecord::value)
            .filter(event -> idsBelongingToUser.contains(event.getId())
            .map(consumerRecord -> objectMapper.convertValue(consumerRecord, JsonNode.class));
    }

然后,我在这里的流中使用这个源:

代码语言:javascript
复制
    public Flow<JsonNode, JsonNode, ?> getDataStream(String groupId, Set<Id> idsBelogingToUser) {
        return Flow.fromSinkAndSource(
            Sink.ignore(),
            this.getKafkaStream(accountOfListener, idsBelogingToUser).runWith(BroadcastHub.of(JsonNode.class), this.materializer)
        );
    }

我的试题

我最近的尝试是用TestcontainersKafkaJunit4Test测试代码,下面是我如何设计它的修改文件。

代码语言:javascript
复制
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings;
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
import akka.stream.testkit.javadsl.TestSink;
import akka.testkit.javadsl.TestKit;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;

import java.util.concurrent.CompletionStage;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;


@TestInstance(Lifecycle.PER_CLASS)
public class KafkaTest extends TestcontainersKafkaJunit4Test {
    KafkaService toTest;
    String topic;

    private static final ActorSystem system = ActorSystem.create("TestkitTestcontainersTest");
    KafkaTest() {
        super(
            system,

            KafkaTestkitTestcontainersSettings.create(system)
                .withSchemaRegistry(true)
                .withNumBrokers(1)
                .withInternalTopicsReplicationFactor(1)
        );
        this.setUpAdminClient();
    }

    @SneakyThrows
    public <T> T getResultFromFuture(CompletionStage<T> future) {
        await().atMost(5, SECONDS).until(() -> future.toCompletableFuture().isDone());
        return future.toCompletableFuture().get();
    }

    @Test
    void test() {
        this.topic = createTopic();
        toTest = new KafkaService();

        JsonNode event1 = createRandomEvent();
        JsonNode event2 = createRandomEvent();
        String group = "some-key";

        //produce some messages
        getResultFromFuture(this.produce(this.topic, new StringSerializer(), new JsonSerdes<>(JsonNode.class).serializer(),
            Pair.create(group, event1),
            Pair.create(group, event2)));


        //Here I want to test the source is getting the messages produced above

        //One Attempt
        toTest.getKafkaStream(group, getSetOfIds())
            .runWith(TestSink.probe(system), materializer)
            .expectNext(event1)
            .expectNext(event2);
    }

    JsonNode createRandomEvent() { ... }
    @AfterAll
    void shutdownActorSystem() {
        this.cleanUpAdminClient();
        TestKit.shutdownActorSystem(system);
    }
}

我相信设置已经接近完成,因为当我从使用源流到使用消息并使用使用者对象来使用消息时,我可以看到这些事件。我尝试在map(ConsumerRecord::value)函数中打印,并且没有日志,这使我相信它从未被执行过。不幸的是,alpakka kafka没有很多关于如何测试的例子,而且作为kafka和scala的初学者,我很难将现有的示例应用到我的代码中。我使用了以下链接来设置这个链接:alpakka kafka测试文档 akka流测试

我稍微修改了代码,以使其更加通用。如果有任何信息不清楚,我会解释/编辑这个问题。预先感谢您的任何帮助/建议

EN

回答 1

Stack Overflow用户

发布于 2022-03-03 15:54:51

对我有用的方法是在我的测试用例中,我必须将kafka自动偏移配置设置为最早,而它是我用来创建源代码的使用者属性中的最新特性。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71044484

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档