首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将numpy导入运行在GCP Dataflow上的Apache Beam管道?

如何将numpy导入运行在GCP Dataflow上的Apache Beam管道?
EN

Stack Overflow用户
提问于 2019-10-03 05:49:21
回答 1查看 777关注 0票数 1

我正在尝试使用Python (3.7)编写一个Apache光束管道。我在导入numpy时遇到了问题,特别是在我编写的一个DoFn转换类中尝试使用numpy时。

当在GCP DataFlow中运行时,我得到以下错误:"NameError: name 'numpy‘is not defined“

首先,在使用DirectRunner时,一切都是按照预期进行的。这个问题只出现在使用GCP的DataFlow运行器时。

我认为问题与GCP DataFlow中的作用域如何工作有关,而不是导入本身。例如,如果我将导入添加到类内的"process“方法中,则可以成功地使导入工作,但当我在文件顶部添加导入时,则无法成功。

我尝试使用需求文件和setup.py文件作为管道的命令选项,但没有任何变化。再说一次,我不认为问题是引入了numpy,而更多的是因为DataFlow具有意外的类/函数作用域。

setup.py文件

代码语言:javascript
复制
from __future__ import absolute_import
from __future__ import print_function
import setuptools

REQUIRED_PACKAGES = [
    'numpy',
    'Cython',
    'scipy',
    'google-cloud-bigtable'
]

setuptools.setup(
    name='my-pipeline',
    version='0.0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)

总而言之,我遇到了许多关于“范围”的问题,我希望有人能帮助我,因为Apache光束文档确实没有很好地涵盖这一点。

代码语言:javascript
复制
from __future__ import absolute_import
from __future__ import division

import apache_beam as beam
import numpy

class Preprocess(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # Demonstrating how I want to call numpy in the process function
        if numpy.isnan(numpy.sum(element['signal'])):
            return [MyOject(element['signal'])]

def run(argv=None):
    parser = argparse.ArgumentParser()
    args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args)

    p = beam.Pipeline(options=options)
    messages = (p | beam.io.ReadFromPubSub(subscription=args.input_subscription).with_output_types(bytes))
    lines = messages | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
    json_messages = lines | "Jsonify" >> beam.Map(lambda x: json.loads(x))

    preprocess_messages = json_messages | "Preprocess" >> beam.ParDo(Preprocess())
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

我希望管道的工作方式类似于在本地使用DirectRunner运行时的工作方式,但作用域/导入的工作方式不同,导致管道崩溃。

EN

回答 1

Stack Overflow用户

发布于 2019-10-03 06:46:15

当您从桌面启动Apache Beam程序时,该程序正在您的桌面上运行。您已经在本地安装了numpy库。但是,您尚未通知Dataflow下载并安装numpy。这就是为什么您的程序以DirectRunner身份运行,但以DataflowRunner身份运行失败。

编辑/创建一个普通的Python文件,并包含所有的依赖项,比如requirements.txt。我更喜欢使用virtualdev,导入所需的包,确保我的程序在DirectRunner下运行,然后运行pip freeze为requirements.txt创建我的包列表。现在,Dataflow将知道要导入哪些包,以便您的程序在Dataflow群集上运行。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58209797

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档