首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我们可以使用CompletableFutures进行并行Kafka流处理吗

我们可以使用CompletableFutures进行并行Kafka流处理吗
EN

Stack Overflow用户
提问于 2020-07-25 01:38:06
回答 1查看 170关注 0票数 0

是否可以使用Java CompletableFutures在Kafka stream应用程序中执行并行工作?

我想从1Kafka主题中读取,创建两个窗口计数,1分钟,另一个小时,但它们是并行的。

我写了一些示例代码。我能够让它工作,但是看一下Kafka流文档,因为KafkaStreams为每个分区分配一个任务,并且它不能超过一个线程,我不确定这个代码是否会有预期的效果。

代码语言:javascript
复制
CompletableFuture completableFutureOfMinute = CompletableFuture.runAsync(() -> {
        inputStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                        "minute-store")
                        .withRetention(Duration.ofMinutes(1)))
                .toStream()
                .to("result-topic");
    });

    CompletableFuture completableFutureOfHour = CompletableFuture.runAsync(() -> {
        inputStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofHours(1)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                        "hour-store")
                        .withRetention(Duration.ofHours(1)))
                .toStream()
                .to("result-topic-2", produced);
    });

    final CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(completableFutureOfMinute,
            completableFutureOfHour);

    try {
        combinedFutures.get();
    } catch (final Exception ex) {

    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-04 05:53:24

您的程序似乎不正确。

请注意,使用DSL,您基本上组装了一个数据流程序,只有在调用KafkaStreams#start()时才开始数据处理。因此,在指定处理逻辑的同时使用Futures是没有帮助的,因为还没有处理任何数据。

Kafka Streams基于任务进行并行化。因此,如果您希望并行处理这两个窗口,则需要“复制”输入主题以将您的程序(称为Topology)拆分为独立的部分(调用SubTopology):

代码语言:javascript
复制
KStream input = builder.stream(...);
input.groupByKey().windowBy(/* 1 min */).count(...);
input.repartition().groupByKey().windowBy(/* 1 hour */).count();

使用repartition(),您的程序将被分成两个子拓扑,并且您将获得每个子拓扑的任务,这些任务可以由不同的线程并行处理。

然而,我真的怀疑这个程序是否会增加您的吞吐量。如果你真的想增加吞吐量,你应该增加输入主题分区的数量,以获得更多的并行任务。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63078748

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档