我有一个python进程,它使用Pyarrow=6.0.0将数据写入parquet文件。我在一个PyArrow数据集中加入了:
import pyarrow.dataset as ds
root_directory = "bucket_name_in_gcp"
fs = gcsfs.GCSFileSystem(project=project)
pa_fs = PyFileSystem(FSSpecHandler(fs))
self.partitions = ds.partitioning(pyarrow.schema([("item_id", pyarrow.string()), ("group", pyarrow.string())]), flavor="hive")
dataset = ds.dataset(source=root_directory, filesystem=fs, partitioning=partitions, format="parquet")在后面的代码中,我使用
item_id_condition = ds.field("item_id") == "xxx"
group_condition = ds.field("group") == "group_1"
filters = item_id_condition & group_condition
results = dataset.to_table(filter=filters).to_pandas()从存储中读取数据,我得到一个空的数据,这很好。
稍后我使用:
file_path = f'{root_directory}/item_id=xxx/group=group_1'
with pyarrow.BufferOutputStream() as output_buffer:
parquet.write_table(table, output_buffer)
parquet_bytes = output_buffer.getvalue().to_pybytes()
with pa_fs.open_output_stream(str(file_path)) as stream:
stream.write(parquet_bytes)要将数据表写入存储区,此时我可以查看文件及其内容。
但是,如果我再次尝试使用read函数(dataset.to_table),我仍然会得到一个空数据。为什么PyArrow数据集不能识别新文件?如果我要重新创建ds.dataset对象,它将识别所有现有数据。
我有遗漏什么吗?有办法refresh数据集吗?还是每次我都要写进去?
发布于 2022-03-15 05:19:34
我想你需要跑
dataset = ds.dataset(source=root_directory, filesystem=fs, partitioning=partitions, format="parquet")一次又一次,它应该识别新的文件。如果我正确理解到gcsfs filesystem的连接必须以这种方式“刷新”(如您所说的重新创建ds.dataset )
https://stackoverflow.com/questions/71473877
复制相似问题