
数据 I/O 是数据分析 pipeline 的入口和出口。无论你是从日志文件中提取信息、从数据库导出报表,还是在不同系统之间传输数据,读取和写入都是绕不开的第一步。
Polars 作为 Rust 生态中最强大的 DataFrame 库,原生支持多种数据格式的读写:
格式 | 说明 | Feature |
|---|---|---|
CSV | 最通用的文本表格格式 | csv |
Parquet | 列式存储,大数据首选 | parquet |
JSON / NDJson | Web API 常用格式 | json |
IPC (Arrow) | 跨语言零拷贝交换 | ipc |
在 Polars 中,数据读写分为两种模式:
read_* 方法,立即将整个文件加载到内存中的 DataFramescan_* 方法,构建查询计划,在 collect() 时才真正读取数据这一课,我们将系统地学习每种格式的读写方法,掌握 Eager 与 Lazy 的核心区别,并学会处理实际开发中的各种 I/O 场景。
CSV(Comma-Separated Values)是数据交换的"世界语"。Polars 提供了两种 CSV 读取器:CsvReader(Eager)和 LazyCsvReader(Lazy)。
CsvReader 是最基础的 CSV 读取方式,调用 finish() 后立即返回 DataFrame。在 polars 0.53 中,推荐使用 CsvReadOptions 构建器模式来配置读取选项:
use polars::prelude::*;
fn read_csv_eager() -> PolarsResult<DataFrame> {
// 使用 CsvReadOptions 构建器配置读取参数
let df = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some("data/data.csv".into()))?
.finish()?;
println!("{}", df);
Ok(df)
}也可以直接从 File 对象创建 CsvReader:
use std::fs::File;try_into_reader
fn read_csv_from_file() -> PolarsResult<DataFrame> {
let file = File::open("data.csv")?;
let df = CsvReader::new(file)
.finish()?;
Ok(df)
}LazyCsvReader 不会立即读取数据,而是构建一个 LazyFrame 查询计划。这是处理大文件时的推荐方式:
fn read_csv_lazy() -> PolarsResult<LazyFrame> {
let lf = LazyCsvReader::new("data.csv".into())
.with_has_header(true)
.with_separator(b',') // 分隔符,默认逗号
.with_try_parse_dates(true) // 尝试自动解析日期
.with_skip_rows(0) // 跳过前 N 行
.with_infer_schema_length(100) // 推断 schema 的行数
.finish()?; // 返回 LazyFrame,而非 DataFrame
Ok(lf)
}下面是 CSV 读取中最常用的配置项,用一个完整的例子来展示:
fn read_csv_with_options() -> PolarsResult<DataFrame> {
let options = CsvReadOptions::default()
.with_has_header(true)
// 指定分隔符(通过 parse_options 配置)
.map_parse_options(|opts| {
opts.with_separator(b'\t') // 制表符分隔的 TSV 文件
})
.with_n_rows(Some(1000)) // 只读取前 1000 行
.with_skip_rows(1) // 跳过第 1 行(常用于跳过注释行)
.with_infer_schema_length(Some(100)) // 用前 100 行推断类型
.with_low_memory(true) // 低内存模式(牺牲速度换内存)
.with_ignore_errors(true); // 遇到解析错误跳过,不中断
let df = options
.try_into_reader_with_file_path(Some("data.csv".into()))?
.finish()?;
Ok(df)
}配置项速查表:
配置 | 说明 | 默认值 |
|---|---|---|
with_has_header | 文件是否有表头行 | true |
with_separator | 列分隔符(通过 map_parse_options) | b',' |
with_try_parse_dates | 自动尝试解析日期列 | false |
with_skip_rows | 跳过前 N 行 | 0 |
with_skip_lines | 按换行符跳过前 N 行(不尊重 CSV 转义) | 0 |
with_n_rows | 限制读取行数 | None(全部) |
with_infer_schema_length | 推断 schema 的采样行数 | 100 |
with_low_memory | 低内存模式 | false |
with_ignore_errors | 忽略解析错误 | false |
Polars 的 CSV 读取器默认支持 UTF-8 编码。对于中文 CSV 文件,通常需要注意:
encoding_rs 等库转换fn read_chinese_csv() -> PolarsResult<DataFrame> {
// UTF-8 编码的中文 CSV 可以直接读取
let df = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some("chinese_data.csv".into()))?
.finish()?;
Ok(df)
}小贴士:如果你的 CSV 文件是 GBK 编码(常见于 Windows 导出的中文数据),建议先用命令行工具或
iconv转换为 UTF-8,再交给 Polars 处理。
Parquet 是大数据领域的"明星格式"。它采用列式存储,自带压缩和 schema 信息,是 Polars 最推荐的长期存储格式。
polars的features需要开启parquet
为什么推荐 Parquet?因为它的设计天然契合列式计算:
fn read_parquet_eager() -> PolarsResult<DataFrame> {
let file = File::open("data.parquet")?;
let df = ParquetReader::new(file)
.set_low_memory(true) // 低内存模式
.read_parallel(ParallelStrategy::Auto) // 并行读取(默认)
.with_row_index(Some(RowIndex {
name: "idx".into(),
offset: 0,
})) // 添加行索引列
.finish()?;
Ok(df)
}对于 Parquet 文件,强烈推荐使用 Lazy API。scan_parquet 可以利用文件中的统计信息进行查询优化:
fn read_parquet_lazy() -> PolarsResult<DataFrame> {
// 创建 LazyFrame(此时不读取任何数据)
let lf = LazyFrame::scan_parquet(
"data.parquet",
ScanArgsParquet::default(),
)?;
// 构建查询链(谓词下推 + 投影下推)
let result = lf
.filter(col("age").gt(lit(25))) // 谓词下推:只读取 age > 25 的行
.select([col("name"), col("age")]) // 投影下推:只读取 name 和 age 列
.collect()?; // 此时才真正执行 I/O
println!("{}", result);
Ok(result)
}关键点:
scan_parquet配合filter和select,Polars 优化器会自动将过滤条件和列选择"下推"到文件读取层,只读取必要的数据块。这就是 Lazy API 的威力所在。
polars的features需要开启json
JSON 是 Web 开发中无处不在的数据格式。Polars 支持两种 JSON 格式:
JsonFormat::Json):整个文件是一个 JSON 数组 [{...}, {...}]JsonFormat::JsonLines):每行一个独立的 JSON 对象fn read_json() -> PolarsResult<DataFrame> {
let file = File::open("data.json")?;
let df = JsonReader::new(file)
// 默认读取 JSON 数组格式
.finish()?;
println!("{}", df);
Ok(df)
}NDJson(Newline-Delimited JSON)格式在大数据场景中非常常见,因为可以逐行流式处理:
fn read_ndjson() -> PolarsResult<DataFrame> {
let file = File::open("data.ndjson")?;
let df = JsonReader::new(file)
.with_json_format(JsonFormat::JsonLines) // 指定为 NDJson 格式
.with_batch_size(unsafe { NonZero::new_unchecked(10_000) }) // 批量大小
.finish()?;
Ok(df)
}JSON 同样支持 Lazy 读取:
fn read_json_lazy() -> PolarsResult<DataFrame> {
// NDJson 的 Lazy 扫描
let lf = LazyJsonLineReader::new("data.ndjson".into())
.finish()?;
let result = lf
.filter(col("status").eq(lit("active")))
.collect()?;
Ok(result)
}注意:JSON 格式没有 Parquet 那样的列式存储优势,读取性能相对较低。如果你的数据量较大,建议将 JSON 转换为 Parquet 后再处理。
这是 Polars 中最重要的概念之一,理解它能让你写出性能更好的代码。
特性 | Eager(read_*) | Lazy(scan_*) |
|---|---|---|
执行时机 | 立即执行 | 调用 collect() 时执行 |
返回类型 | DataFrame | LazyFrame |
查询优化 | 无 | 有(谓词下推、投影下推等) |
内存占用 | 全量加载 | 按需加载 |
适用场景 | 小文件、快速探索 | 大文件、生产 pipeline |
Lazy API 的核心优势在于查询优化器。当你构建一个查询链时,Polars 不会立即执行,而是先分析整个查询计划,进行优化:
谓词下推(Predicate Pushdown):将 filter 条件推到数据源层执行。
// Eager:先读取全部 1 亿行,再过滤 → 内存爆炸!
let df = CsvReader::new(File::open("huge.csv")?)?.finish()?;
let result = df.filter(&col("age").gt(lit(30)))?;
// Lazy:优化器将过滤条件下推到 CSV 读取层,只加载满足条件的行
let lf = LazyCsvReader::new("huge.csv".into())
.with_has_header(true)
.finish()?
.filter(col("age").gt(lit(30))) // 这个条件会被下推
.collect()?;投影下推(Projection Pushdown):只读取查询中用到的列。
// Lazy:只读取 name 和 age 两列,其他列直接跳过
let lf = LazyFrame::scan_parquet(
"wide_table.parquet",
ScanArgsParquet::default(),
)?
.select([col("name"), col("age")]) // 投影下推
.collect()?;假设你有一个 10GB 的 CSV 文件,但只需要其中 city = "Beijing" 的数据(约 1GB):
fn compare_eager_vs_lazy() -> PolarsResult<()> {
// === Eager 方式 ===
let start = std::time::Instant::now();
let df = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some("large.csv".into()))?
.finish()?;
let eager_result = df
.lazy()
.filter(col("value").gt(lit(100)))
.select([col("id"), col("value")])
.collect()?;
println!("Eager 耗时: {:?}", start.elapsed());
// === Lazy 方式 ===
let start = std::time::Instant::now();
let lazy_result = LazyCsvReader::new("large.csv".into())
.with_has_header(true)
.finish()?
.filter(col("value").gt(lit(100)))
.select([col("id"), col("value")])
.collect()?;
println!("Lazy 耗时: {:?}", start.elapsed());
Ok(())
}经验法则:文件超过内存的 50% 时,务必使用 Lazy API。日常开发中,养成优先使用 Lazy API 的习惯,只有需要立即查看数据时才用 Eager。
数据处理的最终目的往往是将结果保存下来。Polars 提供了丰富的写入方法。
fn write_csv_example(df: &mut DataFrame) -> PolarsResult<()> {
// 基础写入
let mut file = File::create("output.csv")?;
CsvWriter::new(&mut file)
.with_separator(b',') // 分隔符
.include_header(true) // 包含表头
.finish( df)?;
Ok(())
}fn write_parquet_example(df: &DataFrame) -> PolarsResult<()> {
let mut file = File::create("output.parquet")?;
ParquetWriter::new(&mut file)
.with_compression(ParquetCompression::Snappy) // 压缩算法
.finish(df)?;
Ok(())
}常用压缩算法对比:
算法 | 压缩率 | 速度 | 适用场景 |
|---|---|---|---|
Snappy | 中等 | 快 | 默认推荐,读写均衡 |
Gzip | 高 | 慢 | 存储空间敏感 |
Lz4 | 中等 | 最快 | 速度优先 |
Zstd | 高 | 中等 | 综合最优 |
None | 无 | 最快 | 临时文件 |
fn write_json_example(df: &mut DataFrame) -> PolarsResult<()> {
// 写入 JSON 数组格式
let file = File::create("output.json")?;
JsonWriter::new(file).finish(df)?;
// 写入 NDJson 格式(每行一个 JSON 对象)
// use polars_io::prelude::*;
let mut file = File::create("data.ndjson")?;
JsonWriter::new(&mut file)
.with_json_format(JsonFormat::JsonLines) // ←←← 这行是关键!生成 NDJSON
.finish(df)?;
println!("✅ NDJSON 文件已成功写入:data.ndjson");
Ok(())
}有时候 Polars 的自动类型推断不够准确,比如它把一个整数列推断为了字符串。这时你需要手动覆盖 schema:
use polars::prelude::*;
use std::sync::Arc;
fn read_with_schema_override() -> PolarsResult<DataFrame> {
// 方法 1:使用 with_schema_overwrite 按列名覆盖类型
let schema = Schema::from_iter([
Field::new("id".into(), DataType::Int64),
Field::new("name".into(), DataType::String),
Field::new("age".into(), DataType::Int32),
Field::new("salary".into(), DataType::Float64),
]);
let df = CsvReadOptions::default()
.with_has_header(true)
.with_schema_overwrite(Some(Arc::new(schema)))
.try_into_reader_with_file_path(Some("employees.csv".into()))?
.finish()?;
println!("{}", df);
Ok(df)
}如果你不知道列名,但知道列的顺序和类型,可以使用 with_dtype_overwrite:
fn read_with_dtype_overwrite() -> PolarsResult<DataFrame> {
// 按列顺序指定类型(不需要列名)
let dtypes = vec![
DataType::Int64, // 第 1 列
DataType::String, // 第 2 列
DataType::Float64, // 第 3 列
];
let df = CsvReadOptions::default()
.with_has_header(true)
.with_dtype_overwrite(Some(Arc::new(dtypes)))
.try_into_reader_with_file_path(Some("data.csv".into()))?
.finish()?;
Ok(df)
}LazyCsvReader 提供了 with_schema_modify 方法,可以在推断 schema 后进行修改:
fn lazy_schema_modify() -> PolarsResult<DataFrame> {
let df = LazyCsvReader::new("data.csv".into())
.with_has_header(true)
.with_schema_modify(|schema| {
// 修改特定列的类型
let mut schema = schema;
schema.set_dtype("price".into(), DataType::Float64);
Ok(schema)
})?
.finish()?
.collect()?;
Ok(df)
}Polars 支持通过 CloudOptions 直接读取云存储上的文件,包括:
// 概念示例(需要额外的 feature 支持)
fn read_from_cloud() -> PolarsResult<LazyFrame> {
let lf = LazyCsvReader::new("s3://my-bucket/data.csv".into())
.with_has_header(true)
.with_cloud_options(Some(CloudOptions::default()))
.finish()?;
Ok(lf)
}注意:云存储功能需要启用额外的 feature(如
cloud),并配置相应的认证信息。具体配置方式请参考 Polars 官方文档。
在 Rust 中,Polars 的所有 I/O 操作都返回 Result<T, PolarsError>。正确处理这些错误是写出健壮代码的关键。
PolarsError 是 Polars 定义的错误类型,涵盖了各种异常情况:
use polars::prelude::PolarsError;
fn handle_error() {
match read_csv_eager() {
Ok(df) => println!("成功读取 {} 行", df.height()),
Err(PolarsError::IoError(e)) => {
eprintln!("IO 错误: {}", e);
}
Err(PolarsError::SchemaMismatch(msg)) => {
eprintln!("Schema 不匹配: {}", msg);
}
Err(PolarsError::ComputeError(msg)) => {
eprintln!("计算错误: {}", msg);
}
Err(e) => {
eprintln!("其他错误: {}", e);
}
}
}在大多数情况下,使用 ? 操作符是最简洁的错误处理方式:
fn read_and_process() -> PolarsResult<DataFrame> {
// ? 操作符会自动将 PolarsError 向上传播
let df = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some("data.csv".into()))?
.finish()?;
let result = df
.lazy()
.filter(col("value").gt(lit(0)))
.collect()?; // 同样使用 ? 传播错误
Ok(result)
}让我们把前面学到的知识串联起来,完成一个完整的数据处理 pipeline:读取 CSV → 转换为 Parquet → 读回 LazyFrame 并过滤。
读取一个包含中文列名和特殊字符的 CSV 文件,正确设置 schema,过滤出满足条件的数据,并保存为 Parquet。
示例 CSV 内容(sales.csv):
订单编号,商品名称,单价,数量,日期
ORD001,笔记本电脑,5999.99,2,2024-01-15
ORD002,机械键盘,399.00,5,2024-01-16
ORD003,无线鼠标,129.50,10,2024-01-17
ORD004,4K显示器,2899.00,1,2024-01-18
ORD005,USB-C扩展坞,259.00,8,2024-01-19要求:
订单编号 为 String,单价 为 Float64,数量 为 Int64,日期 为 String)单价 > 500 的商品总价 = 单价 * 数量filtered_sales.parquet提示:
Schema::from_iter 创建 schemawith_schema_overwrite 覆盖自动推断的类型col("单价") * col("数量") 计算总价alias("总价") 为新列命名这一课,我们系统地学习了 Polars 的数据 I/O 能力:
CsvReadOptions 构建器 + LazyCsvReader,支持丰富的配置选项scan_parquet 配合 Lazy API 性能最佳write_* 方法 + sink_* 流式写入PolarsError + ? 操作符核心原则:优先使用 Lazy API,数据长期存储用 Parquet。