在我的工作流程中,我使用pyflink从RDS加载和转换数据,并将其存入MYSQL。使用FLINK CDC,我能够从RDS获得我想要的数据,并使用JDBC库汇到MYSQL。我的目标是读取一个表,并使用下面的代码示例创建另外10个表,在一个作业中(基本上将一个巨大的表分成较小的表)。我面临的问题是,尽管在flink cdc中使用RocksDB作为状态后端和选项,如scan.incremental.snapshot.chunk.size、scan.snapshot.fetch.size和debezium.min.row. count.to.stream.result,但内存使用量仍在增长,导致2 2GB内存的任务管理器失败。我的直觉是,一个简单的select- insert查询无论如何都会加载内存中的所有表!如果是这样的话,我能以某种方式避免这种情况吗?表的大小约为500k行。
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
stmt_set = t_env.create_statement_set()
create_kafka_source= (
"""
CREATE TABLE somethin(
bla INT,
bla1 DOUBLE,
bla2 TIMESTAMP(3),
PRIMARY KEY(bla2) NOT ENFORCED
) WITH (
'connector'='mysql-cdc',
'server-id'='1000',
'debezium.snapshot.mode' = 'when_needed',
'debezium.poll.interval.ms'='5000',
'hostname'= 'som2',
'port' ='som2',
'database-name'='som3',
'username'='som4',
'password'='somepass',
'table-name' = 'atable'
)
"""
)
create_kafka_dest = (
"""CREATE TABLE IF NOT EXISTS atable(
time1 TIMESTAMP(3),
blah2 DOUBLE,
PRIMARY KEY(time_stamp) NOT ENFORCED
) WITH ( 'connector'= 'jdbc',
'url' = 'jdbc:mysql://name1:3306/name1',
'table-name' = 't1','username' = 'user123',
'password' = '123'
)"""
)
t_env.execute_sql(create_kafka_source)
t_env.execute_sql(create_kafka_dest)
stmt_set.add_insert_sql(
"INSERT INTO atable SELECT DISTINCT bla2,bla1,"
"FROM somethin"
)发布于 2021-11-24 19:30:45
在流查询中使用DISTINCT是昂贵的,特别是当对独特性没有任何时间限制时(例如,计算每天的唯一访问者)。我想这就是为什么你的查询需要很多状态。
但是,您应该能够让它工作。RocksDB并不总是表现良好;有时它会消耗比分配的内存更多的内存。
你使用的是什么版本的Flink?在Flink 1.11中进行了改进(通过切换到jemalloc),在Flink 1.14中进行了进一步的改进(通过升级到更新版本的RocksDB)。所以升级Flink可能会解决这个问题。否则,您可能需要基本上说谎并告诉Flink它的内存比它实际拥有的内存要少一些,这样当RocksDB超出限制时,它不会导致内存溢出错误。
https://stackoverflow.com/questions/70095121
复制相似问题