首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Polars Rust 第 4 课:数据 I/O(读取与写入)

Polars Rust 第 4 课:数据 I/O(读取与写入)

作者头像
不吃草的牛德
发布2026-04-23 13:05:02
发布2026-04-23 13:05:02
1060
举报
文章被收录于专栏:RustRust

一、开篇引入

数据 I/O 是数据分析 pipeline 的入口和出口。无论你是从日志文件中提取信息、从数据库导出报表,还是在不同系统之间传输数据,读取和写入都是绕不开的第一步。

Polars 作为 Rust 生态中最强大的 DataFrame 库,原生支持多种数据格式的读写:

格式

说明

Feature

CSV

最通用的文本表格格式

csv

Parquet

列式存储,大数据首选

parquet

JSON / NDJson

Web API 常用格式

json

IPC (Arrow)

跨语言零拷贝交换

ipc

在 Polars 中,数据读写分为两种模式:

  • Eager(立即执行):调用 read_* 方法,立即将整个文件加载到内存中的 DataFrame
  • Lazy(延迟执行):调用 scan_* 方法,构建查询计划,在 collect() 时才真正读取数据

这一课,我们将系统地学习每种格式的读写方法,掌握 Eager 与 Lazy 的核心区别,并学会处理实际开发中的各种 I/O 场景。


二、CSV 读取

CSV(Comma-Separated Values)是数据交换的"世界语"。Polars 提供了两种 CSV 读取器:CsvReader(Eager)和 LazyCsvReader(Lazy)。

2.1 CsvReader(Eager 读取)

CsvReader 是最基础的 CSV 读取方式,调用 finish() 后立即返回 DataFrame。在 polars 0.53 中,推荐使用 CsvReadOptions 构建器模式来配置读取选项:

代码语言:javascript
复制
use polars::prelude::*;

fn read_csv_eager() -> PolarsResult<DataFrame> {
    // 使用 CsvReadOptions 构建器配置读取参数
    let df = CsvReadOptions::default()
        .with_has_header(true)
        .try_into_reader_with_file_path(Some("data/data.csv".into()))?
        .finish()?;

    println!("{}", df);

    Ok(df)
}

也可以直接从 File 对象创建 CsvReader

代码语言:javascript
复制
use std::fs::File;try_into_reader

fn read_csv_from_file() -> PolarsResult<DataFrame> {
    let file = File::open("data.csv")?;
    let df = CsvReader::new(file)
        .finish()?;
    Ok(df)
}

2.2 LazyCsvReader(Lazy 扫描)

LazyCsvReader 不会立即读取数据,而是构建一个 LazyFrame 查询计划。这是处理大文件时的推荐方式:

代码语言:javascript
复制
fn read_csv_lazy() -> PolarsResult<LazyFrame> {
    let lf = LazyCsvReader::new("data.csv".into())
        .with_has_header(true)
        .with_separator(b',')          // 分隔符,默认逗号
        .with_try_parse_dates(true)    // 尝试自动解析日期
        .with_skip_rows(0)             // 跳过前 N 行
        .with_infer_schema_length(100) // 推断 schema 的行数
        .finish()?;                    // 返回 LazyFrame,而非 DataFrame

    Ok(lf)
}

2.3 常用配置详解

下面是 CSV 读取中最常用的配置项,用一个完整的例子来展示:

代码语言:javascript
复制
fn read_csv_with_options() -> PolarsResult<DataFrame> {
    let options = CsvReadOptions::default()
        .with_has_header(true)
        // 指定分隔符(通过 parse_options 配置)
        .map_parse_options(|opts| {
            opts.with_separator(b'\t')  // 制表符分隔的 TSV 文件
        })
        .with_n_rows(Some(1000))        // 只读取前 1000 行
        .with_skip_rows(1)              // 跳过第 1 行(常用于跳过注释行)
        .with_infer_schema_length(Some(100))  // 用前 100 行推断类型
        .with_low_memory(true)          // 低内存模式(牺牲速度换内存)
        .with_ignore_errors(true);      // 遇到解析错误跳过,不中断

    let df = options
        .try_into_reader_with_file_path(Some("data.csv".into()))?
        .finish()?;

    Ok(df)
}

配置项速查表

配置

说明

默认值

with_has_header

文件是否有表头行

true

with_separator

列分隔符(通过 map_parse_options)

b','

with_try_parse_dates

自动尝试解析日期列

false

with_skip_rows

跳过前 N 行

0

with_skip_lines

按换行符跳过前 N 行(不尊重 CSV 转义)

0

with_n_rows

限制读取行数

None(全部)

with_infer_schema_length

