首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡加入存储器

卡夫卡加入存储器
EN

Stack Overflow用户
提问于 2020-08-03 06:32:46
回答 1查看 510关注 0票数 0

我使用Kafka连接两条具有3天连接窗口的流:

代码语言:javascript
复制
    ...
 private final long retentionHours = Duration.ofDays(3);

    ...
    var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
                                .grace(Duration.ofMillis(0));
    var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
                                 .withStoreName("STORE-1")
                                 .withName("STORE-2");
    stream1.join(stream2, streamJoiner(), joinWindow, joinStores);

通过上面的实现,我发现Kafka创建了状态文件夹:/tmp/kafka-streams,(看起来像RocksDB)和,它不断地增长。而且,卡夫卡集群中的状态存储不断增长

因此,我将streams实现更改为:

代码语言:javascript
复制
...
private final long retentionHours = Duration.ofDays(3);
    ...
    var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
                                .grace(Duration.ofMillis(0));
    var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
                                 .withStoreName("STORE-1")
                                 .withName("STORE-2")
                                 .withThisStoreSupplier(createStoreSupplier("MEM-STORE-1"))
                                 .withOtherStoreSupplier(createStoreSupplier("MEM-STORE-2"));
    stream1.join(stream2, streamJoiner(), joinWindow, joinStores);

...
private WindowBytesStoreSupplier createStoreSupplier(String storeName) {
    var window = Duration.ofMinutes(retentionHours * 2)
                         .toMillis();
    return new InMemoryWindowBytesStoreSupplier(storeName, window, window, true);
}

现在,没有状态文件夹:/tmp/kafka-streams.

这是否意味着InMemoryWindowBytesStoreSupplier根本不使用磁盘?如果是的话,它是如何工作的?

而且,我仍然看到卡夫卡集群中的状态商店不断地增长

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-03 14:42:04

,这是否意味着InMemoryWindowBytesStoreSupplier根本不使用磁盘?如果是的话,它是如何工作的?

InMemoryWindowBytesStore根本不使用磁盘。

一般来说,逻辑状态存储实际上被划分为多个状态存储“实例”(例如:每个流任务都有自己的本地状态存储实例)。具体来说,对于InMemoryWindowBytesStore,通过设计,这些存储实例管理内存中的所有本地数据。

也是如此,我仍然看到卡夫卡集群中的国家商店在不断增长。

然而,InMemoryWindowBytesStore 仍然是容错的,对于新的Kafka流开发人员来说,这常常令人困惑,因为在大多数软件中,“内存中”总是意味着“如果发生了什么事情,数据就会丢失”。然而,卡夫卡流的情况并非如此。状态存储总是“备份”到它的Kafka changelog主题,不管您是使用默认的状态存储(与RocksDB一起使用)还是使用内存中的状态存储。这解释了为什么您在Kafka cluster.中看到内存中的( changelog )数据数据不应该永远增长,顺便说一句,因为压缩变更主题来防止这种情况发生。

注意:然而,当使用内存存储时,可能发生的情况是,应用程序实例可能会耗尽内存(OOM),从而崩溃。虽然您的状态数据永远不会丢失,如上所述,您的应用程序将不会因为OOM崩溃而运行/它将只部分运行(一些应用实例运行OOM,另一些则没有)。此OOM问题不适用于默认存储(RocksDB),因为它管理磁盘上的数据,并且只为缓存目的使用内存(RAM)。但是,同样,应用程序可用性的问题与数据安全是正交的(无论应用程序是否崩溃,您的数据都是安全的)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63224747

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档