默认状态存储是rocksdb。我还可以找到内存中的状态存储。但是,我需要弄清楚kafka streams是否可以配置为使用mongodb作为状态存储,以及我是否可以自己定义这些集合。
发布于 2021-08-08 12:55:35
您将实现StateStore和StsteStoreSupplier,但是,建议您不要使用远程数据存储,因为这将导致拓扑中的不一致和相当大的滞后,而RocksDB实现已经考虑到了这一点。它还会导致数据库上的大量读写操作频繁发生,这将影响所有客户端
您会发现通常实现的是使用Kafka Connect MongoDB接收器在完全处理之后使用内置的状态存储选项和Debezium或Mongo源连接器将数据写入Kafka进行处理。
你可以在这里找到一个Redis statestore的POC,作为尝试为Mongo - https://github.com/andreas-schroeder/redisks编写自己的POC的灵感。
https://stackoverflow.com/questions/68689127
复制相似问题