是否可以使用Java CompletableFutures在Kafka stream应用程序中执行并行工作?
我想从1Kafka主题中读取,创建两个窗口计数,1分钟,另一个小时,但它们是并行的。
我写了一些示例代码。我能够让它工作,但是看一下Kafka流文档,因为KafkaStreams为每个分区分配一个任务,并且它不能超过一个线程,我不确定这个代码是否会有预期的效果。
CompletableFuture completableFutureOfMinute = CompletableFuture.runAsync(() -> {
inputStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"minute-store")
.withRetention(Duration.ofMinutes(1)))
.toStream()
.to("result-topic");
});
CompletableFuture completableFutureOfHour = CompletableFuture.runAsync(() -> {
inputStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofHours(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"hour-store")
.withRetention(Duration.ofHours(1)))
.toStream()
.to("result-topic-2", produced);
});
final CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(completableFutureOfMinute,
completableFutureOfHour);
try {
combinedFutures.get();
} catch (final Exception ex) {
}发布于 2021-01-04 05:53:24
您的程序似乎不正确。
请注意,使用DSL,您基本上组装了一个数据流程序,只有在调用KafkaStreams#start()时才开始数据处理。因此,在指定处理逻辑的同时使用Futures是没有帮助的,因为还没有处理任何数据。
Kafka Streams基于任务进行并行化。因此,如果您希望并行处理这两个窗口,则需要“复制”输入主题以将您的程序(称为Topology)拆分为独立的部分(调用SubTopology):
KStream input = builder.stream(...);
input.groupByKey().windowBy(/* 1 min */).count(...);
input.repartition().groupByKey().windowBy(/* 1 hour */).count();使用repartition(),您的程序将被分成两个子拓扑,并且您将获得每个子拓扑的任务,这些任务可以由不同的线程并行处理。
然而,我真的怀疑这个程序是否会增加您的吞吐量。如果你真的想增加吞吐量,你应该增加输入主题分区的数量,以获得更多的并行任务。
https://stackoverflow.com/questions/63078748
复制相似问题