首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当使用`FileSystemDataset.from_paths`构造吡箭数据集时,如何指定分区?

当使用`FileSystemDataset.from_paths`构造吡箭数据集时,如何指定分区?
EN

Stack Overflow用户
提问于 2022-04-11 21:36:26
回答 1查看 400关注 0票数 1

我有一个像gs://bucket/some_dir/{partition_value}/filename这样的谷歌桶中的CSV文件列表。我想从这样的URI列表中创建一个pyarrow.Dataset (这是some_dir中的一个文件子集)。

我如何做到这一点,并提取partition_value作为一个列?

到目前为止,我已经:

代码语言:javascript
复制
import gcsfs
import pyarrow as pa
import pyarrow.csv
import pyarrow.dataset as ds
from pyarrow.fs import FSSpecHandler, PyFileSystem

fs = gcsfs.GCSFileSystem()
schema = pa.schema([("gene_id", pa.string()), ("raw_count", pa.float32()), ("scaled_estimate", pa.float32())])

# these data are publicly accessible, btw
uris = [
    "gs://gdc-tcga-phs000178-open/0b8b258e-1671-4f86-82e7-59b12ad40d9c/unc.edu.4c243ea9-dfe1-42f0-a887-3c901fb38542.2477720.rsem.genes.results",
    "gs://gdc-tcga-phs000178-open/c8ee8367-c529-4dd6-98b4-fde57991134b/unc.edu.a64ae1f5-a189-4173-be13-903bd7637869.2476757.rsem.genes.results",
    "gs://gdc-tcga-phs000178-open/78354f8d-5ce8-4617-bba4-79614f232e97/unc.edu.ac19f7cf-670b-4dcc-a26b-db0f56377231.2509607.rsem.genes.results",
]

dataset = ds.FileSystemDataset.from_paths(
    uris,
    schema,
    format=ds.CsvFileFormat(parse_options=pa.csv.ParseOptions(delimiter="\t")),
    filesystem=PyFileSystem(FSSpecHandler(fs)),
    # partitions=["bucket", "file_gcs_id"],
    # root_partition="gdc-tcga-phs000178-open",
)

dataset.to_table()

这给了我一个很好的表,其中包含了我的模式中的字段。

但是,我希望partition_key是我的数据集中的另一个字段。我猜我需要:

将此作为字段添加到我的架构中,并在调用FileSystemDataset.from_paths时添加

  • 以添加某些内容

我尝试使用root_partition,但发现一个错误,我提供的字符串不是pyarrow.Expression (不知道这是什么)。另外,我尝试指定partitions,但我得到了ValueError: The number of files resulting from paths_or_selector must be equal to the number of partitions.

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-04-11 23:41:41

在数据集发现期间,使用文件名信息(以及指定的分区)生成附加到片段的“保证”。例如,当我们看到文件foo/x=7/bar.parquet并且使用“单元分区”时,我们可以附加保证x == 7。这些保证被存储为“表达式”,因为我们目前不需要讨论的各种原因。

两种解决方案闪现在脑海中。首先,您可以自己创建保证并将它们附加到您的路径(这是partitions参数在from_paths方法中表示的内容)。表达式应该是ds.field("column_name") == value

其次,可以允许数据集发现过程正常运行。这将生成您需要的所有片段(有些您不需要),并且已经附加了保证。然后,您可以将片段列表缩减到所需的片段列表,并从中创建一个数据集。

(我猜我需要)将它作为字段添加到我的模式中

是。在上述两种方法中,您都希望确保将分区列添加到架构中。

下面是一个代码示例,展示了这两种方法:

代码语言:javascript
复制
import shutil

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs

shutil.rmtree('my_dataset', ignore_errors=True)

table = pa.Table.from_pydict({
    'x': [1, 2, 3, 4, 5, 6],
    'part': ['a', 'a', 'a', 'b', 'b', 'b']
    })

ds.write_dataset(table, 'my_dataset', partitioning=['part'], format='parquet')

print('# Created by dataset factory')
partitioning = ds.partitioning(schema=pa.schema([pa.field('part', pa.string())]))
dataset = ds.dataset('my_dataset',partitioning=partitioning)
print(dataset.to_table())
print()

desired_paths = [
    'my_dataset/a/part-0.parquet'
]

# Note that table.schema used below includes the partitioning
# column so we've added that to the schema.
print('# Created from paths')
filesystem = fs.LocalFileSystem()
dataset_from_paths = ds.FileSystemDataset.from_paths(
    desired_paths,
    table.schema,
    format=ds.ParquetFileFormat(),
    filesystem=filesystem)
print(dataset_from_paths.to_table())
print()

print('# Created from paths with explicit partition information')
dataset_from_paths = ds.FileSystemDataset.from_paths(
    desired_paths,
    table.schema,
    partitions=[
        ds.field('part') == "a"
    ],
    format=ds.ParquetFileFormat(),
    filesystem=filesystem)
print(dataset_from_paths.to_table())
print()

print('# Created from discovery then trimmed')
trimmed_fragments = [frag for frag in dataset.get_fragments() if frag.path in desired_paths]
trimmed_dataset = ds.FileSystemDataset(trimmed_fragments, dataset.schema, dataset.format, filesystem=dataset.filesystem)
print(trimmed_dataset.to_table())
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71834568

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档