我有一个像gs://bucket/some_dir/{partition_value}/filename这样的谷歌桶中的CSV文件列表。我想从这样的URI列表中创建一个pyarrow.Dataset (这是some_dir中的一个文件子集)。
我如何做到这一点,并提取partition_value作为一个列?
到目前为止,我已经:
import gcsfs
import pyarrow as pa
import pyarrow.csv
import pyarrow.dataset as ds
from pyarrow.fs import FSSpecHandler, PyFileSystem
fs = gcsfs.GCSFileSystem()
schema = pa.schema([("gene_id", pa.string()), ("raw_count", pa.float32()), ("scaled_estimate", pa.float32())])
# these data are publicly accessible, btw
uris = [
"gs://gdc-tcga-phs000178-open/0b8b258e-1671-4f86-82e7-59b12ad40d9c/unc.edu.4c243ea9-dfe1-42f0-a887-3c901fb38542.2477720.rsem.genes.results",
"gs://gdc-tcga-phs000178-open/c8ee8367-c529-4dd6-98b4-fde57991134b/unc.edu.a64ae1f5-a189-4173-be13-903bd7637869.2476757.rsem.genes.results",
"gs://gdc-tcga-phs000178-open/78354f8d-5ce8-4617-bba4-79614f232e97/unc.edu.ac19f7cf-670b-4dcc-a26b-db0f56377231.2509607.rsem.genes.results",
]
dataset = ds.FileSystemDataset.from_paths(
uris,
schema,
format=ds.CsvFileFormat(parse_options=pa.csv.ParseOptions(delimiter="\t")),
filesystem=PyFileSystem(FSSpecHandler(fs)),
# partitions=["bucket", "file_gcs_id"],
# root_partition="gdc-tcga-phs000178-open",
)
dataset.to_table()这给了我一个很好的表,其中包含了我的模式中的字段。
但是,我希望partition_key是我的数据集中的另一个字段。我猜我需要:
将此作为字段添加到我的架构中,并在调用FileSystemDataset.from_paths时添加
我尝试使用root_partition,但发现一个错误,我提供的字符串不是pyarrow.Expression (不知道这是什么)。另外,我尝试指定partitions,但我得到了ValueError: The number of files resulting from paths_or_selector must be equal to the number of partitions.
发布于 2022-04-11 23:41:41
在数据集发现期间,使用文件名信息(以及指定的分区)生成附加到片段的“保证”。例如,当我们看到文件foo/x=7/bar.parquet并且使用“单元分区”时,我们可以附加保证x == 7。这些保证被存储为“表达式”,因为我们目前不需要讨论的各种原因。
两种解决方案闪现在脑海中。首先,您可以自己创建保证并将它们附加到您的路径(这是partitions参数在from_paths方法中表示的内容)。表达式应该是ds.field("column_name") == value。
其次,可以允许数据集发现过程正常运行。这将生成您需要的所有片段(有些您不需要),并且已经附加了保证。然后,您可以将片段列表缩减到所需的片段列表,并从中创建一个数据集。
(我猜我需要)将它作为字段添加到我的模式中
是。在上述两种方法中,您都希望确保将分区列添加到架构中。
下面是一个代码示例,展示了这两种方法:
import shutil
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs
shutil.rmtree('my_dataset', ignore_errors=True)
table = pa.Table.from_pydict({
'x': [1, 2, 3, 4, 5, 6],
'part': ['a', 'a', 'a', 'b', 'b', 'b']
})
ds.write_dataset(table, 'my_dataset', partitioning=['part'], format='parquet')
print('# Created by dataset factory')
partitioning = ds.partitioning(schema=pa.schema([pa.field('part', pa.string())]))
dataset = ds.dataset('my_dataset',partitioning=partitioning)
print(dataset.to_table())
print()
desired_paths = [
'my_dataset/a/part-0.parquet'
]
# Note that table.schema used below includes the partitioning
# column so we've added that to the schema.
print('# Created from paths')
filesystem = fs.LocalFileSystem()
dataset_from_paths = ds.FileSystemDataset.from_paths(
desired_paths,
table.schema,
format=ds.ParquetFileFormat(),
filesystem=filesystem)
print(dataset_from_paths.to_table())
print()
print('# Created from paths with explicit partition information')
dataset_from_paths = ds.FileSystemDataset.from_paths(
desired_paths,
table.schema,
partitions=[
ds.field('part') == "a"
],
format=ds.ParquetFileFormat(),
filesystem=filesystem)
print(dataset_from_paths.to_table())
print()
print('# Created from discovery then trimmed')
trimmed_fragments = [frag for frag in dataset.get_fragments() if frag.path in desired_paths]
trimmed_dataset = ds.FileSystemDataset(trimmed_fragments, dataset.schema, dataset.format, filesystem=dataset.filesystem)
print(trimmed_dataset.to_table())https://stackoverflow.com/questions/71834568
复制相似问题