首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Rust 量化统计实战系列 第 2 篇:数据加载与预处理:pandas 原型探索,Polars 生产流水线

Rust 量化统计实战系列 第 2 篇:数据加载与预处理:pandas 原型探索,Polars 生产流水线

作者头像
不吃草的牛德
发布2026-04-23 13:03:23
发布2026-04-23 13:03:23
1130
举报
文章被收录于专栏:RustRust

为什么需要两层数据处理?

量化数据处理常常分为两个阶段。

阶段1:研究探索期(Python + pandas) 你刚拿到数据,最想快速回答这些问题:

  • • 数据长什么样?有多少行、哪些列?
  • • 有缺失值吗?异常值多不多?
  • • 数据范围是否合理?
  • • 该怎么处理才合适?

pandas 的 df.head()df.describe()df.info() 能让你几秒钟内看清数据,快速迭代方案。

阶段2:生产落地期(Rust + Polars) 数据量上来后,pandas 容易 OOM、速度慢、无法长期稳定运行。这时需要 Rust + Polars:

  • • LazyFrame 懒加载,不怕亿级数据
  • • 类型安全,编译时就发现问题
  • • 内存高效,支持增量更新和长期运行

Python 数据探索

使用 pandas 快速了解数据

代码语言:javascript
复制
# data_exploration.py
import pandas as pd
import numpy as np

df = pd.read_csv("data/aapl.us.txt", parse_dates=['<DATE>'])

# 快速了解数据
print("=== 数据概览 ===")
print(f"行数: {len(df)}")
print(f"列数: {len(df.columns)}")
print(f"\n列名: {df.columns.tolist()}")
print(f"\n数据类型:\n{df.dtypes}")
print(f"\n前 5 行:")
print(df.head())

print("\n=== 描述性统计 ===")
print(df.describe())

print("\n=== 缺失值统计 ===")
missing = df.isnull().sum()
print(missing[missing > 0])

print("\n=== 数据范围 ===")
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    print(f"{col}: [{df[col].min():.2f}, {df[col].max():.2f}]")

典型输出(以 AAPL 日线为例):

代码语言:javascript
复制
=== 数据概览 ===
行数: 10473
列数: 10

列名: ['<TICKER>', '<PER>', '<DATE>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>', '<VOL>', '<OPENINT>']

数据类型:
<TICKER>             object
<PER>                object
<DATE>       datetime64[ns]
<TIME>                int64
<OPEN>              float64
<HIGH>              float64
<LOW>               float64
<CLOSE>             float64
<VOL>                 int64
<OPENINT>             int64
dtype: object

前 5 行:
  <TICKER> <PER>     <DATE>  <TIME>    <OPEN>    <HIGH>     <LOW>   <CLOSE>  \
0  AAPL.US     D 1984-09-07       0  0.099172  0.100390  0.097975  0.099172   
1  AAPL.US     D 1984-09-10       0  0.099172  0.099477  0.096788  0.098584   
2  AAPL.US     D 1984-09-11       0  0.099477  0.102175  0.099477  0.100390   
3  AAPL.US     D 1984-09-12       0  0.100390  0.100977  0.097367  0.097367   
4  AAPL.US     D 1984-09-13       0  0.102784  0.103077  0.102784  0.102784   

       <VOL>  <OPENINT>  
0   99242379          0  
1   77028276          0  
2  181637249          0  
3  158675628          0  
4  247131424          0  

=== 描述性统计 ===
                              <DATE>   <TIME>        <OPEN>        <HIGH>  \
count                          10473  10473.0  10473.000000  10473.000000   
mean   2005-06-10 19:02:10.965339392      0.0     31.828664     32.172663   
min              1984-09-07 00:00:00      0.0      0.054537      0.055135   
25%              1995-01-16 00:00:00      0.0      0.292073      0.297831   
50%              2005-06-09 00:00:00      0.0      1.261280      1.283240   
75%              2015-11-02 00:00:00      0.0     25.086800     25.360700   
max              2026-04-02 00:00:00      0.0    285.932000    288.350000   
std                              NaN      0.0     62.055662     62.718917   

              <LOW>       <CLOSE>         <VOL>  <OPENINT>  
count  10473.000000  10473.000000  1.047300e+04    10473.0  
mean      31.503916     31.851466  3.835056e+08        0.0  
min        0.053938      0.053938  2.856842e+06        0.0  
25%        0.286399      0.292073  1.292911e+08        0.0  
50%        1.239590      1.256140  2.492596e+08        0.0  
75%       24.826000     25.122500  4.859325e+08        0.0  
max      283.035000    285.922000  8.846194e+09        0.0  
std       61.440598     62.105043  4.067278e+08        0.0  

=== 缺失值统计 ===
Series([], dtype: int64)

=== 数据范围 ===
<TIME>: [0.00, 0.00]
<OPEN>: [0.05, 285.93]
<HIGH>: [0.06, 288.35]
<LOW>: [0.05, 283.04]
<CLOSE>: [0.05, 285.92]
<VOL>: [2856842.00, 8846193721.00]
<OPENINT>: [0.00, 0.00]

数据质量检查函数

