TypeError: 'PCollection' object does not support indexing以上错误是由于试图将Pcollection转换为list而导致的:
filesList = (files | beam.combiners.ToList())
lines = (p | 'read' >> beam.Create(ReadSHP().ReadSHP(filesList))
| 'map' >> beam.Map(_to_dictionary))和:
def ReadSHP(self, filesList):
"""
"""
sf = shp.Reader(shp=filesList[1], dbf=filesList[2]) 如何解决这个问题?任何帮助都是非常感谢的。
发布于 2018-12-05 17:30:50
通常,不能将PCollection转换为列表。
PCollection是一组具有潜在无界且无序的项的集合。Beam允许您将转换应用于PCollection。将PTransform应用于PCollection会产生另一个PCollection。转换的应用过程可能分布在一组机器上。因此,在一般情况下,不可能将这样的东西转换为本地内存中的元素集合。
组合子是PTransforms的一个特殊类。他们所做的就是把他们看到的所有元素积累起来,将一些组合逻辑应用到元素上,然后输出合并的结果。例如,组合器可以查看传入的元素,对它们进行求和,然后输出和作为结果。这种组合器将元素的PCollection转换为这些元素和的PCollection。
beam.combiners.ToList只是应用于PCollection的另一个转换,它可能在一组工人机器上应用,并产生另一个PCollection。但是在生成输出元素之前,它并没有进行任何复杂的组合,它只将所有已见元素累加到一个列表中,然后输出所见元素的列表。因此,它接受键值对的元素(在多台机器上),将它们放入列表中,并输出这些列表。
缺少的是从潜在的多台机器获取这些列表并在需要时将它们加载到本地程序的逻辑。这个问题很难(如果有的话)通过通用方式(在所有运行程序、所有可能的IOs和管道结构之间)解决。
解决方法之一是在管道中添加另一个步骤,将合并的输出(例如和或列表)写入公共存储,例如某个数据库中的表或文件。然后,当管道完成时,您的程序可以从该位置加载管道执行的结果。
详情请参阅文件:
发布于 2018-12-18 08:28:14
另一种选择是使用GCE并使用像GeoJSON这样的工具将shapefiles转换为ogr2ogr。然后可以将GeoJSON加载到BigQuery中,并使用BigQuery地理信息系统进行查询。
这是一个包含更多细节的博客
https://stackoverflow.com/questions/53499640
复制相似问题