首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink SQL:源表太大,无法装入内存

Flink SQL:源表太大,无法装入内存
EN

Stack Overflow用户
提问于 2020-12-22 14:24:53
回答 1查看 582关注 0票数 3

我对Flink比较陌生,今天我在Flink 1.11.3会话集群上使用Flink SQL时遇到了一个问题。

问题

我注册了一个使用jdbc驱动程序的源表。我正在尝试将一些数据从这个在线数据库移动到AWS S3中,格式为parquet格式。这个表的大小很大(~43 GB)。作业在大约1分钟后失败,任务管理器在没有任何警告的情况下崩溃。但我的猜测是任务管理器内存不足。

我的观察

我发现,当我做tableEnv.executeSql("select ... from huge_table limit 1000")时,flink试图将整个源表扫描到内存中,然后才计划执行限制。

问题

由于我只关心最近几天的数据,有没有办法限制作业按时间戳扫描多少行?

附录

这里是一个最小的设置,可以重现问题(许多噪音消除)

Env设置代码

代码语言:javascript
复制
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);

Flink SQL中的源表DDL

代码语言:javascript
复制
CREATE TABLE source_transactions (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:postgresql://my.bank',
    'table-name'='transactions',
    'driver'='org.postgresql.Driver',
    'username'='username',
    'password'='password',
    'scan.fetch-size'='2000'
)

Flink SQL中的接收器表DDL

代码语言:javascript
复制
CREATE TABLE sink_transactions (
    create_time TIMESTAMP,
    username STRING,
    delta_amount DOUBLE,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector'='filesystem',
    'path'='s3a://s3/path/to/transactions',
    'format'='parquet'
)

在Flink SQL中插入查询

代码语言:javascript
复制
INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-23 03:45:42

您的观察是正确的,Flink不支持JDBC连接器的极限下推优化,并且有一个几乎合并的PR来支持这个特性,这将在Flink 1.13中使用,如果您迫切需要这个特性,您可以在代码中选择这个补丁。

1.JIRA:FLINK-19650支持Jdbc的限制下推。

2.PR:https://github.com/apache/flink/pull/13800

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65410393

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档