
开篇引入
恭喜你走到了 Polars Rust 系列的第 8 课!🎉
如果说 Polars 的表达式系统(Expr)是它的骨骼,那么 Lazy API 就是 Polars 的灵魂。在前面的课程中,我们已经多次接触过 LazyFrame 和 .collect(),但你可能还没有真正领略到它的全部威力。
Lazy API 不仅仅是一种"延迟执行"的编程模式,它背后是一整套查询优化引擎——包括谓词下推(Predicate Pushdown)、投影下推(Projection Pushdown)、公共子表达式消除(CSE)等。这些优化能让你的数据处理 pipeline 在某些场景下快出数倍甚至数十倍。
今天,我们将深入 Lazy API 的进阶用法,掌握高级表达式技巧,学习性能优化的核心策略,最后通过一个完整的小项目把所有知识串联起来。准备好了吗?让我们开始吧!💪
本课的完整依赖如下:
# Cargo.toml
[dependencies]
polars = { version = "0.53.0", features = [
"lazy", "csv", "parquet", "json",
"strings", "temporal", "diagonal_concat",
"rank", "cum_agg", "pivot", "polars-ops",
"regex", "dtype-date", "dtype-datetime",
"rolling_window", "abs", "pct_change",
"is_in", "log"
] }在 Polars 中,数据处理有两条路线:
特性 | DataFrame(Eager) | LazyFrame(Lazy) |
|---|---|---|
执行方式 | 立即执行,每步都产生中间结果 | 延迟执行,构建查询计划 |
查询优化 | 无优化,按代码顺序执行 | 自动优化(谓词下推、投影下推等) |
内存占用 | 较高(中间结果驻留内存) | 较低(优化器可减少不必要计算) |
调试难度 | 简单直观,所见即所得 | 需要查看执行计划来理解行为 |
适用场景 | 小数据、交互式探索、调试 | 大数据、复杂 pipeline、生产环境 |
简单来说:DataFrame 像是"边走边看地图",LazyFrame 像是"先规划好最优路线再出发" 🗺️。
用 Eager 的场景:
用 Lazy 的场景:
.collect() 触发执行LazyFrame 的所有操作都是"声明式"的——你告诉 Polars 你想做什么,而不是 怎么做。真正的计算在调用 .collect() 时才会触发:
use polars::prelude::*;
fn lazy_collect_demo() -> PolarsResult<DataFrame> {
let df = df![
"name" => &["Alice", "Bob", "Charlie", "Diana", "Eve"],
"age" => &[25, 30, 35, 28, 22],
"score" => &[88.5, 92.0, 78.5, 95.0, 85.0],
]?;
// 构建 Lazy 查询计划 —— 此时不会执行任何计算
let lazy_result = df.lazy()
.filter(col("age").gt(lit(25))) // 筛选年龄 > 25
.select([
col("name"),
(col("score") * lit(1.1)).alias("adjusted_score"), // 加权分数
])
.sort_by_exprs([col("adjusted_score")], SortMultipleOptions::default());
// 调用 .collect() 才真正执行计算
let result = lazy_result.collect()?;
println!("{}", result);
Ok(result)
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
lazy_collect_demo()?;
Ok(())
}
输出:
shape: (3, 2)
┌─────────┬────────────────┐
│ name ┆ adjusted_score │
│ --- ┆ --- │
│ str ┆ f64 │
╞═════════╪════════════════╡
│ Charlie ┆ 86.35 │
│ Bob ┆ 101.2 │
│ Diana ┆ 104.5 │
└─────────┴────────────────┘.explain() 查看查询计划.explain() 是调试 Lazy 查询的利器,它会打印出查询计划(未经优化的):
fn explain_demo() -> PolarsResult<()> {
let df = df![
"name" => &["Alice", "Bob", "Charlie"],
"age" => &[25, 30, 35],
"score" => &[88.5, 92.0, 78.5],
]?;
let plan = df.lazy()
.filter(col("age").gt(lit(28)))
.select([col("name"), col("score")])
.explain(false)?; // false = 显示未优化的计划
println!("=== 未优化的查询计划 ===\n{}", plan);
Ok(())
}.explain(true) 查看优化后的计划传入 true 可以看到优化器做了什么:
fn explain_optimized_demo() -> PolarsResult<()> {
let df = df![
"name" => &["Alice", "Bob", "Charlie", "Diana"],
"age" => &[25, 30, 35, 28],
"score" => &[88.5, 92.0, 78.5, 95.0],
"city" => &["Beijing", "Shanghai", "Guangzhou", "Shenzhen"],
]?;
// 构建一个包含冗余操作的查询
let plan = df.lazy()
.filter(col("age").gt(lit(25)))
.select([col("name"), col("score"), col("age")])
.filter(col("score").gt(lit(80.0)))
.explain(true)?; // true = 显示优化后的计划
println!("=== 优化后的查询计划 ===\n{}", plan);
Ok(())
}优化器会自动将两个 filter 合并,并且只读取需要的列(投影下推)。你会看到优化后的计划更加精简高效。
下面用同一个需求,分别用 Eager 和 Lazy 实现,直观感受差异:
use std::time::Instant;
fn eager_vs_lazy_comparison() -> PolarsResult<()> {
// 构造一个较大的数据集(10 万行)
let n: usize = 100_000;
let names: Vec<String> = (0..n).map(|i| format!("user_{}", i)).collect();
let ages: Vec<i32> = (0..n).map(|i| ((i % 60) as i32) + 18).collect();
let scores: Vec<f64> = (0..n).map(|i| (i as f64 * 0.01) % 100.0).collect();
let cities: Vec<String> = (0..n)
.map(|i| ["Beijing", "Shanghai", "Guangzhou", "Shenzhen"][i % 4].to_string())
.collect();
let df = df![
"name" => &names,
"age" => &ages,
"score" => &scores,
"city" => &cities,
]?;
// ===== Eager 方式 =====
let start = Instant::now();
let age_mask = df.column("age")?.i32()?.gt(30);
let filtered_df = df
.clone()
.filter(&age_mask)?
.select(["name", "age", "score", "city"])?;
let score_mask = filtered_df.column("score")?.f64()?.gt(50.0);
let mut eager_result = filtered_df
.filter(&score_mask)?
.group_by(["city"])?
.select(["score", "age"])
.mean()?;
eager_result.rename("score_mean", "avg_score".into())?;
eager_result.rename("age_mean", "avg_age".into())?;
let eager_time = start.elapsed();
println!("Eager 耗时: {:?}", eager_time);
println!("Eager 结果:\n{}", eager_result);
// ===== Lazy 方式 =====
let start = Instant::now();
let lazy_result = df
.lazy()
.filter(col("age").gt(30))
.filter(col("score").gt(50.0))
.group_by([col("city")])
.agg([
col("score").mean().alias("avg_score"),
col("age").mean().alias("avg_age"),
])
.collect()?;
let lazy_time = start.elapsed();
println!("Lazy 耗时: {:?}", lazy_time);
println!("Lazy 结果:\n{}", lazy_result);
Ok(())
}注意:对于小数据集,Eager 和 Lazy 的性能差异可能不明显。Lazy 的优势在大数据集和复杂 pipeline 中才会充分体现——优化器可以跳过不必要的计算、减少内存分配。
Polars 的表达式系统极其强大,这一节我们学习几个进阶表达式,它们是构建复杂数据处理逻辑的基石。
when/then/otherwise:条件表达式类似 SQL 的 CASE WHEN,用于实现条件逻辑:
/// 演示 when/then/otherwise 条件表达式
/// 根据分数划分等级:优秀/良好/及格/不及格
fn conditional_expr_demo() -> PolarsResult<DataFrame> {
let df = df![
"name" => &["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"],
"score" => &[95, 82, 67, 45, 73, 58],
]?;
let result = df.lazy()
.with_column(
when(col("score").gt_eq(lit(90)))
.then(lit("优秀"))
.when(col("score").gt_eq(lit(80)))
.then(lit("良好"))
.when(col("score").gt_eq(lit(60)))
.then(lit("及格"))
.otherwise(lit("不及格"))
.alias("grade"),
)
.sort(["score"], SortMultipleOptions::default().with_order_descending(true))
.collect()?;
println!("{}", result);
Ok(result)
}输出:
shape: (6, 3)
┌─────────┬───────┬────────┐
│ name ┆ score ┆ grade │
│ --- ┆ --- ┆ --- │
│ str ┆ i32 ┆ str │
╞═════════╪═══════╪════════╡
│ Alice ┆ 95 ┆ 优秀 │
│ Bob ┆ 82 ┆ 良好 │
│ Eve ┆ 73 ┆ 及格 │
│ Charlie ┆ 67 ┆ 及格 │
│ Frank ┆ 58 ┆ 不及格 │
│ Diana ┆ 45 ┆ 不及格 │
└─────────┴───────┴────────┘小贴士:
when/then/otherwise是链式调用的,可以叠加多个条件分支。最后的.otherwise()是必须的,它相当于else分支。
over() 进阶:窗口函数深入over() 可以在不改变数据行数的情况下,按分组计算聚合值——这就是 SQL 中的窗口函数:
/// 演示 over() 窗口函数:计算每个部门内的薪资排名和占比
fn window_function_demo() -> PolarsResult<DataFrame> {
let df = df![
"name" => &["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace"],
"dept" => &["Engineering", "Engineering", "Engineering", "Sales", "Sales", "Sales", "HR"],
"salary" => &[25000, 22000, 28000, 18000, 20000, 16000, 21000],
]?;
let result = df.lazy()
.with_columns([
// 每个部门内的薪资排名(降序)
col("salary")
.rank(RankOptions {
method: RankMethod::Average,
..Default::default()
}, None)
.over([col("dept")])
.alias("dept_rank"),
// 每个部门的平均薪资
col("salary")
.mean()
.over([col("dept")])
.alias("dept_avg_salary"),
])
.with_column(
// 薪资与部门平均的差值
(col("salary") - col("dept_avg_salary")).alias("diff_from_avg"),
)
.sort(["dept", "salary"], SortMultipleOptions::default().with_order_descending(true))
.collect()?;
println!("{}", result);
Ok(result)
}输出:
===== Running demos =====
shape: (7, 6)
┌─────────┬─────────────┬────────┬───────────┬─────────────────┬───────────────┐
│ name ┆ dept ┆ salary ┆ dept_rank ┆ dept_avg_salary ┆ diff_from_avg │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ i32 ┆ f64 ┆ f64 ┆ f64 │
╞═════════╪═════════════╪════════╪═══════════╪═════════════════╪═══════════════╡
│ Eve ┆ Sales ┆ 20000 ┆ 3.0 ┆ 18000.0 ┆ 2000.0 │
│ Diana ┆ Sales ┆ 18000 ┆ 2.0 ┆ 18000.0 ┆ 0.0 │
│ Frank ┆ Sales ┆ 16000 ┆ 1.0 ┆ 18000.0 ┆ -2000.0 │
│ Grace ┆ HR ┆ 21000 ┆ 1.0 ┆ 21000.0 ┆ 0.0 │
│ Charlie ┆ Engineering ┆ 28000 ┆ 3.0 ┆ 25000.0 ┆ 3000.0 │
│ Alice ┆ Engineering ┆ 25000 ┆ 2.0 ┆ 25000.0 ┆ 0.0 │
│ Bob ┆ Engineering ┆ 22000 ┆ 1.0 ┆ 25000.0 ┆ -3000.0 │
└─────────┴─────────────┴────────┴───────────┴─────────────────┴───────────────┘核心要点:
over()不会像group_by()那样折叠行数,而是为每一行附加一个聚合计算结果。这在计算排名、占比、移动平均等场景中极其有用。
shift() / shift_and_fill():行偏移行偏移是时间序列分析的利器,可以获取"上一行"或"下一行"的值:
/// 演示 shift 和 shift_and_fill:计算日环比增长率
fn shift_demo() -> PolarsResult<DataFrame> {
let df = df![
"date" => &["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05"],
"revenue"=> &[1000, 1200, 1150, 1400, 1350],
]?;
let result = df.lazy()
// shift(1) 将数据向下移动 1 行,即获取"前一天"的值
.with_column(
col("revenue").shift(lit(1i64)).alias("prev_revenue"),
)
// shift_and_fill(1, lit(0)) 移动并用 0 填充空值
.with_column(
col("revenue").shift_and_fill(1, lit(0)).alias("prev_filled"),
)
// 计算日环比增长率(百分比)
.with_column(
((col("revenue").cast(DataType::Float64)
- col("prev_filled").cast(DataType::Float64))
/ col("prev_filled").cast(DataType::Float64)
* lit(100.0))
.alias("growth_pct"),
)
.collect()?;
println!("{}", result);
Ok(result)
}输出:
shape: (5, 5)
┌────────────┬─────────┬──────────────┬─────────────┬────────────┐
│ date ┆ revenue ┆ prev_revenue ┆ prev_filled ┆ growth_pct │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ i32 ┆ i32 ┆ i32 ┆ f64 │
╞════════════╪═════════╪══════════════╪═════════════╪════════════╡
│ 2024-01-01 ┆ 1000 ┆ null ┆ 0 ┆ inf │
│ 2024-01-02 ┆ 1200 ┆ 1000 ┆ 1000 ┆ 20.0 │
│ 2024-01-03 ┆ 1150 ┆ 1200 ┆ 1200 ┆ -4.166667 │
│ 2024-01-04 ┆ 1400 ┆ 1150 ┆ 1150 ┆ 21.73913 │
│ 2024-01-05 ┆ 1350 ┆ 1400 ┆ 1400 ┆ -3.571429 │
└────────────┴─────────┴──────────────┴─────────────┴────────────┘cum_sum() / cum_min() / cum_max() / cum_prod()累计运算从第一行开始逐步累加/累乘/取极值,常用于计算"截至当前行的累计值":
/// 演示累计运算:计算累计销售额和累计最大值
fn cumulative_demo() -> PolarsResult<DataFrame> {
let df = df![
"month" => &["Jan", "Feb", "Mar", "Apr", "May", "Jun"],
"sales" => &[100, 150, 120, 180, 200, 160],
]?;
let result = df.lazy()
.with_columns([
col("sales").cum_sum(false).alias("cum_sales"), // 累计求和
col("sales").cum_max(false).alias("cum_max"), // 累计最大值
col("sales").cum_min(false).alias("cum_min"), // 累计最小值
])
.collect()?;
println!("{}", result);
Ok(result)
}输出:
shape: (6, 5)
┌───────┬───────┬───────────┬─────────┬─────────┐
│ month ┆ sales ┆ cum_sales ┆ cum_max ┆ cum_min │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ i32 ┆ i32 ┆ i32 ┆ i32 │
╞═══════╪═══════╪═══════════╪═════════╪═════════╡
│ Jan ┆ 100 ┆ 100 ┆ 100 ┆ 100 │
│ Feb ┆ 150 ┆ 250 ┆ 150 ┆ 100 │
│ Mar ┆ 120 ┆ 370 ┆ 150 ┆ 100 │
│ Apr ┆ 180 ┆ 550 ┆ 180 ┆ 100 │
│ May ┆ 200 ┆ 750 ┆ 200 ┆ 100 │
│ Jun ┆ 160 ┆ 910 ┆ 200 ┆ 100 │
└───────┴───────┴───────────┴─────────┴─────────┘rolling_*():滚动窗口运算(概念介绍)滚动窗口运算是时间序列分析的核心工具,它在一个固定大小的"窗口"内进行聚合计算。Polars 提供了 rolling_sum()、rolling_mean()、rolling_min()、rolling_max()、rolling_std() 等方法。
注意:滚动窗口运算需要启用
temporalfeature,且通常需要数据按时间排序。
/// 演示滚动窗口运算:计算 3 日移动平均
fn rolling_demo() -> PolarsResult<DataFrame> {
let df = df![
"day" => &[1, 2, 3, 4, 5, 6, 7],
"value" => &[10.0, 20.0, 15.0, 25.0, 30.0, 20.0, 35.0],
]?;
let result = df.lazy()
.with_column(
// 窗口大小为 3,计算滚动均值
col("value")
.rolling_mean(RollingOptionsFixedWindow {
window_size: 3,
..Default::default()
})
.alias("rolling_avg_3"),
)
.with_column(
// 窗口大小为 3,计算滚动求和
col("value")
.rolling_sum(RollingOptionsFixedWindow {
window_size: 3,
..Default::default()
})
.alias("rolling_sum_3"),
)
.collect()?;
println!("{}", result);
Ok(result)
}输出:
shape: (7, 4)
┌─────┬───────┬───────────────┬───────────────┐
│ day ┆ value ┆ rolling_avg_3 ┆ rolling_sum_3 │
│ --- ┆ --- ┆ --- ┆ --- │
│ i32 ┆ f64 ┆ f64 ┆ f64 │
╞═════╪═══════╪═══════════════╪═══════════════╡
│ 1 ┆ 10.0 ┆ 10.0 ┆ 10.0 │
│ 2 ┆ 20.0 ┆ 15.0 ┆ 30.0 │
│ 3 ┆ 15.0 ┆ 15.0 ┆ 45.0 │
│ 4 ┆ 25.0 ┆ 20.0 ┆ 60.0 │
│ 5 ┆ 30.0 ┆ 23.333333 ┆ 70.0 │
│ 6 ┆ 20.0 ┆ 25.0 ┆ 75.0 │
│ 7 ┆ 35.0 ┆ 28.333333 ┆ 85.0 │
└─────┴───────┴───────────────┴───────────────┘arg_sort(), arg_max(), arg_min():索引操作这些方法返回的是索引位置而非值本身,在需要定位特定行的场景中非常有用:
/// 演示 arg_* 系列方法:获取排序索引和极值位置
fn arg_operations_demo() -> PolarsResult<DataFrame> {
let df = df![
"product" => &["A", "B", "C", "D", "E"],
"sales" => &[150, 300, 200, 450, 100],
]?;
let result = df.lazy()
.with_columns([
// 返回按 sales 升序排列的索引
col("sales").arg_sort(false, false).alias("sort_idx"),
])
.collect()?;
// 单独演示 arg_max 和 arg_min
let sales = result.column("sales")?.as_materialized_series();
println!("销量最高的产品索引: {:?}", sales.arg_max()); // Some(3) -> "D"
println!("销量最低的产品索引: {:?}", sales.arg_min()); // Some(4) -> "E"
println!("{}", result);
Ok(result)
}这一节是本课的精华所在!掌握这些优化技巧,能让你的 Polars 代码性能再上一个台阶。
rechunk():减少内存碎片Polars 的底层基于 Apache Arrow,数据以"块(chunk)"为单位存储。经过多次 filter、select 等操作后,一个列可能被分散到多个不连续的 chunk 中,导致遍历时需要额外的跳转开销。rechunk() 将所有 chunk 合并为一个连续的内存块:
/// 演示 rechunk() 的使用:减少内存碎片提升性能
fn rechunk_demo() -> PolarsResult<()> {
let df = df![
"a" => &[1, 2, 3, 4, 5],
"b" => &[10, 20, 30, 40, 50],
]?;
// 多次操作后,内部可能产生碎片化的 chunk
let mut fragmented = df
.lazy()
.filter(col("a").gt(2))
.select([col("a"), col("b")])
.collect()?;
// 使用 rechunk() 合并内存块,提升后续操作性能
let consolidated = fragmented.rechunk_mut();
println!("rechunk 后的 DataFrame chunk 数更少,内存更连续");
println!("{}", consolidated);
Ok(())
}最佳实践:在 ETL pipeline 的中间节点,如果后续有大量计算操作,可以适时调用
rechunk()。Lazy API 的优化器在某些情况下会自动进行 rechunk。
with_columns 批量操作 vs 多次 with_column这是最常见的性能陷阱之一! 每次调用 with_column(注意是单数)都会创建一个新的 LazyFrame 节点,而 with_columns(复数)可以一次性添加多列,减少查询计划的复杂度:
/// 演示 with_columns 批量操作的性能优势
fn batch_columns_demo() -> PolarsResult<DataFrame> {
let df = df![
"name" => &["Alice", "Bob", "Charlie"],
"math" => &[90, 85, 78],
"english"=> &[88, 92, 80],
"science"=> &[95, 76, 88],
]?;
// ❌ 不推荐:多次 with_column,每次都产生新的查询节点
let bad = df.clone().lazy()
.with_column(
(col("math") + col("english") + col("science")).alias("total"),
)
.with_column(
(col("total") / lit(3.0)).alias("average"),
)
.with_column(
when(col("average").gt_eq(lit(85.0)))
.then(lit("A"))
.otherwise(lit("B"))
.alias("grade"),
);
// ✅ 推荐:一次 with_columns 批量添加多列
let good = df.lazy()
.with_columns([
(col("math") + col("english") + col("science")).alias("total"),
((col("math") + col("english") + col("science")) / lit(3.0)).alias("average"),
when(((col("math") + col("english") + col("science")) / lit(3.0)).gt_eq(lit(85.0)))
.then(lit("A"))
.otherwise(lit("B"))
.alias("grade"),
]);
// 查看两种方式的查询计划差异
println!("=== 批量方式查询计划 ===");
println!("{}", good.explain(false)?);
let result = good.collect()?;
println!("{}", result);
Ok(result)
}核心原则:强烈推荐能用一次
with_columns搞定的,就用一次。这不仅让代码更简洁、可读性更高,也能让查询计划更清晰,给优化器提供更好的优化机会。”
Polars 底层利用 Rust 的 SIMD(Single Instruction Multiple Data)指令集进行向量化计算,一条 CPU 指令可以同时处理多个数据元素。启用 performant feature 后,Polars 会自动使用更优化的算法路径:
# 在 Cargo.toml 中启用 performant feature
polars = { version = "0.53", features = ["lazy", "performant"] }说明:
performantfeature 会启用额外的 SIMD 优化和更高效的字符串处理。在大多数场景下,这是"免费的性能提升",建议在生产环境中始终启用。
如果需要更极致的 SIMD 加速,可以使用 Rust nightly 编译器配合 simd feature,但这需要 nightly 工具链:
# 仅在 nightly 下可用
polars = { version = "0.53", features = ["simd"] }POLARS_MAX_THREADSPolars 默认会使用所有可用的 CPU 核心进行并行计算。在某些场景下(如共享服务器、内存受限环境),你可能需要限制并行度:
/// 演示并行度控制
fn thread_control_demo() -> PolarsResult<()> {
// 方式 1:通过环境变量设置(推荐)
// 在程序启动前设置:
// export POLARS_MAX_THREADS=4
// 运行中的程序里不再调用 set_var,避免线程环境变更带来的未定义行为风险。
// 方式 2:通过 LazyFrame 的 with_context 配置
let df = df![
"a" => &[1, 2, 3, 4, 5, 6, 7, 8],
"b" => &[10, 20, 30, 40, 50, 60, 70, 80],
]?;
let result = df.lazy()
.select([(col("a") * col("b")).alias("product")])
.collect()?;
println!("{}", result);
Ok(())
}最佳实践:在容器化部署(Docker/K8s)中,务必设置
POLARS_MAX_THREADS以匹配容器的 CPU 限额,避免过度竞争。
mimalloc / jemalloc内存分配器的选择对性能有显著影响。Rust 默认使用系统分配器,但在高频分配/释放的场景下,mimalloc 或 jemalloc 通常表现更好:
# Cargo.toml - 使用 mimalloc 作为全局分配器
[dependencies]
polars = { version = "0.53", features = ["lazy", "csv", "parquet", "performant"] }
mimalloc = { version = "0.1", default-features = false }use mimalloc::MiMalloc;
// 将 mimalloc 设为全局分配器
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
fn main() -> PolarsResult<()> {
// 此后所有内存分配都通过 mimalloc
let df = df![
"a" => &[1, 2, 3],
"b" => &[4, 5, 6],
]?;
println!("{}", df);
Ok(())
}实测效果:在大规模数据处理中,mimalloc 通常能带来 5%~15% 的性能提升,尤其是在频繁创建和销毁 DataFrame 的场景下。
Lazy API 最大的价值在于查询优化,其中最核心的两个优化是:
谓词下推(Predicate Pushdown):
将 filter 条件尽可能早地执行,减少后续操作需要处理的数据量。
# 未优化:先读取所有列,再筛选
scan_csv("big_file.csv") -> select(["name", "age"]) -> filter(age > 25)
# 谓词下推优化后:读取时就筛选
scan_csv("big_file.csv", filter: age > 25) -> select(["name", "age"])投影下推(Projection Pushdown): 只读取实际需要的列,跳过不需要的列。
# 未优化:读取所有列(包括不需要的 address, phone 等)
scan_csv("big_file.csv") -> select(["name", "age"])
# 投影下推优化后:只读取 name 和 age 列
scan_csv("big_file.csv", columns: ["name", "age"])/// 演示谓词下推和投影下推
fn optimization_demo() -> PolarsResult<()> {
let df = df![
"name" => &["Alice", "Bob", "Charlie", "Diana"],
"age" => &[25, 30, 35, 28],
"score" => &[88, 92, 78, 95],
"city" => &["Beijing", "Shanghai", "Guangzhou", "Shenzhen"],
"email" => &["a@test.com", "b@test.com", "c@test.com", "d@test.com"],
]?;
// 这个查询同时触发了谓词下推和投影下推
let plan = df.lazy()
.filter(col("age").gt(28)) // 谓词下推:尽早过滤
.select([col("name"), col("score")]) // 投影下推:只读需要的列
.sort(["score"], SortMultipleOptions::default().with_order_descending(true))
.explain(true)?;
println!("=== 优化后的查询计划 ===\n{}", plan);
// 你会看到 FILTER 被推到了最前面,且只选择了 name 和 score 列
Ok(())
}collect() vs sink_*() 流式处理对于非常大的数据集,.collect() 会将所有结果一次性加载到内存中,可能导致 OOM(Out of Memory)。Polars 提供了 sink_*() 系列方法,支持流式处理——数据分批处理并直接写入磁盘,不需要全部驻留内存:
/// 演示 sink_parquet 流式写入:处理大数据集时避免 OOM
fn sink_parquet_demo() -> PolarsResult<()> {
let df = df![
"id" => &[1, 2, 3, 4, 5],
"name" => &["Alice", "Bob", "Charlie", "Diana", "Eve"],
"value" => &[100, 200, 150, 300, 250],
]?;
// 当前特性组合下不使用 sink_parquet,改为先执行 Lazy 再写入 Parquet。
let mut result = df.lazy()
.filter(col("value").gt(120))
.sort(["value"], SortMultipleOptions::default().with_order_descending(true))
.collect()?;
let mut file = File::create("output.parquet")?;
ParquetWriter::new(&mut file).finish(&mut result)?;
println!("数据已写入 output.parquet");
// 验证写入结果
let result = LazyFrame::scan_parquet("output.parquet".into(), Default::default())?
.collect()?;
println!("{}", result);
Ok(())
}适用场景:当你需要处理 GB 级别的数据,但内存有限时,
sink_parquet()、sink_csv()和sink_ipc()是救星。它们让 Polars 可以处理远超内存容量的数据集。
.explain(true) 打印优化前后对比.explain(true) 是你最好的调试工具。通过对比优化前后的查询计划,你可以直观地看到优化器做了什么:
/// 演示 explain 对比:理解优化器的行为
fn debug_with_explain() -> PolarsResult<()> {
let df = df![
"a" => &[1, 2, 3, 4, 5],
"b" => &[10, 20, 30, 40, 50],
"c" => &[100, 200, 300, 400, 500],
"d" => &[1000, 2000, 3000, 4000, 5000],
]?;
let query = df.lazy()
.filter(col("a").gt(2))
.filter(col("b").lt(40))
.select([col("a"), col("b"), col("c")])
.filter(col("c").gt(200))
.sort(["a"], SortMultipleOptions::default());
println!("=== 优化前 ===");
println!("{}", query.explain(false)?);
println!("\n=== 优化后 ===");
println!("{}", query.explain(true)?);
// 优化后的计划中,你会看到:
// 1. 多个 filter 被合并为一个
// 2. 只选择了 a, b, c 列(d 列被投影下推跳过)
// 3. filter 被推到了最前面(谓词下推)
Ok(())
}.profile() 查看各步骤耗时.profile() 会在执行查询的同时记录每个节点的耗时,帮你定位性能瓶颈:
/// 演示 profile():定位查询中的性能瓶颈
fn profile_demo() -> PolarsResult<()> {
let df = df![
"name" => &["Alice", "Bob", "Charlie", "Diana", "Eve"],
"age" => &[25, 30, 35, 28, 22],
"score" => &[88.5, 92.0, 78.5, 95.0, 85.0],
]?;
let (result, profile) = df.lazy()
.filter(col("age").gt(25))
.with_columns([
(col("score") * lit(1.1)).alias("weighted_score"),
col("age").alias("age_copy"),
])
.sort(["weighted_score"], SortMultipleOptions::default().with_order_descending(true))
.profile()?;
println!("=== 查询结果 ===\n{}", result);
println!("\n=== 性能 Profile ===\n{}", profile);
Ok(())
}profile 的输出会显示每个操作节点(如 FILTER、SORT、 WITH_COLUMNS 等)的耗时,让你快速定位哪个步骤最慢。
在 Polars 开发中,有几个常见的性能陷阱需要注意:
陷阱 1:在循环中反复 collect
// ❌ 极慢:每次循环都触发完整查询
for col_name in &["a", "b", "c"] {
let result = df.lazy().select([col(col_name)]).collect()?;
println!("{}", result);
}
// ✅ 推荐:一次 collect 获取所有需要的列
let result = df.lazy()
.select([col("a"), col("b"), col("c")])
.collect()?;陷阱 2:过早 collect
// ❌ 不推荐:过早 collect,丢失后续优化机会
let filtered = df.lazy().filter(col("age").gt(25)).collect()?;
let result = filtered.lazy().select([col("name")]).collect()?;
// ✅ 推荐:保持 Lazy 直到最后的 collect
let result = df.lazy()
.filter(col("age").gt(25))
.select([col("name")])
.collect()?;陷阱 3:使用 slice 后忘记重新排序
// slice 会打乱原始顺序,如果后续需要排序结果,记得重新 sort
let result = df.lazy()
.filter(col("score").gt(80))
.slice(0, 10) // 取前 10 行
.sort_by(["score"], SortMultipleOptions::default().with_order_descending(true))
.collect()?;恭喜你完成了 Polars Rust 系列第 8 课的学习!🎉 这也是本系列的最后一课,感谢您的陪伴,希望本系统对您学习 rust 及 rust polars 有帮助。
Lazy API 是 Polars 最强大的特性之一。通过今天的学习,你已经掌握了:
sink_*() 系列方法explain() 和 profile()记住:Lazy 不是银弹,但它是处理大规模数据的利器。在实际开发中,根据场景灵活选择 Eager 和 Lazy,才能发挥 Polars 的最大威力。