我有一个使用spring云流库的kafka流处理应用程序。此应用程序利用3个application.id值侦听3个主题。对于其中两个输入主题,在处理数据之后,我将消息推送到相应的输出主题上,然后使用这些主题创建GlobalKTables,如下所示:
streamsBuilder.globalTable(firstSSTopic, Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
as("ss-1")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
streamsBuilder.globalTable(secondSSTopic, Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
as("ss-2")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));因此,问题是,它使用什么application.id来使用来自"firstSSTopic“和"secondSSTopic”的数据?还是仅仅是GlobalStreamThread作为一个独立的消费者,没有任何集团?当我检查默认的状态目录(tmp/kafka)时,我可以看到所有3个application.id目录下的全局状态存储的sst和日志文件。我怎么才能避免这种情况?因为这将占用3X磁盘空间,并可能导致存储被迅速填满。
发布于 2020-05-08 09:43:03
GlobalKTable只能用作流表连接的右侧输入。
,但不明白为什么它会在多个地方将相同的数据保存在同一个应用实例中
这提供了使用KStream执行联接的能力,而不必重新划分输入流。
我怎么才能避免这种情况呢?
使用GlobalKTable无法避免这种情况。
https://stackoverflow.com/questions/61675315
复制相似问题