目前,我在Python中有一个apache-beam管道,在该管道中,我正在读取拼花,将其转换为dataframe来进行一些熊猫的清理,然后将其转换回我想要编写文件的parquet。看起来是这样的:
with beam.Pipeline(options=pipeline_options) as p:
dataframes = p \
| 'Read' >> beam.io.ReadFromParquetBatched(known_args.input) \
| 'Convert to pandas' >> beam.Map(lambda table: table.to_pandas()) \
| 'Process df' >> beam.ParDo(ProcessDataFrame()) \
| 'Convert to parquet' >> beam.Map(lambda table: table.to_parquet()) \
| 'Write to parquet' >> beam.io.WriteToParquet(known_args.output)但是,这会像预期的那样返回一个错误,因为我丢失了作为WriteToParquet参数的模式。
Traceback (most recent call last):
File "/Users/kgallatin/dataflow/example.py", line 75, in <module>
main()
File "/Users/kgallatin/dataflow/example.py", line 70, in main
| 'Write to parquet' >> beam.io.WriteToParquet(known_args.output)
TypeError: __init__() missing 1 required positional argument: 'schema'我有100到1000 s的列,这些列在这个管道的生命周期中可能会发生变化,所以我希望避免手动编写所有这些列,就像描述的这里那样。当我打印前一步的拼图时,我可以看到一个熊猫模式和一些二进制的pyarrow之类的东西--在这一步中,有没有一种方法可以从地板上提取模式呢?
发布于 2022-10-25 02:30:05
您的代码table.to_parquet()以Parquet格式返回所有记录的序列化字节。
目前,WriteToParquet()需要一个手动给定的固定模式。因此,如果要自动生成模式,则必须实现自己的管道(PTransform)。
如果不需要并行性(如切分),可以使用低级API执行如下操作。
...
import pyarrow as pa
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.filesystem import CompressionTypes
...
def write_dataframe_to_parquet(frame):
table = pa.Table.from_pandas(frame)
with FileSystems.create(known_args.output,
mime_type='application/x-parquet',
compression_type=CompressionTypes.UNCOMPRESSED,
) as f:
pa.parquet.write_table(table, f)
...
dataframes = p \
...
| 'Process df' >> beam.ParDo(ProcessDataFrame()) \
| 'Write to parquet' >> beam.Map(write_dataframe_to_parquet)https://stackoverflow.com/questions/74186833
复制相似问题