背景
我想将运行在DB上的作业移到Flink,使系统更“实时”工作。此作业将每10秒重新计算所有帐户的“状态”,如果该值满足某些条件,我们将通知用户。帐户的“状态”是根据他们拥有的股票数量和这些股票的当前价格计算出来的。
我的解决方案
我的想法是创建一个具有以下输入的管道:
我的解决方案是:加入BeginStream和StockStream,然后加入PriceStream (都是keyBy StockName),为了计算BeginStream的每一只股票,我将创建一个名为stockStates的ListState,其中包含关于股票数量和当前价格的信息。
对于来自StockStream和PriceStream的每个事件,我将更新stockStates,然后计算“状态”。如果这个值满足某些条件,我们将发送一条消息给其他Kafka主题,并从这个ListState中删除这个帐户。
BeginStream
.keyby(StockName)
.connect(
StockStream.
.keyby(StockName))
.flatMap(new EnrichmentFucntion())
...
.connect(
PriceStream.
.keyby(StockName))
.flatMap(new EnrichmentFucntion())该系统包含约500.000个账户,1.000个股票,每个账户包含10-20个股票,PriceStream和StockStream的吞吐量约为1.000条消息/秒。
问题
我是Flink的新手,因此我不太确定我的解决方案是否是一个好方法?对于类似的问题有什么设计模式吗?对于大约1000个ListState(每个列表包含大约500.000*10/ 1000 =5.000帐户的状态),我应该使用RocksDB进行状态存储吗?
如有任何建议,将不胜感激。
发布于 2022-05-27 23:56:44
是的,您的用例可以用flink实现。RocksDB作为一个状态后端是一个很好的选择。
https://stackoverflow.com/questions/71420452
复制相似问题