在这里的文档(https://beam.apache.org/documentation/programming-guide/#additional-outputs) 4.5.2中,生成了一个pvalue.TaggedOutput()。
pvalue似乎很难导入,我从Apache文档中复制了导入行,在启动管道之前,我使用了--save_main_session选项以及def run()中的save_main_session=True以及pipeline_options.view_as(SetupOptions).save_main_session = save_main_session。所有导入都适用于所有函数,所有类都在所有函数中工作。但不是pvalue。我也尝试了所有这些在每一个可能的组合,以及把他们排除在外。pvalue总是未知的。
我把我所有的代码从食谱上拿出来:pardo.py
然而,没有pvalue。
NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']
此错误仅在使用Dataflowrunner时生成,而不是在使用Directrunner时生成。
我的 DoFn示例
class Splitter(beam.DoFn):
TAG1 = 'kleintje'
TAG2 = 'grootje'
def process(self, element):
splittertid = element.get('id')
if splittertid < 100:
yield pvalue.TaggedOutput(self.TAG1, element)
else:
yield pvalue.TaggedOutput(self.TAG2, element)我的()示例
def run(argv=None, save_main_session=True):
sources = [
json.loads('{"id":72234,"value":1'),
json.loads('{"id":23,"value":2}')
]
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
| beam.Create(sources)
| beam.ParDo(Splitter()).with_outputs(Splitter.TAG1,Splitter.TAG2,main=Splitter.TAG1)**我的进口品**
from __future__ import absolute_import
import argparse
import logging
import re
import json
import datetime
from jsonschema import validate
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json发布于 2019-12-03 09:18:04
不知何故,对Dataflowrunner的依赖关系搞砸了。通过加载一组错误的依赖项,然后再删除它们,事情突然开始工作了。毕竟,像from apache_beam import pvalue这样的进口似乎是正确的。
也许这里学到的教训是,可能存在损坏的依赖关系,您可以通过安装和卸载旧的或错误的apache_beam包触发强制重新安装来修复这些依赖关系。
发布于 2019-12-03 00:39:41
您应该尝试在类拆分器中导入pvalue,因为当使用Apache时,依赖项应该在类和函数中声明。
你的代码应该是这样
class Splitter(beam.DoFn):
from apache_beam import pvalue
TAG1 = 'kleintje'
TAG2 = 'grootje'
def process(self, element):
splittertid = element.get('id')
if splittertid < 100:
yield pvalue.TaggedOutput(self.TAG1, element)
else:
yield pvalue.TaggedOutput(self.TAG2, element)由于代码在本地运行,所以您可以在Directrunner中正常使用from apache_beam import pvalue;但是,当使用Dataflowrunner时,代码应该遵循一个结构来正确地处理依赖关系。
https://stackoverflow.com/questions/59138545
复制相似问题