首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从google cloud composer运行Dataflow时出现的导入依赖项问题

从google cloud composer运行Dataflow时出现的导入依赖项问题
EN

Stack Overflow用户
提问于 2019-03-05 01:41:00
回答 3查看 1.4K关注 0票数 1

我正在从google cloud composer运行Dataflow,该dataflow脚本包含一些非标准依赖项,如zeep、googleads。它们需要安装在数据流工作节点上,所以我用setup.py将它们打包。当我尝试在dag中运行它时,composer正在验证数据流文件并抱怨No module names Zeep , googleads。因此,我创建了pythonvirtualenvoperator,并安装了所有需要的非标准依赖项,并尝试运行数据流作业,但它仍然抱怨导入zeep和googleads。

下面是我的代码库:

代码语言:javascript
复制
PULL_DATA = PythonVirtualenvOperator(
    task_id=PROCESS_TASK_ID,
    python_callable=execute_dataflow,
    op_kwargs={
        'main': 'main.py',
        'project': PROJECT,
        'temp_location': 'gs://bucket/temp',
        'setup_file': 'setup.py',
        'max_num_workers': 2,
        'output': 'gs://bucket/output',
        'project_id': PROJECT_ID},
    requirements=['google-cloud-storage==1.10.0', 'zeep==3.2.0',
                  'argparse==1.4.0', 'google-cloud-kms==0.2.1',
                  'googleads==15.0.2', 'dill'],
    python_version='2.7',
    use_dill=True,
    system_site_packages=True,
    on_failure_callback=on_failure_handler,
    on_success_callback=on_success_handler,
    dag='my-dag')

和我的python可调用代码:

代码语言:javascript
复制
def execute_dataflow(**kwargs):
        import subprocess
        TEMPLATED_COMMAND = """
                          python main.py \
                                 --runner DataflowRunner \
                                 --project {project} \
                                 --region us-central1 \
                                 --temp_location {temp_location} \
                                 --setup_file {setup_file} \
                                 --output {output} \
                                 --project_id {project_id} 
                          """.format(**kwargs)
        process = subprocess.Popen(['/bin/bash', '-c', TEMPLATED_COMMAND])
        process.wait()
        return process.returncode

我的main.py文件

代码语言:javascript
复制
import zeep
import googleads

{Apache-beam-code to construct dataflow pipeline}

有什么建议吗?

EN

回答 3

Stack Overflow用户

发布于 2019-03-05 08:50:35

我的工作有一个requirements.txt。它没有像您那样使用--setup_file选项,而是指定了以下内容:

代码语言:javascript
复制
--requirements_file prod_requirements.txt

这将告诉DataFlow在运行作业之前在requirements.txt中安装库。

参考:https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

票数 0
EN

Stack Overflow用户

发布于 2019-03-19 18:26:52

使用带有import googleads, zeep的示例数据流管道脚本,我设置了一个测试编写器环境。DAG和您的一样,我也得到了同样的错误。然后,我做了一些更改,以确保可以在worker机器上找到依赖项。

在DAG中,我使用普通的PythonOperator,而不是PythonVirtualenvOperator。我有我的数据流管道和设置文件(main.pysetup.py) in a Google Cloud Storage bucket,所以Composer可以找到它们。设置文件中有一个我需要的需求列表,例如zeep和googleads。我改编了一个来自here的示例安装文件,更改了以下内容:

代码语言:javascript
复制
REQUIRED_PACKAGES = [
    'google-cloud-storage==1.10.0', 'zeep==3.2.0',
'argparse==1.4.0', 'google-cloud-kms==0.2.1',
'googleads==15.0.2', 'dill'
    ]

setuptools.setup(
    name='Imports test',
    version='1',
    description='Imports test workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
        }
    )

我的DAG是

代码语言:javascript
复制
with models.DAG(  'composer_sample',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    PULL_DATA = PythonOperator(
    task_id='PULL_DATA',
    python_callable=execute_dataflow,
    op_kwargs={
        'main': '/home/airflow/gcs/data/main.py',
        'project': PROJECT,
        'temp_location': 'gs://dataflow-imports-test/temp',
        'setup_file': '/home/airflow/gcs/data/setup.py',
        'max_num_workers': 2,
        'output': 'gs://dataflow-imports-test/output',
        'project_id': PROJECT_ID})
    PULL_DATA

而不更改Python callable。但是,使用此配置时,我仍然会得到错误。

Next step,在Google Cloud Platform (GCP)控制台中,通过导航菜单转到"Composer“,然后单击环境名称。在"PyPI packages“选项卡上,我添加了zeep和googleads,然后单击"submit”。更新环境需要一段时间,但它是有效的。

完成这一步后,我的管道就能够导入依赖项并成功运行了。我还尝试使用GCP控制台上指示的依赖项运行DAG,但不是setup.py要求中的依赖项。工作流程再次中断,但在不同的位置。因此,一定要在两个地方都指出它们。

票数 0
EN

Stack Overflow用户

发布于 2021-02-18 09:35:07

您需要在Cloud Composer环境中安装库(请查看this link)。有一种方法可以在控制台中完成,但我发现这些步骤更容易:

Click of the >打开您的environments page

  • Select作曲家所在的实际环境running

  • Navigate到

  • Packages选项卡

  • 单击edit

  • Manually添加requirements.txt

  • Save

的每一行

如果您为库提供的版本太旧,可能会出现错误,因此请检查日志并根据需要更新编号。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54988712

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档