推断 schema 的采样行数

100

with_low_memory

低内存模式

false

with_ignore_errors

忽略解析错误

false

2.4 处理中文/特殊字符编码

Polars 的 CSV 读取器默认支持 UTF-8 编码。对于中文 CSV 文件,通常需要注意:

  • • 确保文件是 UTF-8 编码(现代系统默认)
  • • 如果文件包含 BOM(Byte Order Mark),Polars 会自动处理
  • • 对于 GBK/GB2312 等编码,需要先用 encoding_rs 等库转换
代码语言:javascript
复制
fn read_chinese_csv() -> PolarsResult<DataFrame> {
    // UTF-8 编码的中文 CSV 可以直接读取
    let df = CsvReadOptions::default()
        .with_has_header(true)
        .try_into_reader_with_file_path(Some("chinese_data.csv".into()))?
        .finish()?;

    Ok(df)
}

小贴士:如果你的 CSV 文件是 GBK 编码(常见于 Windows 导出的中文数据),建议先用命令行工具或 iconv 转换为 UTF-8,再交给 Polars 处理。


三、Parquet 读取

Parquet 是大数据领域的"明星格式"。它采用列式存储,自带压缩和 schema 信息,是 Polars 最推荐的长期存储格式。

polars的features需要开启parquet

3.1 Parquet 的列式存储优势

为什么推荐 Parquet?因为它的设计天然契合列式计算:

  • 只读需要的列:10 列的表,只查 2 列?Parquet 只读那 2 列的数据,速度提升 5 倍
  • 自带压缩:默认使用 Snappy 压缩,文件体积小,读取快
  • 内置 Schema:不需要推断类型,读取即知列名和数据类型
  • 支持谓词下推:配合 Lazy API,可以在读取时就跳过不满足条件的数据块

3.2 ParquetReader(Eager)

代码语言:javascript
复制
fn read_parquet_eager() -> PolarsResult<DataFrame> {
    let file = File::open("data.parquet")?;
    let df = ParquetReader::new(file)
        .set_low_memory(true)          // 低内存模式
        .read_parallel(ParallelStrategy::Auto)  // 并行读取(默认)
        .with_row_index(Some(RowIndex {
            name: "idx".into(),
            offset: 0,
        }))                            // 添加行索引列
        .finish()?;

    Ok(df)
}

3.3 LazyFrame::scan_parquet(Lazy 扫描,推荐)

对于 Parquet 文件,强烈推荐使用 Lazy APIscan_parquet 可以利用文件中的统计信息进行查询优化:

代码语言:javascript
复制
fn read_parquet_lazy() -> PolarsResult<DataFrame> {
    // 创建 LazyFrame(此时不读取任何数据)
    let lf = LazyFrame::scan_parquet(
        "data.parquet",
        ScanArgsParquet::default(),
    )?;

    // 构建查询链(谓词下推 + 投影下推)
    let result = lf
        .filter(col("age").gt(lit(25)))      // 谓词下推:只读取 age > 25 的行
        .select([col("name"), col("age")])   // 投影下推:只读取 name 和 age 列
        .collect()?;                          // 此时才真正执行 I/O

    println!("{}", result);
    Ok(result)
}

关键点scan_parquet 配合 filterselect,Polars 优化器会自动将过滤条件和列选择"下推"到文件读取层,只读取必要的数据块。这就是 Lazy API 的威力所在。


四、JSON / NDJson 读取

polars的features需要开启json

JSON 是 Web 开发中无处不在的数据格式。Polars 支持两种 JSON 格式:

  • JSON 数组JsonFormat::Json):整个文件是一个 JSON 数组 [{...}, {...}]
  • NDJson / JSON LinesJsonFormat::JsonLines):每行一个独立的 JSON 对象

4.1 JsonReader(JSON 数组)

代码语言:javascript
复制
fn read_json() -> PolarsResult<DataFrame> {
    let file = File::open("data.json")?;
    let df = JsonReader::new(file)
        // 默认读取 JSON 数组格式
        .finish()?;

    println!("{}", df);
    Ok(df)
}

4.2 NdJsonReader(每行一个 JSON 对象)

NDJson(Newline-Delimited JSON)格式在大数据场景中非常常见,因为可以逐行流式处理:

代码语言:javascript
复制
fn read_ndjson() -> PolarsResult<DataFrame> {
    let file = File::open("data.ndjson")?;
    let df = JsonReader::new(file)
        .with_json_format(JsonFormat::JsonLines)  // 指定为 NDJson 格式
        .with_batch_size(unsafe { NonZero::new_unchecked(10_000) })  // 批量大小
        .finish()?;

    Ok(df)
}