代码语言:javascript
复制
# quality_check.py
import pandas as pd

def check_data_quality(df: pd.DataFrame) -> dict:
    issues = {}
    if df['<DATE>'].duplicated().sum() > 0:
        issues['duplicate_dates'] = df['<DATE>'].duplicated().sum()
    if df.isnull().sum().sum() > 0:
        issues['missing_values'] = df.isnull().sum().sum()
    if (df[['<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>']] < 0).any().any():
        issues['negative_prices'] = True
    if (df['<HIGH>'] < df['<LOW>']).sum() > 0:
        issues['invalid_high_low'] = (df['<HIGH>'] < df['<LOW>']).sum()
    if ((df['<CLOSE>'] > df['<HIGH>']) | (df['<CLOSE>'] < df['<LOW>'])).sum() > 0:
        issues['invalid_close'] = ((df['<CLOSE>'] > df['<HIGH>']) | (df['<CLOSE>'] < df['<LOW>'])).sum()
    if (df['<VOL>'] == 0).sum() > 0:
        issues['zero_volume'] = (df['<VOL>'] == 0).sum()
    return issues

df = pd.read_csv("data/aapl.us.txt", parse_dates=['<DATE>'])
issues = check_data_quality(df)
print("✓ 数据质量良好" if not issues else f"发现问题:{issues}")

设计预处理逻辑(Python 原型)

代码语言:javascript
复制
# preprocessing_design.py
import pandas as pd
import numpy as np

