首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >数据流Flex模板验证失败,没有给出任何原因

数据流Flex模板验证失败,没有给出任何原因
EN

Stack Overflow用户
提问于 2021-06-27 16:12:12
回答 2查看 144关注 0票数 0

我一直在编写一个数据流管道,并且正在使用flex模板。

我的代码从avro读取并处理它没有问题。但是当涉及到WriteToAvro或WriteToText时,数据流作业会失败,而且看起来像是在模板验证时失败。我完全没有理由这样做。

我试过很多方法。删除输出文件的参数并将其硬编码到中。为WriteToText切换WriteToAvro,但它还是失败了。

代码语言:javascript
复制
    with beam.Pipeline(options=options) as p:
        read_from_avro = p \
                         | 'ReadFromAvro' >> ReadFromAvro(input_file)

        redact_data = read_from_avro | "RedactData" >> IdentifyRedactData(project, redact_fields)

        redact_data | 'WriteToAvro' >> WriteToAvro(
                        file_path_prefix=output_file,
                        schema=s,
                        codec='deflate',
                        file_name_suffix='.avro')

join_pcollections的输出是一个集合,每个元素都是一个字典。

数据流日志提供了以下信息:

代码语言:javascript
复制
2021-06-27 09:04:46.728 BST Workflow failed.

2021-06-27 09:04:46.763 BST Cleaning up.

2021-06-27 09:04:46.817 BST Worker pool stopped.

有人知道这是怎么回事吗。仅供参考,当我删除最后一步并运行“ProcessData”步骤时,一切都运行得很顺利。这是最后一个刚刚中断的写入步骤。

编辑以添加需求文件。

代码语言:javascript
复制
apache-beam==2.29.0
google-cloud==0.34.0
google-cloud-dlp==3.1.0
google-cloud-storage==1.35.0
google-cloud-core==1.4.1
google-cloud-datastore==1.15.0

如果我尝试使用apache-beamgcp==2.29.0,构建会失败,所以我想知道这是否与此有关。

代码语言:javascript
复制
apache-beam[gcp] 2.29.0 depends on google-cloud-dlp<2 and >=0.12.0; extra == "gcp"
EN

回答 2

Stack Overflow用户

发布于 2021-06-29 18:29:03

已修复。我认为这个问题源于没有正确配置管道选项。我还根据flex wordcount示例更改了管道的运行方式。

代码语言:javascript
复制
    options = PipelineOptions(beam_args)
    options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=options)

    project = options.get_all_options().get('project')

    read_from_avro = p \
                     | 'ReadFromAvro' >> ReadFromAvro(input_file)

    redact_data = read_from_avro | "RedactData" >> IdentifyRedactData(project, redact_fields)

    redact_data | 'WriteToAvro' >> WriteToAvro(
                    file_path_prefix=output_file,
                    schema=table_schema,
                    codec='deflate')

    result = p.run()
    result.wait_until_finish()
票数 1
EN

Stack Overflow用户

发布于 2021-06-29 03:19:17

从您的作业详细信息中,您可以导航到Cloud Logging。显示的默认日志集可能不包含错误,因此我建议更改筛选器以显示所有日志。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68149207

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档