4.3 Lazy 版本

JSON 同样支持 Lazy 读取:

代码语言:javascript
复制
fn read_json_lazy() -> PolarsResult<DataFrame> {
    // NDJson 的 Lazy 扫描
    let lf = LazyJsonLineReader::new("data.ndjson".into())
            .finish()?;

    let result = lf
        .filter(col("status").eq(lit("active")))
        .collect()?;

    Ok(result)
}

注意:JSON 格式没有 Parquet 那样的列式存储优势,读取性能相对较低。如果你的数据量较大,建议将 JSON 转换为 Parquet 后再处理。


五、scan_* vs read_*:Eager vs Lazy

这是 Polars 中最重要的概念之一,理解它能让你写出性能更好的代码。

5.1 核心区别

特性

Eager(read_*)

Lazy(scan_*)

执行时机

立即执行

调用 collect() 时执行

返回类型

DataFrame

LazyFrame

查询优化

有(谓词下推、投影下推等)

内存占用

全量加载

按需加载

适用场景

小文件、快速探索

大文件、生产 pipeline

5.2 Lazy 的查询优化

Lazy API 的核心优势在于查询优化器。当你构建一个查询链时,Polars 不会立即执行,而是先分析整个查询计划,进行优化:

谓词下推(Predicate Pushdown):将 filter 条件推到数据源层执行。

代码语言:javascript
复制
// Eager:先读取全部 1 亿行,再过滤 → 内存爆炸!
let df = CsvReader::new(File::open("huge.csv")?)?.finish()?;
let result = df.filter(&col("age").gt(lit(30)))?;

// Lazy:优化器将过滤条件下推到 CSV 读取层,只加载满足条件的行
let lf = LazyCsvReader::new("huge.csv".into())
    .with_has_header(true)
    .finish()?
    .filter(col("age").gt(lit(30)))  // 这个条件会被下推
    .collect()?;

投影下推(Projection Pushdown):只读取查询中用到的列。

代码语言:javascript
复制
// Lazy:只读取 name 和 age 两列,其他列直接跳过
let lf = LazyFrame::scan_parquet(
    "wide_table.parquet",
    ScanArgsParquet::default(),
)?
.select([col("name"), col("age")])  // 投影下推
.collect()?;

5.3 大文件场景为什么必须用 Lazy

假设你有一个 10GB 的 CSV 文件,但只需要其中 city = "Beijing" 的数据(约 1GB):

  • Eager 方式:读取全部 10GB 到内存,再过滤。内存不够直接 OOM
  • Lazy 方式:优化器将过滤条件下推,在读取时就跳过不满足条件的行,内存峰值可能只有几百 MB

5.4 性能对比示例

代码语言:javascript
复制
fn compare_eager_vs_lazy() -> PolarsResult<()> {
    // === Eager 方式 ===
    let start = std::time::Instant::now();
    let df = CsvReadOptions::default()
        .with_has_header(true)
        .try_into_reader_with_file_path(Some("large.csv".into()))?
        .finish()?;
    let eager_result = df
        .lazy()
        .filter(col("value").gt(lit(100)))
        .select([col("id"), col("value")])
        .collect()?;
    println!("Eager 耗时: {:?}", start.elapsed());

    // === Lazy 方式 ===
    let start = std::time::Instant::now();
    let lazy_result = LazyCsvReader::new("large.csv".into())
        .with_has_header(true)
        .finish()?
        .filter(col("value").gt(lit(100)))
        .select([col("id"), col("value")])
        .collect()?;
    println!("Lazy 耗时: {:?}", start.elapsed());

    Ok(())
}

经验法则:文件超过内存的 50% 时,务必使用 Lazy API。日常开发中,养成优先使用 Lazy API 的习惯,只有需要立即查看数据时才用 Eager。


六、数据写入

数据处理的最终目的往往是将结果保存下来。Polars 提供了丰富的写入方法。

6.1 write_csv()

代码语言:javascript
复制

fn write_csv_example(df: &mut DataFrame) -> PolarsResult<()> {
    // 基础写入
    let mut file = File::create("output.csv")?;
    CsvWriter::new(&mut file)
        .with_separator(b',')           // 分隔符
        .include_header(true)           // 包含表头
        .finish( df)?;


    Ok(())
}

6.2 write_parquet()

代码语言:javascript
复制
fn write_parquet_example(df: &DataFrame) -> PolarsResult<()> {
    let mut file = File::create("output.parquet")?;
    ParquetWriter::new(&mut file)
        .with_compression(ParquetCompression::Snappy)  // 压缩算法
        .finish(df)?;
    Ok(())
}

