首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Alpakka Kafka流永远不会终止

Alpakka Kafka流永远不会终止
EN

Stack Overflow用户
提问于 2020-05-22 11:54:03
回答 2查看 266关注 0票数 0

我们使用Alpakka Kafka streams来消费来自Kafka的事件。下面是如何定义流的方式:

代码语言:javascript
复制
ConsumerSettings<GenericKafkaKey, GenericKafkaMessage> consumerSettings = 
    ConsumerSettings
        .create(actorSystem, new KafkaJacksonSerializer<>(GenericKafkaKey.class), 
                new KafkaJacksonSerializer<>(GenericKafkaMessage.class))
        .withBootstrapServers(servers).withGroupId(groupId)
        .withClientId(clientId).withProperties(clientConfigs.defaultConsumerConfig());
CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
        .withMaxBatch(20)
        .withMaxInterval(Duration.ofSeconds(30));
Consumer.DrainingControl<Done> control = 
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topics))
        .mapAsync(props.getMessageParallelism(), msg ->
                CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                        .thenCompose(param -> CompletableFuture.supplyAsync(() -> msg.committableOffset())))
        .toMat(Committer.sink(committerSettings), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

下面是关闭流的代码片段:

代码语言:javascript
复制
CompletionStage<Done> completionStage = control.drainAndShutdown(actorSystem.dispatcher());
completionStage.toCompletableFuture().join();

我也尝试过在可完成的未来做一个get。但是,join和get on future都不会返回。其他人也遇到过类似的问题吗?我是不是做错了什么?

EN

回答 2

Stack Overflow用户

发布于 2020-05-23 21:57:11

如果想要从流外控制流的终止,则需要使用KillSwitch:https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html

票数 0
EN

Stack Overflow用户

发布于 2020-06-01 16:00:09

你的用法看起来是正确的,我找不到任何会妨碍排泄的东西。

Alpakka Kafka消费者最常错过的一件事是stop-timeout,它的默认设置是30秒。使用DrainingControl时,您可以安全地将其设置为0秒。

请参阅https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-control

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61947594

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档