首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >执行从RDS到Mysql的Select - Insert时Flink SQL内存不足

执行从RDS到Mysql的Select - Insert时Flink SQL内存不足
EN

Stack Overflow用户
提问于 2021-11-24 11:17:53
回答 1查看 135关注 0票数 0

在我的工作流程中,我使用pyflink从RDS加载和转换数据,并将其存入MYSQL。使用FLINK CDC,我能够从RDS获得我想要的数据,并使用JDBC库汇到MYSQL。我的目标是读取一个表,并使用下面的代码示例创建另外10个表,在一个作业中(基本上将一个巨大的表分成较小的表)。我面临的问题是,尽管在flink cdc中使用RocksDB作为状态后端和选项,如scan.incremental.snapshot.chunk.sizescan.snapshot.fetch.sizedebezium.min.row. count.to.stream.result,但内存使用量仍在增长,导致2 2GB内存的任务管理器失败。我的直觉是,一个简单的select- insert查询无论如何都会加载内存中的所有表!如果是这样的话,我能以某种方式避免这种情况吗?表的大小约为500k行。

代码语言:javascript
复制
env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create(env)
stmt_set = t_env.create_statement_set()


create_kafka_source= (
        """
            CREATE TABLE somethin(
               bla INT,
               bla1 DOUBLE,
               bla2 TIMESTAMP(3),
              PRIMARY KEY(bla2) NOT ENFORCED
         ) WITH (
        'connector'='mysql-cdc',
        'server-id'='1000',
        'debezium.snapshot.mode' = 'when_needed',   
        'debezium.poll.interval.ms'='5000',         
        'hostname'= 'som2',
        'port' ='som2',
        'database-name'='som3',
        'username'='som4',
        'password'='somepass',
        'table-name' = 'atable'
        )
        """
    )
create_kafka_dest = (
        """CREATE TABLE IF NOT EXISTS atable(
                    time1 TIMESTAMP(3),
                    blah2 DOUBLE,
                    PRIMARY KEY(time_stamp)  NOT ENFORCED

                    ) WITH (                       'connector'= 'jdbc',
                    'url' = 'jdbc:mysql://name1:3306/name1',
                    'table-name' = 't1','username' = 'user123',
                    'password' = '123'   
        )"""
    )



t_env.execute_sql(create_kafka_source)
t_env.execute_sql(create_kafka_dest)

stmt_set.add_insert_sql(
    "INSERT INTO atable SELECT  DISTINCT bla2,bla1,"
    "FROM somethin"
)
EN

回答 1

Stack Overflow用户

发布于 2021-11-24 19:30:45

在流查询中使用DISTINCT是昂贵的,特别是当对独特性没有任何时间限制时(例如,计算每天的唯一访问者)。我想这就是为什么你的查询需要很多状态。

但是,您应该能够让它工作。RocksDB并不总是表现良好;有时它会消耗比分配的内存更多的内存。

你使用的是什么版本的Flink?在Flink 1.11中进行了改进(通过切换到jemalloc),在Flink 1.14中进行了进一步的改进(通过升级到更新版本的RocksDB)。所以升级Flink可能会解决这个问题。否则,您可能需要基本上说谎并告诉Flink它的内存比它实际拥有的内存要少一些,这样当RocksDB超出限制时,它不会导致内存溢出错误。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70095121

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档