常用压缩算法对比

算法

压缩率

速度

适用场景

Snappy

中等

默认推荐,读写均衡

Gzip

存储空间敏感

Lz4

中等

最快

速度优先

Zstd

中等

综合最优

None

最快

临时文件

6.3 write_json() / write_ndjson()

代码语言:javascript
复制
fn write_json_example(df: &mut DataFrame) -> PolarsResult<()> {
    // 写入 JSON 数组格式
    let  file = File::create("output.json")?;
    JsonWriter::new(file).finish(df)?;

    // 写入 NDJson 格式(每行一个 JSON 对象)
    // use polars_io::prelude::*;
    let mut file = File::create("data.ndjson")?;

    JsonWriter::new(&mut file)
        .with_json_format(JsonFormat::JsonLines)   // ←←← 这行是关键!生成 NDJSON
        .finish(df)?;

    println!("✅ NDJSON 文件已成功写入:data.ndjson");

    Ok(())
}

七、选项配置详解

7.1 Schema Override:手动指定列类型

有时候 Polars 的自动类型推断不够准确,比如它把一个整数列推断为了字符串。这时你需要手动覆盖 schema:

代码语言:javascript
复制
use polars::prelude::*;
use std::sync::Arc;

fn read_with_schema_override() -> PolarsResult<DataFrame> {
    // 方法 1:使用 with_schema_overwrite 按列名覆盖类型
    let schema = Schema::from_iter([
        Field::new("id".into(), DataType::Int64),
        Field::new("name".into(), DataType::String),
        Field::new("age".into(), DataType::Int32),
        Field::new("salary".into(), DataType::Float64),
    ]);

    let df = CsvReadOptions::default()
        .with_has_header(true)
        .with_schema_overwrite(Some(Arc::new(schema)))
        .try_into_reader_with_file_path(Some("employees.csv".into()))?
        .finish()?;

    println!("{}", df);
    Ok(df)
}

7.2 dtype 指定

如果你不知道列名,但知道列的顺序和类型,可以使用 with_dtype_overwrite

代码语言:javascript
复制
fn read_with_dtype_overwrite() -> PolarsResult<DataFrame> {
    // 按列顺序指定类型(不需要列名)
    let dtypes = vec![
        DataType::Int64,     // 第 1 列
        DataType::String,    // 第 2 列
        DataType::Float64,   // 第 3 列
    ];

    let df = CsvReadOptions::default()
        .with_has_header(true)
        .with_dtype_overwrite(Some(Arc::new(dtypes)))
        .try_into_reader_with_file_path(Some("data.csv".into()))?
        .finish()?;

    Ok(df)
}

7.3 Lazy 版本的 Schema 修改

LazyCsvReader 提供了 with_schema_modify 方法,可以在推断 schema 后进行修改:

代码语言:javascript
复制
fn lazy_schema_modify() -> PolarsResult<DataFrame> {
    let df = LazyCsvReader::new("data.csv".into())
        .with_has_header(true)
        .with_schema_modify(|schema| {
            // 修改特定列的类型
            let mut schema = schema;
            schema.set_dtype("price".into(), DataType::Float64);
            Ok(schema)
        })?
        .finish()?
        .collect()?;

    Ok(df)
}

7.4 云存储(概念介绍)

Polars 支持通过 CloudOptions 直接读取云存储上的文件,包括:

  • AWS S3
  • Google Cloud Storage (GCS)
  • Azure Blob Storage
代码语言:javascript
复制
// 概念示例(需要额外的 feature 支持)
fn read_from_cloud() -> PolarsResult<LazyFrame> {
    let lf = LazyCsvReader::new("s3://my-bucket/data.csv".into())
        .with_has_header(true)
        .with_cloud_options(Some(CloudOptions::default()))
        .finish()?;

    Ok(lf)
}

注意:云存储功能需要启用额外的 feature(如 cloud),并配置相应的认证信息。具体配置方式请参考 Polars 官方文档。


八、错误处理

在 Rust 中,Polars 的所有 I/O 操作都返回 Result<T, PolarsError>。正确处理这些错误是写出健壮代码的关键。

8.1 PolarsError 类型

PolarsError 是 Polars 定义的错误类型,涵盖了各种异常情况:

代码语言:javascript
复制
use polars::prelude::PolarsError;

