首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >IOError:没有基于文件模式的文件

IOError:没有基于文件模式的文件
EN

Stack Overflow用户
提问于 2017-03-27 20:31:27
回答 1查看 1.9K关注 0票数 1

我正在尝试运行Python中的示例。但是,这个堆栈跟踪的错误如下所示。注意:第一个管道确实创建了“./name”文件,但是第二个管道似乎无法从中读取。

代码语言:javascript
复制
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Traceback (most recent call last):
  File "example.py", line 17, in <module>
    | 'save' >> beam.io.WriteToText(greetings_file))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/textio.py", line 391, in __init__
    skip_header_lines=skip_header_lines)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/textio.py", line 88, in __init__
    validate=validate)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 97, in __init__
    self._validate()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 173, in _validate
    'No files found based on the file pattern %s' % self._pattern)
IOError: No files found based on the file pattern ./names

示例代码如下:

代码语言:javascript
复制
import apache_beam as beam
def add_greeting(name, messages):
    for msg in messages:
        yield '%s %s' % (msg, name)

names_file = './names'
greetings_file = './greetings'

p = beam.Pipeline('DirectRunner')
(p | 'add names' >> beam.Create(['Ann', 'Joe'])
   | 'save' >> beam.io.WriteToText(names_file))
p.run()

(p
 | 'load names' >> beam.io.ReadFromText(names_file)
 | 'add greetings' >> beam.FlatMap(add_greetings, ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText(greetings_file))
p.run()

环境:我在google云shell上运行这个

代码语言:javascript
复制
$ pip list --local --format=columns | grep dataflow
google-cloud-dataflow              0.6.0 
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-28 20:27:57

当管道运行时,Beam中的运行程序不会等待它完成,因此您应该在调用wait_until_finish()之后添加对p.run()的调用。

此外,Beam管道有延迟执行,因此当您为管道定义新步骤时,它们将被添加到每次运行管道时完全执行的图形中。简而言之,这意味着如果您想要运行不同步骤的管道,就需要创建一个新的Pipeline对象。

这应该是可行的:

代码语言:javascript
复制
p = beam.Pipeline('DirectRunner')
(p | 'add names' >> beam.Create(['Ann', 'Joe'])
   | 'save' >> beam.io.WriteToText('./names'))
p.run().wait_until_finish()

p = beam.Pipeline('DirectRunner')
(p
 | 'load names' >> beam.io.ReadFromText('./names*')
 | 'add greetings' >> beam.FlatMap(add_greeting, ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText(greetings_file))
p.run().wait_until_finish()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43055922

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档