首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink SQL CSV连续流

Flink SQL CSV连续流
EN

Stack Overflow用户
提问于 2021-03-24 07:38:58
回答 1查看 201关注 0票数 0

我正在创建2个flink sql表,1个用于CSV文件系统,另一个用于kafka。其目标是持续监视文件系统文件夹,并将新的csv文件记录推送到kafka主题。但是我在下面写的查询,推送csv文件记录一次,flink作业进入“完成”模式,任何新的文件都不会被处理。请有人告诉我如何创建flink sql连续流与源和csv文件系统和目标为Kafka。

创建源表的Flink SQL

代码语言:javascript
复制
CREATE TABLE son_hsb_source_filesystem_csv_bulk(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'filesystem', --Don't Change this
    'path' = 'file:///opt/kafka-python-exec/files/' , -- Change file name alone
    'format' = 'csv', --Don't Change this
    'format.ignore-parse-errors' = 'true', --Don't Change this
    'csv.ignore-parse-errors' = 'true', --Don't Change this
    'csv.allow-comments' = 'true' --Don't Change this
);

创建目标表的Flink SQL

代码语言:javascript
复制
CREATE TABLE son_hsb_target_kafka_9092_filesystem_bulk_tests(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'kafka',  --Don't Change this
    'topic' = 'son_hsb_target_kafka_9092_fs_bulk_data_tests',  -- Add any topic name you want
    'scan.startup.mode' = 'earliest-offset',  --Don't Change this
    'properties.bootstrap.servers' = 'localhost:9092', --Don't Change this
    'format' = 'json',  --Don't Change this
    'json.fail-on-missing-field' = 'false', --Don't Change this
    'json.ignore-parse-errors' = 'true' --Don't Change this
);

Flink SQL创建流作业#--这只运行一次并进入完成模式

代码语言:javascript
复制
INSERT INTO son_hsb_target_kafka_9092_filesystem_bulk_tests
SELECT file_name,start_time,oss_cell_id,enodeb,dl_payload,rrc_conn_den,rrc_conn_num,pm_array_1 FROM son_hsb_source_filesystem_csv_bulk

如何定义始终处于“运行”状态并查找新文件的流作业。请建议一下。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-24 09:04:57

文档表明,对于流文件系统源,尚未实现此功能:

用于流的

文件系统源仍在开发中。将来,社区将增加对常见流用例的支持,即分区和目录监视。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66776601

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档