I am trying to run an Apache Beam pipeline as a Dataflow job on Google Cloud Platform.
My project structure is as follows:
root_dir/
    __init__.py
    setup.py
    main.py
    utils/
        __init__.py
        log_util.py
        config_util.py

This is my setup.py:
import setuptools

setuptools.setup(
    name='dataflow_example',
    version='1.0',
    install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2"
    ],
    packages=setuptools.find_packages()
)

This is my pipeline code:
import math
import apache_beam as beam
from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions
from utils.log_util import LogUtil
from utils.config_util import ConfigUtil


class DataflowExample:
    config = {}

    def __init__(self):
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        try:
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")
            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"
            beam_options = {
                "project": self.project,
                "region": self.region,
                "job_name": "dataflow_example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/"
            }
            options = PipelineOptions(**beam_options, save_main_session=True)
            with beam.Pipeline(options=options) as pipeline:
                data = (
                    pipeline
                    | 'Read from BQ ' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
                    | 'Count records' >> beam.combiners.Count.Globally()
                    | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")


class PrintCount(beam.DoFn):
    def __init__(self):
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        try:
            current_date = datetime.today().date()
            total = int(math.ceil(row_count / batch_size))
            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")
            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process - {str(e)}")
if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()

The pipeline queries a BigQuery table, counts the records returned by the query, and logs the count.
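I run the job from Cloud Shell with the command python3 main.py.
The Dataflow job starts, but after a few minutes the workers fail with "ModuleNotFoundError: No module named 'utils'".
The "utils" folder is present, and the same code works fine when executed with "DirectRunner".
log_util and config_util are custom utility files for logging and for fetching configuration, respectively.
I also tried passing the setup_file option by running python3 main.py --setup_file </path/of/setup.py>, but that makes the job freeze; it does not proceed even after 15 minutes.
How do I resolve the "ModuleNotFoundError" with DataflowRunner?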
Posted on 2021-09-23 01:18:52
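Posting as community wiki. As confirmed by @GopinathS, the error and the fix are as follows:
The error encountered by the workers was: Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed.
To fix this, "apache-beam[gcp]>=2.20.0" was removed from install_requires in setup.py, because '>=' resolves to the latest available version (2.32.0 at the time of writing), while the worker version is only 2.28.0.
Updated setup.py: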
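The fix above was applied by hand. As a minimal sketch of the same idea, the requirement could also be filtered out programmatically before it reaches setuptools.setup. The helper name without_beam is hypothetical and is not part of the original answer.

```python
import re


def without_beam(requirements):
    """Return the requirements with any apache-beam entry removed.

    Hypothetical helper: the intent is that pip on the worker never
    replaces the Beam SDK that is already installed there.
    """
    kept = []
    for req in requirements:
        # The distribution name is everything before extras, version
        # specifiers, or environment markers.
        name = re.split(r"[\[<>=!~; ]", req.strip(), maxsplit=1)[0]
        if name.lower().replace("_", "-") != "apache-beam":
            kept.append(req)
    return kept


install_requires = without_beam([
    "google-cloud-storage==1.39.0",
    "apache-beam[gcp]>=2.20.0",
    "wheel>=0.36.2",
])
print(install_requires)
```

The resulting list can be passed as install_requires in setup.py, so Beam can stay in a shared requirements list for local development without being reinstalled on the workers.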
import setuptools

setuptools.setup(
    name='dataflow_example',
    version='1.0',
    install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",  # removed apache-beam[gcp]>=2.20.0
        "wheel>=0.36.2"
    ],
    packages=setuptools.find_packages()
)

Updated beam_options in the pipeline code:
beam_options = {
    "project": self.project,
    "region": self.region,
    "job_name": "dataflow_example",
    "runner": "DataflowRunner",
    "temp_location": f"gs://{self.bucket}/temp_location/",
    "setup_file": "./setup.py"
}

Also make sure that you pass all the pipeline options together rather than only some of them.
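If you pass --setup_file </path/of/setup.py> on the command line, make sure the code reads it with an argument parser and appends the setup file path to the already defined beam_options variable.
To avoid parsing the argument and appending it to beam_options, I added it directly to beam_options as "setup_file": "./setup.py".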
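For the command-line route, here is a minimal sketch of reading --setup_file with argparse and merging it into an options dict. The function name build_beam_options and the sample values are assumptions for illustration; only the --setup_file flag and the "setup_file" key come from the answer.

```python
import argparse


def build_beam_options(argv, base_options):
    """Merge an optional --setup_file flag into a copy of base_options."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--setup_file", default=None)
    # parse_known_args ignores flags this parser does not define,
    # instead of exiting with an error.
    known, _unknown = parser.parse_known_args(argv)
    merged = dict(base_options)
    if known.setup_file:
        merged["setup_file"] = known.setup_file
    return merged


# Illustrative values only; in the question these come from ConfigUtil.
base = {"runner": "DataflowRunner", "region": "example-region"}
options = build_beam_options(["--setup_file", "./setup.py"], base)
print(options)
```

In the pipeline from the question, the merged dict would then be passed as PipelineOptions(**options, save_main_session=True), with sys.argv[1:] as argv.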
Posted on 2022-06-18 10:49:30
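Dataflow may have problems installing platform-locked packages in an isolated network. Without network access it cannot compile them. Or maybe it tries to install them but, being unable to compile, downloads wheels instead? No idea.
To still be able to use packages like psycopg2 (has binaries) or google-cloud-secret-manager (no binaries itself, but its dependencies have binaries), install everything that has no binaries (none-any) and no dependencies with binaries through requirements.txt, and pass the rest as wheels through the --extra_packages parameter. Example: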
--extra_packages=package_1_needed_by_2-manylinux.whl \
--extra_packages=package_2_needed_by_3-manylinux.whl \
--extra_packages=what-you-need_needing_3-none-any.whl

https://stackoverflow.com/questions/69227398
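For the --extra_packages approach in the last answer, here is a rough sketch of telling pure-Python wheels (ABI and platform tags "none" and "any") from platform-specific ones by filename, and of building the repeated flags. The helper names and wheel filenames are hypothetical. A filename does not show whether a package's dependencies ship binaries, so that part of the answer's rule still needs a manual check.

```python
def is_pure_python_wheel(filename):
    """True if the wheel filename ends with the 'none-any' ABI/platform tags."""
    stem = filename[:-len(".whl")] if filename.endswith(".whl") else filename
    parts = stem.split("-")
    # Wheel names have at least: name, version, python tag, abi tag, platform tag.
    return len(parts) >= 5 and parts[-2:] == ["none", "any"]


def extra_package_flags(wheel_paths):
    """Build one --extra_packages flag per wheel path, in the given order."""
    return [f"--extra_packages={path}" for path in wheel_paths]


# Hypothetical wheel files, listed with dependencies first.
wheels = [
    "example_native-2.0-cp38-cp38-manylinux2014_x86_64.whl",
    "example_pkg-1.0-py3-none-any.whl",
]
for wheel in wheels:
    print(wheel, "pure" if is_pure_python_wheel(wheel) else "platform-specific")
print(extra_package_flags(wheels))
```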