我使用Kafka连接两条具有3天连接窗口的流:
...
private final long retentionHours = Duration.ofDays(3);
...
var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
.grace(Duration.ofMillis(0));
var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
.withStoreName("STORE-1")
.withName("STORE-2");
stream1.join(stream2, streamJoiner(), joinWindow, joinStores);通过上面的实现,我发现Kafka创建了状态文件夹:/tmp/kafka-streams,(看起来像RocksDB)和,它不断地增长。而且,卡夫卡集群中的状态存储不断增长。
因此,我将streams实现更改为:
...
private final long retentionHours = Duration.ofDays(3);
...
var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
.grace(Duration.ofMillis(0));
var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
.withStoreName("STORE-1")
.withName("STORE-2")
.withThisStoreSupplier(createStoreSupplier("MEM-STORE-1"))
.withOtherStoreSupplier(createStoreSupplier("MEM-STORE-2"));
stream1.join(stream2, streamJoiner(), joinWindow, joinStores);
...
private WindowBytesStoreSupplier createStoreSupplier(String storeName) {
var window = Duration.ofMinutes(retentionHours * 2)
.toMillis();
return new InMemoryWindowBytesStoreSupplier(storeName, window, window, true);
}现在,没有状态文件夹:/tmp/kafka-streams.
这是否意味着InMemoryWindowBytesStoreSupplier根本不使用磁盘?如果是的话,它是如何工作的?
而且,我仍然看到卡夫卡集群中的状态商店不断地增长。
发布于 2020-08-03 14:42:04
,这是否意味着InMemoryWindowBytesStoreSupplier根本不使用磁盘?如果是的话,它是如何工作的?
InMemoryWindowBytesStore根本不使用磁盘。
一般来说,逻辑状态存储实际上被划分为多个状态存储“实例”(例如:每个流任务都有自己的本地状态存储实例)。具体来说,对于InMemoryWindowBytesStore,通过设计,这些存储实例管理内存中的所有本地数据。
也是如此,我仍然看到卡夫卡集群中的国家商店在不断增长。
然而,InMemoryWindowBytesStore 仍然是容错的,对于新的Kafka流开发人员来说,这常常令人困惑,因为在大多数软件中,“内存中”总是意味着“如果发生了什么事情,数据就会丢失”。然而,卡夫卡流的情况并非如此。状态存储总是“备份”到它的Kafka changelog主题,不管您是使用默认的状态存储(与RocksDB一起使用)还是使用内存中的状态存储。这解释了为什么您在Kafka cluster.中看到内存中的( changelog )数据数据不应该永远增长,顺便说一句,因为压缩变更主题来防止这种情况发生。
注意:然而,当使用内存存储时,可能发生的情况是,应用程序实例可能会耗尽内存(OOM),从而崩溃。虽然您的状态数据永远不会丢失,如上所述,您的应用程序将不会因为OOM崩溃而运行/它将只部分运行(一些应用实例运行OOM,另一些则没有)。此OOM问题不适用于默认存储(RocksDB),因为它管理磁盘上的数据,并且只为缓存目的使用内存(RAM)。但是,同样,应用程序可用性的问题与数据安全是正交的(无论应用程序是否崩溃,您的数据都是安全的)。
https://stackoverflow.com/questions/63224747
复制相似问题