首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >NameError:未定义名称'pvalue‘

NameError:未定义名称'pvalue‘
EN

Stack Overflow用户
提问于 2019-12-02 12:01:07
回答 2查看 1.3K关注 0票数 2

在这里的文档(https://beam.apache.org/documentation/programming-guide/#additional-outputs) 4.5.2中,生成了一个pvalue.TaggedOutput()

pvalue似乎很难导入,我从Apache文档中复制了导入行,在启动管道之前,我使用了--save_main_session选项以及def run()中的save_main_session=True以及pipeline_options.view_as(SetupOptions).save_main_session = save_main_session。所有导入都适用于所有函数,所有类都在所有函数中工作。但不是pvalue。我也尝试了所有这些在每一个可能的组合,以及把他们排除在外。pvalue总是未知的。

我把我所有的代码从食谱上拿出来:pardo.py

然而,没有pvalue。

NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']

此错误仅在使用Dataflowrunner时生成,而不是在使用Directrunner时生成。

我的 DoFn示例

代码语言:javascript
复制
class Splitter(beam.DoFn):

    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

我的()示例

代码语言:javascript
复制
def run(argv=None, save_main_session=True):
    sources = [
        json.loads('{"id":72234,"value":1'),
        json.loads('{"id":23,"value":2}')
        ]

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
           | beam.Create(sources)
           | beam.ParDo(Splitter()).with_outputs(Splitter.TAG1,Splitter.TAG2,main=Splitter.TAG1)

**我的进口品**

代码语言:javascript
复制
from __future__ import absolute_import

import argparse
import logging
import re
import json
import datetime
from jsonschema import validate

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-12-03 09:18:04

不知何故,对Dataflowrunner的依赖关系搞砸了。通过加载一组错误的依赖项,然后再删除它们,事情突然开始工作了。毕竟,像from apache_beam import pvalue这样的进口似乎是正确的。

也许这里学到的教训是,可能存在损坏的依赖关系,您可以通过安装和卸载旧的或错误的apache_beam包触发强制重新安装来修复这些依赖关系。

票数 1
EN

Stack Overflow用户

发布于 2019-12-03 00:39:41

您应该尝试在类拆分器中导入pvalue,因为当使用Apache时,依赖项应该在类和函数中声明。

你的代码应该是这样

代码语言:javascript
复制
class Splitter(beam.DoFn):
    from apache_beam import pvalue
    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

由于代码在本地运行,所以您可以在Directrunner中正常使用from apache_beam import pvalue;但是,当使用Dataflowrunner时,代码应该遵循一个结构来正确地处理依赖关系

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59138545

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档