我正在尝试使用Python (3.7)编写一个Apache光束管道。我在导入numpy时遇到了问题,特别是在我编写的一个DoFn转换类中尝试使用numpy时。
当在GCP DataFlow中运行时,我得到以下错误:"NameError: name 'numpy‘is not defined“
首先,在使用DirectRunner时,一切都是按照预期进行的。这个问题只出现在使用GCP的DataFlow运行器时。
我认为问题与GCP DataFlow中的作用域如何工作有关,而不是导入本身。例如,如果我将导入添加到类内的"process“方法中,则可以成功地使导入工作,但当我在文件顶部添加导入时,则无法成功。
我尝试使用需求文件和setup.py文件作为管道的命令选项,但没有任何变化。再说一次,我不认为问题是引入了numpy,而更多的是因为DataFlow具有意外的类/函数作用域。
setup.py文件
from __future__ import absolute_import
from __future__ import print_function
import setuptools
REQUIRED_PACKAGES = [
'numpy',
'Cython',
'scipy',
'google-cloud-bigtable'
]
setuptools.setup(
name='my-pipeline',
version='0.0.1',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages(),
)总而言之,我遇到了许多关于“范围”的问题,我希望有人能帮助我,因为Apache光束文档确实没有很好地涵盖这一点。
from __future__ import absolute_import
from __future__ import division
import apache_beam as beam
import numpy
class Preprocess(beam.DoFn):
def process(self, element, *args, **kwargs):
# Demonstrating how I want to call numpy in the process function
if numpy.isnan(numpy.sum(element['signal'])):
return [MyOject(element['signal'])]
def run(argv=None):
parser = argparse.ArgumentParser()
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=options)
messages = (p | beam.io.ReadFromPubSub(subscription=args.input_subscription).with_output_types(bytes))
lines = messages | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
json_messages = lines | "Jsonify" >> beam.Map(lambda x: json.loads(x))
preprocess_messages = json_messages | "Preprocess" >> beam.ParDo(Preprocess())
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()我希望管道的工作方式类似于在本地使用DirectRunner运行时的工作方式,但作用域/导入的工作方式不同,导致管道崩溃。
发布于 2019-10-03 06:46:15
当您从桌面启动Apache Beam程序时,该程序正在您的桌面上运行。您已经在本地安装了numpy库。但是,您尚未通知Dataflow下载并安装numpy。这就是为什么您的程序以DirectRunner身份运行,但以DataflowRunner身份运行失败。
编辑/创建一个普通的Python文件,并包含所有的依赖项,比如requirements.txt。我更喜欢使用virtualdev,导入所需的包,确保我的程序在DirectRunner下运行,然后运行pip freeze为requirements.txt创建我的包列表。现在,Dataflow将知道要导入哪些包,以便您的程序在Dataflow群集上运行。
https://stackoverflow.com/questions/58209797
复制相似问题