def design_preprocessing(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values('Date').reset_index(drop=True)
    df = df.fillna(method='ffill')                    # 前向填充
    df['Return'] = df['Close'].pct_change()
    df['LogReturn'] = np.log(df['Close'] / df['Close'].shift(1))
    
    # 异常值检测(3σ)
    std = df['Return'].std()
    df['Outlier'] = (df['Return'].abs() > 3 * std).astype(int)
    
    # 技术指标
    df['MA5'] = df['Close'].rolling(5).mean()
    df['MA20'] = df['Close'].rolling(20).mean()
    df['MA60'] = df['Close'].rolling(60).mean()
    return df

df = pd.read_csv("data/AAPL.csv", parse_dates=['Date'])
processed = design_preprocessing(df)
print(processed[['Date', 'Close', 'Return', 'MA5', 'MA20']].head())

Python 阶段的核心价值在于快速验证逻辑,发现问题立刻调整。


Rust 生产级实现(纯 Rust 版本)

1. Cargo.toml 配置(必须)

代码语言:javascript
复制
[package]
name = "rustdata_processing"
version = "0.1.0"
edition = "2024"

[dependencies]
polars = { version = "0.53", features = ["lazy", "dtype-date", "dtype-datetime", "temporal","parquet","pct_change","abs", "rolling_window", "log"] }
polars-io = { version = "0.53", features = ["csv", "parquet"] }

2. 核心预处理函数(lib.rs)

代码语言:javascript
复制
// src/lib.rs
use polars::prelude::*;

use std::fs::File;

pub fn efficient_preprocessing(file_path: &str, output_path: Option<&str>) -> PolarsResult<DataFrame> {
    // Lazy 读取 CSV(支持自动解析日期)
    let lf = LazyCsvReader::new(PlRefPath::new(file_path))
        .with_has_header(true)
        .with_try_parse_dates(true)
        .finish()?
        // 排序(必须先排序才能正确计算收益率)
        .sort(
            ["<DATE>"],
            SortMultipleOptions::default(),
        )
        // 前向填充缺失值
        .with_column(
            col("<CLOSE>").fill_null(col("<CLOSE>").shift(lit(1))).alias("Close_filled")
        )
        // 计算收益率
        .with_column(
            ((col("Close_filled") - col("Close_filled").shift(lit(1))) / col("Close_filled").shift(lit(1)))
                .alias("Return")
        )
        .with_column(
            (col("Close_filled").log(lit(std::f64::consts::E)) - col("Close_filled").shift(lit(1)).log(lit(std::f64::consts::E)))
                .alias("LogReturn")
        )
        // 滚动均线
        .with_column(
            col("Close_filled").rolling_mean(RollingOptionsFixedWindow {
                window_size: 5,
                min_periods: 5,
                center: false,
                ..Default::default()
            })
                .alias("MA5")
        )
        .with_column(
            col("Close_filled").rolling_mean(RollingOptionsFixedWindow {
                window_size: 20,
                min_periods: 20,
                center: false,
                ..Default::default()
            })
                .alias("MA20")
        )
        // 异常值
        .with_column(
            col("Return").abs()
                .gt(col("Return").std(0) * lit(3.0))
                .cast(DataType::Int32)
                .alias("Outlier")
        );

    let mut df = lf.collect()?;

    // 可选:保存为 Parquet(压缩率高、读取快)
    if let Some(out) = output_path {
        let file = File::create(out)?;
        ParquetWriter::new(file).finish(&mut df)?;
        println!("✅ 已保存到 Parquet: {}", out);
    }

    Ok(df)
}

3. 主函数运行示例(main.rs)

代码语言:javascript
复制
// src/main.rs
use rustdata_processing::efficient_preprocessing;

fn main() -> PolarsResult<()> {
    let df = efficient_preprocessing("data/AAPL.csv", Some("data/processed/AAPL.parquet"))?;
    println!("处理完成!共 {} 行数据", df.height());
    println!("前 5 行:\n{}", df.head(Some(5)));
    Ok(())
}

运行命令

代码语言:javascript
复制
cargo run --release

运行结果

代码语言:javascript
复制
✅ 已保存到 Parquet: data/processed/AAPL.parquet
处理完成!共 10473 行数据
前 5 行:
shape: (5, 16)
┌──────────┬───────┬──────────┬────────┬───┬───────────┬──────────┬──────┬─────────┐
│ <TICKER> ┆ <PER> ┆ <DATE>   ┆ <TIME> ┆ … ┆ LogReturn ┆ MA5      ┆ MA20 ┆ Outlier │
│ ---      ┆ ---   ┆ ---      ┆ ---    ┆   ┆ ---       ┆ ---      ┆ ---  ┆ ---     │
│ str      ┆ str   ┆ i64      ┆ i64    ┆   ┆ f64       ┆ f64      ┆ f64  ┆ i32     │
╞══════════╪═══════╪══════════╪════════╪═══╪═══════════╪══════════╪══════╪═════════╡
│ AAPL.US  ┆ D     ┆ 19840907 ┆ 0      ┆ … ┆ null      ┆ null     ┆ null ┆ null    │
│ AAPL.US  ┆ D     ┆ 19840910 ┆ 0      ┆ … ┆ -0.005954 ┆ null     ┆ null ┆ 0       │
│ AAPL.US  ┆ D     ┆ 19840911 ┆ 0      ┆ … ┆ 0.018156  ┆ null     ┆ null ┆ 0       │
│ AAPL.US  ┆ D     ┆ 19840912 ┆ 0      ┆ … ┆ -0.030575 ┆ null     ┆ null ┆ 0       │
│ AAPL.US  ┆ D     ┆ 19840913 ┆ 0      ┆ … ┆ 0.054142  ┆ 0.099659 ┆ null ┆ 0       │
└──────────┴───────┴──────────┴────────┴───┴───────────┴──────────┴──────┴─────────┘

4. 增量更新(纯 Rust)

代码语言:javascript
复制
pub fn incremental_update(existing_path: &str, new_path: &str, output_path: &str) -> PolarsResult<usize> {
    let existing = LazyFrame::scan_parquet(PlRefPath::new(existing_path), ScanArgsParquet::default())?.collect()?;
    let new_df = LazyFrame::scan_parquet(PlRefPath::new(new_path), ScanArgsParquet::default())?.collect()?;

    let original_count = existing.height();
    let merged = existing.vstack(&new_df)?;
    
    let mut final_df = merged
        .lazy()
        .sort(["<DATE>"], SortMultipleOptions::default())
        .unique(None, UniqueKeepStrategy::Last)
        .collect()?;

    let file = File::create(output_path)?;
    ParquetWriter::new(file).finish(&mut final_df)?;
    
    Ok(final_df.height() - original_count)
}

5. 分区存储策略(推荐生产使用)

代码语言:javascript
复制
pub fn save_partitioned(df: DataFrame, output_dir: &str) -> PolarsResult<()> {
    let df = df
        .lazy()
        .with_column(col("<DATE>").dt().year().alias("year"))
        .with_column(col("<DATE>").dt().month().alias("month"))
        .collect()?;

    // Polars Rust 暂无内置分区写入,可按年/月手动拆分后分别保存
    // 这里示例:保存完整 Parquet,后续查询时用 filter 实现分区效果
    let file = File::create(format!("{}/data.parquet", output_dir))?;
    ParquetWriter::new(file).finish(&df)?;
    println!("✅ 分区列已添加(year, month),可通过 filter 高效查询");
    Ok(())
}

此处需要根据实际数据情况来处理


最佳实践总结

Python 探索检查表

  • df.head()describe()info() 快速看数据
  • isnull().sum() 检查缺失
  • duplicated()min()/max() 找异常

Rust 生产原则

  • • 始终使用 LazyFrame + collect()
  • • 关键操作前必须排序(收益率、滚动计算依赖)
  • • 优先保存 Parquet,压缩率和读取速度远超 CSV
  • • 增量更新用 vstack + unique 实现

下篇预告 《描述性统计:Python 灵活分析,Rust 高效计算》将带你对比 pandas 和 Polars 的分组统计、绩效指标计算,并给出完整的 Rust 绩效报告生成方案。


我们下篇见!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-04-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Rust火箭工坊 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Python 数据探索
    • 使用 pandas 快速了解数据
    • 数据质量检查函数
    • 设计预处理逻辑(Python 原型)
  • Rust 生产级实现(纯 Rust 版本)
    • 1. Cargo.toml 配置(必须)
    • 2. 核心预处理函数(lib.rs)
    • 3. 主函数运行示例(main.rs)
    • 4. 增量更新(纯 Rust)
    • 5. 分区存储策略(推荐生产使用)
  • 最佳实践总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档