
WriteToText works in DirectRunner but fails with TypeError in DataflowRunner

Stack Overflow user
Asked on 2017-02-12 15:38:44
1 answer · 1.3K views · 0 followers · 4 votes

I can run this code with DirectRunner and it works fine. With DataflowRunner, it crashes with:

TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']

My Apache Beam was cloned and built from master as instructed. It builds as apache-beam-sdk==0.6.0.dev0. I'm suspicious of that version, though, because I've (I think) recently seen code changes land without a version change (NewDoFn disappeared, but the version stayed the same).

I'm not sure whether this is the root cause, but it looks like the installed SDK and the Dataflow worker containers don't match. I get another mismatch-style error where DirectRunner passes element directly to my DoFn.process(), while DataflowRunner passes a context.
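The mismatch described above can be sketched outside Beam (class and parameter names below are hypothetical, not Beam code): the 0.5.1 worker calls fn.process(context, *args) expecting the old context-style signature, while a DoFn built from master declares extra positional parameters, so the call site passes too few arguments.

```python
# Hypothetical stand-in for a DoFn built from master: counting self,
# process() takes 4 positional arguments (element plus two extras).
class HeadStyleWriteDoFn:
    def process(self, element, writer, count):
        return [element]

fn = HeadStyleWriteDoFn()
try:
    # An old-style worker supplies only a context plus one extra
    # argument (3 given), which is one short of the 4 expected.
    fn.process("fake-context", "only-one-extra")
except TypeError:
    print("TypeError")
```

Calling it with the full argument list works, which is why the same DoFn runs fine when SDK and worker versions agree.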

I tried to isolate it down to the simplest possible code:

import uuid
import apache_beam.utils.pipeline_options
import apache_beam as beam

runner = 'DataflowRunner'
# runner = 'DirectRunner'

options = beam.utils.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'a' + str(uuid.uuid4())  # unique job name per run
gcloud_options.project = 'your-project'
gcloud_options.staging_location = 'gs://your-bucket/beam/staging'
gcloud_options.temp_location = 'gs://your-bucket/beam/temp'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = runner

p = beam.Pipeline(options=options)
(p
 | 'some_strings' >> beam.Create(tuple('asdfqwert'))
 | 'write_text' >> beam.io.WriteToText('strings', file_name_suffix='.txt')
 )
p.run().wait_until_finish()

Full output:

No handlers could be found for logger "oauth2client.contrib.multistore_file"
/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:136: UserWarning: Using fallback coder for typehint: Any.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
Collecting google-cloud-dataflow==0.5.1
  Using cached google-cloud-dataflow-0.5.1.tar.gz
  Saved /var/folders/v3/61xx4nnn6p36n5m9fp4qdwtr0000gn/T/tmpuCWoeh/google-cloud-dataflow-0.5.1.tar.gz
Successfully downloaded google-cloud-dataflow
Traceback (most recent call last):
  File "reproduce_bug.py", line 28, in <module>
    p.run().wait_until_finish()
  File "/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/runners/dataflow_runner.py", line 706, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
(70278eb56b40fd94): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 514, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 899, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:26452)
    op.start()
  File "dataflow_worker/executor.py", line 191, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7575)
    def start(self):
  File "dataflow_worker/executor.py", line 196, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7480)
    with self.spec.source.reader() as reader:
  File "dataflow_worker/executor.py", line 206, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7425)
    self.output(windowed_value)
  File "dataflow_worker/executor.py", line 136, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:5749)
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "dataflow_worker/executor.py", line 83, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:3884)
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/executor.py", line 505, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:15525)
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 163, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:4862)
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 270, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7749)
    self.reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:8108)
    raise type(exn), args, sys.exc_info()[2]
  File "apache_beam/runners/common.py", line 268, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7660)
    self.old_dofn_process(element)
  File "apache_beam/runners/common.py", line 173, in apache_beam.runners.common.DoFnRunner.old_dofn_process (apache_beam/runners/common.c:5182)
    self._process_outputs(element, self.dofn_process(self.context))
  File "apache_beam/runners/common.py", line 152, in apache_beam.runners.common.DoFnRunner.__init__.lambda3 (apache_beam/runners/common.c:3640)
    self.dofn_process = lambda context: fn.process(context, *args)
TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']

1 Answer

Stack Overflow user

Accepted answer

Posted on 2017-02-13 23:08:20

Your environment seems to have version 0.5.1 installed (see the top of the stack trace), but you are building the head of the Python SDK.

You can create a new virtualenv environment with the correct version of the SDK.

  • If you want to run against head, you need to set the sdk_location flag when running your pipeline.
  • If you want to run against a released version, install the SDK with pip install google-cloud-dataflow and run your pipeline normally. (Preferably inside a virtualenv.)

Note that using a released version is preferable.
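The two options above can be sketched as shell commands (the environment name and the tarball path are placeholders; reproduce_bug.py is the script from the traceback):

```shell
# Fresh virtualenv with the released SDK, so the submitting environment
# matches the Dataflow worker containers:
virtualenv beam-env
source beam-env/bin/activate
pip install google-cloud-dataflow

# Run the pipeline normally against the released version:
python reproduce_bug.py

# Or, when running a pipeline built from master, point the runner at the
# locally built SDK tarball (hypothetical path) so SDK and workers match:
python reproduce_bug.py --sdk_location dist/apache-beam-sdk-0.6.0.dev0.tar.gz
```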

Votes: 1
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/42189550
