
Why two layers of data processing?
Quantitative data processing usually splits into two stages.
Stage 1: research and exploration (Python + pandas). You have just received the data and want quick answers: What columns are there? Are any values missing? Do the ranges look sane? pandas's df.head(), df.describe(), and df.info() give you that picture in seconds, so you can iterate on ideas quickly.
Stage 2: production (Rust + Polars). Once data volumes grow, pandas tends to run out of memory, slow down, and become hard to keep running reliably. That is where Rust + Polars comes in.
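What actually makes stage 2 scale is lazy execution: you describe a query plan first, and nothing is read until collect(). Here is a minimal taste of the pattern (row_count is a hypothetical helper; the scan calls mirror the full pipeline later in this article):

use polars::prelude::*;

// Count rows without materializing the data columns: finish() only builds
// a plan, and projection pushdown lets the scan skip column data it
// doesn't need. Execution happens at collect().
fn row_count(path: &str) -> PolarsResult<DataFrame> {
    LazyCsvReader::new(PlRefPath::new(path))
        .with_has_header(true)
        .finish()?                      // still just a plan; nothing is read yet
        .select([len().alias("rows")])  // len() is Polars' row-count expression
        .collect()                      // the file is scanned here
}

The same plan-then-collect shape appears in every Rust snippet below.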
Let's walk through stage 1 first, starting with a quick look at the raw file:

# data_exploration.py
import pandas as pd
import numpy as np
df = pd.read_csv("data/aapl.us.txt", parse_dates=['<DATE>'])
# Quick overview of the dataset
print("=== Data overview ===")
print(f"Rows: {len(df)}")
print(f"Columns: {len(df.columns)}")
print(f"\nColumn names: {df.columns.tolist()}")
print(f"\nDtypes:\n{df.dtypes}")
print("\nFirst 5 rows:")
print(df.head())
print("\n=== Descriptive statistics ===")
print(df.describe())
print("\n=== Missing values ===")
missing = df.isnull().sum()
print(missing[missing > 0])
print("\n=== Value ranges ===")
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    print(f"{col}: [{df[col].min():.2f}, {df[col].max():.2f}]")

Typical output (AAPL daily bars):
=== Data overview ===
Rows: 10473
Columns: 10
Column names: ['<TICKER>', '<PER>', '<DATE>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>', '<VOL>', '<OPENINT>']
Dtypes:
<TICKER> object
<PER> object
<DATE> datetime64[ns]
<TIME> int64
<OPEN> float64
<HIGH> float64
<LOW> float64
<CLOSE> float64
<VOL> int64
<OPENINT> int64
dtype: object
First 5 rows:
<TICKER> <PER> <DATE> <TIME> <OPEN> <HIGH> <LOW> <CLOSE> \
0 AAPL.US D 1984-09-07 0 0.099172 0.100390 0.097975 0.099172
1 AAPL.US D 1984-09-10 0 0.099172 0.099477 0.096788 0.098584
2 AAPL.US D 1984-09-11 0 0.099477 0.102175 0.099477 0.100390
3 AAPL.US D 1984-09-12 0 0.100390 0.100977 0.097367 0.097367
4 AAPL.US D 1984-09-13 0 0.102784 0.103077 0.102784 0.102784
<VOL> <OPENINT>
0 99242379 0
1 77028276 0
2 181637249 0
3 158675628 0
4 247131424 0
=== Descriptive statistics ===
<DATE> <TIME> <OPEN> <HIGH> \
count 10473 10473.0 10473.000000 10473.000000
mean 2005-06-10 19:02:10.965339392 0.0 31.828664 32.172663
min 1984-09-07 00:00:00 0.0 0.054537 0.055135
25% 1995-01-16 00:00:00 0.0 0.292073 0.297831
50% 2005-06-09 00:00:00 0.0 1.261280 1.283240
75% 2015-11-02 00:00:00 0.0 25.086800 25.360700
max 2026-04-02 00:00:00 0.0 285.932000 288.350000
std NaN 0.0 62.055662 62.718917
<LOW> <CLOSE> <VOL> <OPENINT>
count 10473.000000 10473.000000 1.047300e+04 10473.0
mean 31.503916 31.851466 3.835056e+08 0.0
min 0.053938 0.053938 2.856842e+06 0.0
25% 0.286399 0.292073 1.292911e+08 0.0
50% 1.239590 1.256140 2.492596e+08 0.0
75% 24.826000 25.122500 4.859325e+08 0.0
max 283.035000 285.922000 8.846194e+09 0.0
std 61.440598 62.105043 4.067278e+08 0.0
=== Missing values ===
Series([], dtype: int64)
=== Value ranges ===
<TIME>: [0.00, 0.00]
<OPEN>: [0.05, 285.93]
<HIGH>: [0.06, 288.35]
<LOW>: [0.05, 283.04]
<CLOSE>: [0.05, 285.92]
<VOL>: [2856842.00, 8846193721.00]
<OPENINT>: [0.00, 0.00]

The overview looks clean, but price data deserves explicit quality checks: duplicate dates, negative prices, and bars where high < low all happen in real feeds.

# quality_check.py
import pandas as pd
def check_data_quality(df: pd.DataFrame) -> dict:
    issues = {}
    if df['<DATE>'].duplicated().sum() > 0:
        issues['duplicate_dates'] = df['<DATE>'].duplicated().sum()
    if df.isnull().sum().sum() > 0:
        issues['missing_values'] = df.isnull().sum().sum()
    if (df[['<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>']] < 0).any().any():
        issues['negative_prices'] = True
    if (df['<HIGH>'] < df['<LOW>']).sum() > 0:
        issues['invalid_high_low'] = (df['<HIGH>'] < df['<LOW>']).sum()
    if ((df['<CLOSE>'] > df['<HIGH>']) | (df['<CLOSE>'] < df['<LOW>'])).sum() > 0:
        issues['invalid_close'] = ((df['<CLOSE>'] > df['<HIGH>']) | (df['<CLOSE>'] < df['<LOW>'])).sum()
    if (df['<VOL>'] == 0).sum() > 0:
        issues['zero_volume'] = (df['<VOL>'] == 0).sum()
    return issues

df = pd.read_csv("data/aapl.us.txt", parse_dates=['<DATE>'])
issues = check_data_quality(df)
print("✓ Data quality OK" if not issues else f"Issues found: {issues}")

With quality confirmed, the next step is to prototype the preprocessing itself: sort, fill, compute returns, and add moving averages.

# preprocessing_design.py
import pandas as pd
import numpy as np
def design_preprocessing(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values('<DATE>').reset_index(drop=True)
    df = df.ffill()  # forward fill (fillna(method='ffill') is deprecated in pandas 2.x)
    df['Return'] = df['<CLOSE>'].pct_change()
    df['LogReturn'] = np.log(df['<CLOSE>'] / df['<CLOSE>'].shift(1))
    # Outlier detection: 3-sigma rule on daily returns
    std = df['Return'].std()
    df['Outlier'] = (df['Return'].abs() > 3 * std).astype(int)
    # Technical indicators: simple moving averages
    df['MA5'] = df['<CLOSE>'].rolling(5).mean()
    df['MA20'] = df['<CLOSE>'].rolling(20).mean()
    df['MA60'] = df['<CLOSE>'].rolling(60).mean()
    return df

df = pd.read_csv("data/aapl.us.txt", parse_dates=['<DATE>'])
processed = design_preprocessing(df)
print(processed[['<DATE>', '<CLOSE>', 'Return', 'MA5', 'MA20']].head())

The core value of the Python stage is fast validation of the logic: problems surface immediately and get fixed on the spot. Once the design is settled, we move it to Rust. The production crate starts with Cargo.toml:
[package]
name = "rustdata_processing"
version = "0.1.0"
edition = "2024"
[dependencies]
polars = { version = "0.53", features = ["lazy", "dtype-date", "dtype-datetime", "temporal", "parquet", "pct_change", "abs", "rolling_window", "log"] }
polars-io = { version = "0.53", features = ["csv", "parquet"] }

// src/lib.rs
use polars::prelude::*;
use std::fs::File;
pub fn efficient_preprocessing(file_path: &str, output_path: Option<&str>) -> PolarsResult<DataFrame> {
    // Lazy CSV scan (with automatic date parsing)
    let lf = LazyCsvReader::new(PlRefPath::new(file_path))
        .with_has_header(true)
        .with_try_parse_dates(true)
        .finish()?
        // Sort by date first; returns are only meaningful on ordered data
        .sort(["<DATE>"], SortMultipleOptions::default())
        // Fill nulls from the previous row (note: this shift-based fill only
        // closes single-row gaps; a full forward fill needs a fill strategy)
        .with_column(
            col("<CLOSE>").fill_null(col("<CLOSE>").shift(lit(1))).alias("Close_filled"),
        )
        // Simple return
        .with_column(
            ((col("Close_filled") - col("Close_filled").shift(lit(1)))
                / col("Close_filled").shift(lit(1)))
            .alias("Return"),
        )
        // Log return: ln(close_t) - ln(close_{t-1})
        .with_column(
            (col("Close_filled").log(std::f64::consts::E)
                - col("Close_filled").shift(lit(1)).log(std::f64::consts::E))
            .alias("LogReturn"),
        )
        // Rolling moving averages
        .with_column(
            col("Close_filled")
                .rolling_mean(RollingOptionsFixedWindow {
                    window_size: 5,
                    min_periods: 5,
                    center: false,
                    ..Default::default()
                })
                .alias("MA5"),
        )
        .with_column(
            col("Close_filled")
                .rolling_mean(RollingOptionsFixedWindow {
                    window_size: 20,
                    min_periods: 20,
                    center: false,
                    ..Default::default()
                })
                .alias("MA20"),
        )
        // Outliers: |return| > 3 sigma (std(1) matches pandas' default ddof=1)
        .with_column(
            col("Return")
                .abs()
                .gt(col("Return").std(1) * lit(3.0))
                .cast(DataType::Int32)
                .alias("Outlier"),
        );

    let mut df = lf.collect()?;

    // Optional: save as Parquet (good compression, fast reads)
    if let Some(out) = output_path {
        let file = File::create(out)?;
        ParquetWriter::new(file).finish(&mut df)?;
        println!("✅ Saved to Parquet: {}", out);
    }
    Ok(df)
}
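Before wiring up main.rs, it is worth porting the Python-stage sanity checks into a Rust test so later refactors cannot silently change the math. A minimal sketch (tests/sanity.rs is a hypothetical file of ours; the check relies only on the identity LogReturn = ln(1 + Return)):

// tests/sanity.rs
use polars::prelude::*;
use rustdata_processing::efficient_preprocessing;

#[test]
fn log_return_is_consistent_with_return() -> PolarsResult<()> {
    let df = efficient_preprocessing("data/AAPL.csv", None)?;
    // ln(1 + Return) must reproduce LogReturn wherever both are defined;
    // take the largest absolute deviation across all rows
    let checked = df
        .lazy()
        .select([((lit(1.0) + col("Return")).log(std::f64::consts::E) - col("LogReturn"))
            .abs()
            .max()
            .alias("max_diff")])
        .collect()?;
    let max_diff = checked.column("max_diff")?.f64()?.get(0).unwrap_or(f64::NAN);
    assert!(max_diff < 1e-12, "LogReturn and Return disagree: {max_diff}");
    Ok(())
}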
// src/main.rs
use polars::prelude::*;
use rustdata_processing::efficient_preprocessing;

fn main() -> PolarsResult<()> {
    let df = efficient_preprocessing("data/AAPL.csv", Some("data/processed/AAPL.parquet"))?;
    println!("Done! {} rows processed", df.height());
    println!("First 5 rows:\n{}", df.head(Some(5)));
    Ok(())
}

Run it with:
cargo run --release

Output:
✅ Saved to Parquet: data/processed/AAPL.parquet
Done! 10473 rows processed
First 5 rows:
shape: (5, 16)
┌──────────┬───────┬──────────┬────────┬───┬───────────┬──────────┬──────┬─────────┐
│ <TICKER> ┆ <PER> ┆ <DATE> ┆ <TIME> ┆ … ┆ LogReturn ┆ MA5 ┆ MA20 ┆ Outlier │
│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ i64 ┆ i64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ i32 │
╞══════════╪═══════╪══════════╪════════╪═══╪═══════════╪══════════╪══════╪═════════╡
│ AAPL.US ┆ D ┆ 19840907 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ AAPL.US ┆ D ┆ 19840910 ┆ 0 ┆ … ┆ -0.005954 ┆ null ┆ null ┆ 0 │
│ AAPL.US ┆ D ┆ 19840911 ┆ 0 ┆ … ┆ 0.018156 ┆ null ┆ null ┆ 0 │
│ AAPL.US ┆ D ┆ 19840912 ┆ 0 ┆ … ┆ -0.030575 ┆ null ┆ null ┆ 0 │
│ AAPL.US ┆ D ┆ 19840913 ┆ 0 ┆ … ┆ 0.054142 ┆ 0.099659 ┆ null ┆ 0 │
└──────────┴───────┴──────────┴────────┴───┴───────────┴──────────┴──────┴─────────┘

In production you rarely reprocess history from scratch; new daily bars are merged into the existing Parquet file instead. Merge, sort, and de-duplicate:

pub fn incremental_update(existing_path: &str, new_path: &str, output_path: &str) -> PolarsResult<usize> {
    let existing = LazyFrame::scan_parquet(PlRefPath::new(existing_path), ScanArgsParquet::default())?.collect()?;
    let new_df = LazyFrame::scan_parquet(PlRefPath::new(new_path), ScanArgsParquet::default())?.collect()?;
    let original_count = existing.height();
    // Stack the new rows under the existing frame, then keep the last
    // occurrence. Note: unique(None, ..) de-duplicates on all columns; to
    // treat a revised bar on an existing date as a replacement, pass a
    // subset of ["<DATE>"] instead of None.
    let merged = existing.vstack(&new_df)?;
    let mut final_df = merged
        .lazy()
        .sort(["<DATE>"], SortMultipleOptions::default())
        .unique(None, UniqueKeepStrategy::Last)
        .collect()?;
    let file = File::create(output_path)?;
    ParquetWriter::new(file).finish(&mut final_df)?;
    // Number of genuinely new rows (assumes the merge only ever adds rows)
    Ok(final_df.height() - original_count)
}

Large histories also benefit from partition-friendly storage:

pub fn save_partitioned(df: DataFrame, output_dir: &str) -> PolarsResult<()> {
    // Assumes <DATE> parsed as a Date/Datetime column
    let mut df = df
        .lazy()
        .with_column(col("<DATE>").dt().year().alias("year"))
        .with_column(col("<DATE>").dt().month().alias("month"))
        .collect()?;
    // Polars' Rust API has no built-in partitioned (hive-style) writer yet.
    // Either split by year/month manually and write each part, or, as here,
    // save one Parquet file and filter on year/month at query time.
    let file = File::create(format!("{}/data.parquet", output_dir))?;
    ParquetWriter::new(file).finish(&mut df)?;
    println!("✅ Partition columns added (year, month); filter on them for partition-like pruning");
    Ok(())
}

How far to take this depends on the data: for a single symbol's daily bars one file is plenty, while multi-asset or intraday data justifies a real directory layout.
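The manual split mentioned in the comment can be done with DataFrame::partition_by, which cuts a frame into one DataFrame per key value. A sketch under two assumptions: the partition_by feature is added to the polars dependency (it is not in the Cargo.toml above), and <DATE> has parsed as a real date column:

use polars::prelude::*;
use std::fs::File;

// Write one Parquet file per year: <output_dir>/year=1984.parquet, ...
pub fn save_partitioned_by_year(df: DataFrame, output_dir: &str) -> PolarsResult<()> {
    let df = df
        .lazy()
        .with_column(col("<DATE>").dt().year().alias("year"))
        .collect()?;
    // partition_by splits the frame into one DataFrame per distinct key;
    // `true` keeps the "year" column inside each part
    for mut part in df.partition_by(["year"], true)? {
        // every part holds a single year, so read it off the first row
        let year = part.column("year")?.get(0)?.to_string();
        let file = File::create(format!("{}/year={}.parquet", output_dir, year))?;
        ParquetWriter::new(file).finish(&mut part)?;
    }
    Ok(())
}

Hive-style year=... file names keep the layout readable by other tools that understand partitioned datasets.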
Python exploration checklist
- df.head(), df.describe(), df.info() for a first look at the data
- isnull().sum() to check for missing values
- duplicated() and min()/max() to hunt for anomalies

Rust production principles
- LazyFrame + collect() for lazy, memory-friendly pipelines
- vstack + unique for incremental updates

Next up: "Descriptive statistics: flexible analysis in Python, efficient computation in Rust" compares grouped statistics and performance metrics in pandas versus Polars, and walks through a complete performance-report generator in Rust.
See you in the next post!