fn handle_error() {
    match read_csv_eager() {
        Ok(df) => println!("成功读取 {} 行", df.height()),
        Err(PolarsError::IoError(e)) => {
            eprintln!("IO 错误: {}", e);
        }
        Err(PolarsError::SchemaMismatch(msg)) => {
            eprintln!("Schema 不匹配: {}", msg);
        }
        Err(PolarsError::ComputeError(msg)) => {
            eprintln!("计算错误: {}", msg);
        }
        Err(e) => {
            eprintln!("其他错误: {}", e);
        }
    }
}

8.2 使用 ? 操作符

在大多数情况下,使用 ? 操作符是最简洁的错误处理方式:

代码语言:javascript
复制
fn read_and_process() -> PolarsResult<DataFrame> {
    // ? 操作符会自动将 PolarsError 向上传播
    let df = CsvReadOptions::default()
        .with_has_header(true)
        .try_into_reader_with_file_path(Some("data.csv".into()))?
        .finish()?;

    let result = df
        .lazy()
        .filter(col("value").gt(lit(0)))
        .collect()?;  // 同样使用 ? 传播错误

    Ok(result)
}

九、实战练习

让我们把前面学到的知识串联起来,完成一个完整的数据处理 pipeline:读取 CSV → 转换为 Parquet → 读回 LazyFrame 并过滤


十、课后作业

题目

读取一个包含中文列名和特殊字符的 CSV 文件,正确设置 schema,过滤出满足条件的数据,并保存为 Parquet。

示例 CSV 内容sales.csv):

代码语言:javascript
复制
订单编号,商品名称,单价,数量,日期
ORD001,笔记本电脑,5999.99,2,2024-01-15
ORD002,机械键盘,399.00,5,2024-01-16
ORD003,无线鼠标,129.50,10,2024-01-17
ORD004,4K显示器,2899.00,1,2024-01-18
ORD005,USB-C扩展坞,259.00,8,2024-01-19

要求

  1. 1. 正确设置 schema(订单编号 为 String,单价 为 Float64,数量 为 Int64,日期 为 String)
  2. 2. 过滤出 单价 > 500 的商品
  3. 3. 计算每条订单的 总价 = 单价 * 数量
  4. 4. 保存为 filtered_sales.parquet

提示

  • • 使用 Schema::from_iter 创建 schema
  • • 使用 with_schema_overwrite 覆盖自动推断的类型
  • • 使用 col("单价") * col("数量") 计算总价
  • • 使用 alias("总价") 为新列命名

十一、总结与下节预告

这一课,我们系统地学习了 Polars 的数据 I/O 能力:

  • CSV 读取CsvReadOptions 构建器 + LazyCsvReader,支持丰富的配置选项
  • Parquet 读取:列式存储的王者,scan_parquet 配合 Lazy API 性能最佳
  • JSON 读取:支持 JSON 数组和 NDJson 两种格式
  • Eager vs Lazy:Lazy API 的查询优化(谓词下推、投影下推)是处理大数据的关键
  • 数据写入write_* 方法 + sink_* 流式写入
  • 错误处理PolarsError + ? 操作符

核心原则:优先使用 Lazy API,数据长期存储用 Parquet。

下一课,我们将深入学习 数据清洗与转换。敬请期待!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-04-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Rust火箭工坊 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、开篇引入
  • 二、CSV 读取
    • 2.1 CsvReader(Eager 读取)
    • 2.2 LazyCsvReader(Lazy 扫描)
    • 2.3 常用配置详解
    • 2.4 处理中文/特殊字符编码
  • 三、Parquet 读取
    • 3.1 Parquet 的列式存储优势
    • 3.2 ParquetReader(Eager)
    • 3.3 LazyFrame::scan_parquet(Lazy 扫描,推荐)
  • 四、JSON / NDJson 读取
    • 4.1 JsonReader(JSON 数组)
    • 4.2 NdJsonReader(每行一个 JSON 对象)
    • 4.3 Lazy 版本
  • 五、scan_* vs read_*:Eager vs Lazy
    • 5.1 核心区别
    • 5.2 Lazy 的查询优化
    • 5.3 大文件场景为什么必须用 Lazy
    • 5.4 性能对比示例
  • 六、数据写入
    • 6.1 write_csv()
    • 6.2 write_parquet()
    • 6.3 write_json() / write_ndjson()
  • 七、选项配置详解
    • 7.1 Schema Override:手动指定列类型
    • 7.2 dtype 指定
    • 7.3 Lazy 版本的 Schema 修改
    • 7.4 云存储(概念介绍)
  • 八、错误处理
    • 8.1 PolarsError 类型
    • 8.2 使用 ? 操作符
  • 九、实战练习
  • 十、课后作业
    • 题目
  • 十一、总结与下节预告
  • 下一课,我们将深入学习 数据清洗与转换。敬请期待!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档