我对Flink比较陌生,今天我在Flink 1.11.3会话集群上使用Flink SQL时遇到了一个问题。
问题
我注册了一个使用jdbc驱动程序的源表。我正在尝试将一些数据从这个在线数据库移动到AWS S3中,格式为parquet格式。这个表的大小很大(~43 GB)。作业在大约1分钟后失败,任务管理器在没有任何警告的情况下崩溃。但我的猜测是任务管理器内存不足。
我的观察
我发现,当我做tableEnv.executeSql("select ... from huge_table limit 1000")时,flink试图将整个源表扫描到内存中,然后才计划执行限制。
问题
由于我只关心最近几天的数据,有没有办法限制作业按时间戳扫描多少行??
附录
这里是一个最小的设置,可以重现问题(许多噪音消除)
Env设置代码
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);Flink SQL中的源表DDL
CREATE TABLE source_transactions (
txid STRING,
username STRING,
amount BIGINT,
ts TIMESTAMP,
PRIMARY KEY (txid) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:postgresql://my.bank',
'table-name'='transactions',
'driver'='org.postgresql.Driver',
'username'='username',
'password'='password',
'scan.fetch-size'='2000'
)Flink SQL中的接收器表DDL
CREATE TABLE sink_transactions (
create_time TIMESTAMP,
username STRING,
delta_amount DOUBLE,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector'='filesystem',
'path'='s3a://s3/path/to/transactions',
'format'='parquet'
)在Flink SQL中插入查询
INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions发布于 2020-12-23 03:45:42
您的观察是正确的,Flink不支持JDBC连接器的极限下推优化,并且有一个几乎合并的PR来支持这个特性,这将在Flink 1.13中使用,如果您迫切需要这个特性,您可以在代码中选择这个补丁。
1.JIRA:FLINK-19650支持Jdbc的限制下推。
https://stackoverflow.com/questions/65410393
复制相似问题