首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >数据流作业中的ModuleNotFoundError

数据流作业中的ModuleNotFoundError
EN

Stack Overflow用户
提问于 2021-09-17 17:26:02
回答 2查看 925关注 0票数 1

我试图在Google平台中作为数据流作业执行apache管道。

我的项目结构如下:

代码语言:javascript
复制
root_dir/
  __init__.py
  ​setup.py
  ​main.py
  ​utils/
    __init__.py
    log_util.py
    config_util.py

这是我的setup.py

代码语言:javascript
复制
setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

这是我的管道代码:

代码语言:javascript
复制
import math
import apache_beam as beam

from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions

from utils.log_util import LogUtil
from utils.config_util import ConfigUtil


class DataflowExample:
    config = {}

    def __init__(self):
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        try:
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")

            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"

            beam_options = {
                "project": self.project,
                "region": self.region,
                "job_name": "dataflow_example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline:
                data = (
                        pipeline
                        | 'Read from BQ ' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
                        | 'Count records' >> beam.combiners.Count.Globally()
                        | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )

            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")


class PrintCount(beam.DoFn):

    def __init__(self):
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        try:
            current_date = datetime.today().date()
            total = int(math.ceil(row_count / batch_size))

            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")

            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process  - {str(e)}")


if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()

管道的功能是

对querying.

  • Print表的
  1. 查询。
  2. 使用显示在utils文件夹中的自定义日志模块计算从BigQuery获取的总记录。

我使用使用command - python3 - main.py的云shell运行作业

虽然数据流作业开始了,但是在几分钟后,工作节点在说"ModuleNotFoundError: No模块“‘utils’之后抛出错误。

"utils“文件夹是可用的,在使用"DirectRunner”执行时,相同的代码可以正常工作。

log_utilconfig_util文件分别是用于日志记录和配置获取的自定义util文件。

此外,我尝试使用setup_file选项作为python3 - main.py --setup_file </path/of/setup.py>运行,这使得作业冻结,即使在15分钟后也不会继续运行。

如何用“ModuleNotFoundError”解析DataflowRunner?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-09-23 01:18:52

作为社区维基发布。正如@GopinathS所确认的,错误和修复如下:

工作人员遇到的错误是Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed

要修复这个,“apache-beamgcp>=2.20.0”将install_requires of setup.py中移除,因为'>='正在分配最新的可用版本(本文编写时为2.32.0),而辅助版本仅为2.28.0。

更新的setup.py:

代码语言:javascript
复制
setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3", # removed apache-beam[gcp]>=2.20.0
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

更新管道代码中的beam_options

代码语言:javascript
复制
    beam_options = {
        "project": self.project,
        "region": self.region,
        "job_name": "dataflow_example",
        "runner": "DataflowRunner",
        "temp_location": f"gs://{self.bucket}/temp_location/",
        "setup_file": "./setup.py"
    }

还要确保同时传递所有管道选项,而不是部分传递。

如果在命令中传递--setup_file </path/of/setup.py>,那么确保使用代码中的argument_parser读取并将安装文件路径附加到已经定义的beam_options变量中。

为了避免解析参数并将其附加到beam_options中,我将其直接添加到beam_options中作为"setup_file": "./setup.py"添加。

票数 2
EN

Stack Overflow用户

发布于 2022-06-18 10:49:30

数据流在安装平台锁定在孤立网络中的软件包时可能会出现问题。如果没有网络,它将无法编译它们。或者,它可能尝试安装它们,但由于无法编译下载车轮?不知道。

仍然要能够使用像psycopg2 (二进制文件)或google云秘密管理器(没有二进制文件,但依赖项有二进制文件)这样的包,您需要安装所有没有二进制文件(没有-任何)和没有二进制文件的依赖项的包,这些包是由requirements.txt安装的,其余的都是由--extra_packages param with has安装的。示例:

代码语言:javascript
复制
--extra_packages=package_1_needed_by_2-manylinux.whl \
--extra_packages=package_2_needed_by_3-manylinux.whl \
--extra_packages=what-you-need_needing_3-none-any.whl
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69227398

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档