我有一个从RabbitMQ消耗的工作,我正在使用FS状态后端,但似乎状态的大小变得更大,然后我决定将我的状态转移到RocksDB。问题是,在运行作业的最初几个小时内,如果流量变慢,则会在更多时间后发生事件,但当流量再次变高时,消费者开始出现问题(事件被标记为未确认),然后这些问题会反映在应用程序的其余部分中。
我有:4个CPU核心
本地磁盘
16 RAM
Unix环境
Flink 1.11
Scala版本2.11
1个单作业运行,keyedStreams很少,大约有10个转换,并沉没到Postgres
一些配置
flink.buffer_timeout=50
flink.maxparallelism=4
flink.memory=16
flink.cpu.cores=4
#checkpoints
flink.checkpointing_compression=true
flink.checkpointing_min_pause=30000
flink.checkpointing_timeout=120000
flink.checkpointing_enabled=true
flink.checkpointing_time=60000
flink.max_current_checkpoint=1
#RocksDB configuration
state.backend.rocksdb.localdir=home/username/checkpoints (this is not working don't know why)
state.backend.rocksdb.thread.numfactory=4
state.backend.rocksdb.block.blocksize=16kb
state.backend.rocksdb.block.cache-size=512mb
#rocksdb or heap
state.backend.rocksdb.timer-service.factory=heap (I have test with rocksdb too and is the same)
state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED如果需要更多信息,请告诉我?
发布于 2020-09-10 22:09:07
state.backend.rocksdb.localdir应该是绝对路径,而不是相对路径。此设置不是用于指定检查点的位置(不应位于本地磁盘上),而是用于指定保存工作状态的位置(应位于本地磁盘上)。
你的工作正在经历反压力,这意味着管道的某些部分跟不上。最常见的背压原因是(1)下沉跟不上,(2)资源不足(例如,并行度太低)。
您可以通过运行带有丢弃接收器的作业来测试postgres是否存在问题。
查看各种指标可以让您了解哪些资源可能配置不足。
https://stackoverflow.com/questions/63830871
复制相似问题