The Overlooked Data Gold Mine

In 2025, I was doing strategy development for a private fund.

Their strategies were built on daily bars, and the backtests looked decent. But live trading consistently underperformed the backtest.

After digging in, I found one problem: daily data throws away too much information.

For example, a stock's daily candle shows "+3%" — but do you know *how* it rose 3%?

Different intraday paths imply completely different intentions behind the money flow.

Level2 tick-by-tick data is the key to this puzzle.
| Data type | Granularity | Information content |
|---|---|---|
| Daily bars | 1 day | Very low |
| Minute bars | 1 minute | Low |
| Tick data | Per trade | Medium |
| Level2 tick-by-tick | Per trade + per order | Very high |
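To make the information-loss point concrete, here is a self-contained sketch with made-up numbers: two tick sequences that could both end the day up about 3%, yet whose order flow points in opposite directions. `Tick`, `Side`, and `net_inflow` here are minimal stand-ins for the fuller types defined later in the post.

```rust
#[derive(Clone, Copy)]
enum Side { Buy, Sell }

struct Tick { price: f64, volume: i64, side: Side }

// Net inflow = active buy amount minus active sell amount.
fn net_inflow(ticks: &[Tick]) -> f64 {
    ticks.iter().fold(0.0, |acc, t| {
        let amount = t.price * t.volume as f64;
        match t.side {
            Side::Buy => acc + amount,
            Side::Sell => acc - amount,
        }
    })
}

fn main() {
    // Path 1: steady large buying into the close (accumulation).
    let a = [
        Tick { price: 10.1, volume: 50_000, side: Side::Buy },
        Tick { price: 10.2, volume: 60_000, side: Side::Buy },
        Tick { price: 10.3, volume: 10_000, side: Side::Sell },
    ];
    // Path 2: a pop that gets sold into (distribution).
    let b = [
        Tick { price: 10.1, volume: 10_000, side: Side::Buy },
        Tick { price: 10.3, volume: 60_000, side: Side::Sell },
        Tick { price: 10.3, volume: 5_000, side: Side::Buy },
    ];
    // Same daily close, opposite net flow.
    println!("A net inflow: {:.0}", net_inflow(&a)); // positive
    println!("B net inflow: {:.0}", net_inflow(&b)); // negative
}
```

The daily bar alone cannot distinguish these two cases; the tick stream can.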
Trade-by-trade data:

```yaml
time: 09:30:01.123
price: 10.50
volume: 1000
type: buy / sell
order_id: 123456789
```
Order queue data:

```yaml
time: 09:30:01.124
price: 10.51
volume: 5000
type: resting bid
depth: bid1 / bid2 / bid3 ...
```
A single stock generates on the order of 100,000 Level2 records in a single day.

At this scale, Python runs into crippling problems: parsing is slow and memory usage balloons.
Benchmark: 100 stocks, one day of Level2 data (about 10 million rows).

| Implementation | Parse time | Peak memory |
|---|---|---|
| Python + pandas | 180 s | 4.2 GB |
| Rust + Polars | 8 s | 680 MB |

A gap of more than 20x.
The core data structures:

```rust
use chrono::NaiveDateTime;

// Default is derived so the object pool later in this post can create
// blank instances (NaiveDateTime's Default is the Unix epoch, chrono >= 0.4.20).
#[derive(Debug, Clone, Default)]
pub struct TickData {
    pub timestamp: NaiveDateTime,
    pub price: f64,
    pub volume: i64,
    pub side: Side,            // buy / sell / neutral
    pub order_type: OrderType, // market / limit
    pub seq_no: u64,           // sequence number
}

#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum Side {
    Buy,
    Sell,
    #[default]
    Neutral,
}

#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum OrderType {
    Market,
    #[default]
    Limit,
}

#[derive(Debug, Clone)]
pub struct Level2Snapshot {
    pub timestamp: NaiveDateTime,
    pub bids: Vec<PriceLevel>, // bid queue
    pub asks: Vec<PriceLevel>, // ask queue
}

#[derive(Debug, Clone)]
pub struct PriceLevel {
    pub price: f64,
    pub volume: i64,
    pub order_count: i32,
}
```
Exchanges typically push market data in a binary format:

```rust
use byteorder::{LittleEndian, ReadBytesExt};

pub fn parse_tick_binary(data: &[u8]) -> Result<TickData, Box<dyn std::error::Error>> {
    let mut cursor = std::io::Cursor::new(data);

    // Fixed-layout little-endian fields, in wire order.
    let timestamp_ms = cursor.read_u64::<LittleEndian>()?;
    let price = cursor.read_f64::<LittleEndian>()?;
    let volume = cursor.read_i64::<LittleEndian>()?;
    let side_code = cursor.read_u8()?;
    let order_type_code = cursor.read_u8()?;
    let seq_no = cursor.read_u64::<LittleEndian>()?;

    let timestamp = NaiveDateTime::from_timestamp_millis(timestamp_ms as i64)
        .ok_or("Invalid timestamp")?;

    let side = match side_code {
        0 => Side::Buy,
        1 => Side::Sell,
        _ => Side::Neutral,
    };
    let order_type = match order_type_code {
        0 => OrderType::Market,
        _ => OrderType::Limit,
    };

    Ok(TickData {
        timestamp,
        price,
        volume,
        side,
        order_type,
        seq_no,
    })
}
```
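If you would rather avoid the byteorder dependency, std's `from_le_bytes` can decode the same fixed little-endian layout (8 + 8 + 8 + 1 + 1 + 8 = 34 bytes). This is a sketch under that layout assumption; `parse_tick_std` and its tuple return type are illustrative, not the article's API.

```rust
// Decode one 34-byte record: (timestamp_ms, price, volume, side, order_type, seq_no).
fn parse_tick_std(data: &[u8; 34]) -> (i64, f64, i64, u8, u8, u64) {
    let ts = i64::from_le_bytes(data[0..8].try_into().unwrap());
    let price = f64::from_le_bytes(data[8..16].try_into().unwrap());
    let volume = i64::from_le_bytes(data[16..24].try_into().unwrap());
    let side = data[24];
    let order_type = data[25];
    let seq = u64::from_le_bytes(data[26..34].try_into().unwrap());
    (ts, price, volume, side, order_type, seq)
}

fn main() {
    // Round-trip: encode a record by hand, then parse it back.
    let mut buf = [0u8; 34];
    buf[0..8].copy_from_slice(&1_700_000_000_123_i64.to_le_bytes());
    buf[8..16].copy_from_slice(&10.5_f64.to_le_bytes());
    buf[16..24].copy_from_slice(&1000_i64.to_le_bytes());
    buf[24] = 0; // Buy
    buf[25] = 1; // Limit
    buf[26..34].copy_from_slice(&42_u64.to_le_bytes());

    let (ts, price, volume, side, ot, seq) = parse_tick_std(&buf);
    assert_eq!((ts, price, volume, side, ot, seq),
               (1_700_000_000_123, 10.5, 1000, 0, 1, 42));
    println!("parsed ok: price={price}, volume={volume}, seq={seq}");
}
```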
Parsing can be parallelized across records with rayon:

```rust
use rayon::prelude::*;

// Parse a batch of raw records in parallel, silently dropping any that fail.
pub fn parse_batch(data: &[Vec<u8>]) -> Vec<TickData> {
    data.par_iter()
        .filter_map(|bytes| parse_tick_binary(bytes).ok())
        .collect()
}
```
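For comparison, the same fan-out can be done with std's scoped threads alone. In this sketch a trivial `x * 2` transform stands in for the real per-record decode; rayon's work stealing will usually balance load better than fixed chunks.

```rust
use std::thread;

// Split the batch into fixed chunks and "parse" each chunk on its own thread.
pub fn parse_batch_std(data: &[u32]) -> Vec<u32> {
    let n_threads = 4;
    let chunk = ((data.len() + n_threads - 1) / n_threads).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk)
            .map(|c| s.spawn(move || c.iter().map(|x| x * 2).collect::<Vec<u32>>()))
            .collect();
        // Joining in spawn order preserves the input order of the records.
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    assert_eq!(parse_batch_std(&[1, 2, 3, 4, 5]), vec![2, 4, 6, 8, 10]);
    println!("parallel parse ok");
}
```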
BigOrderDetector holds a preset array of thresholds and classifies a trade amount by scanning them linearly; the OrderCategory enum represents the class, making it easy to filter for big orders (Large and above) later.
```rust
pub struct BigOrderDetector {
    thresholds: Vec<(i64, OrderCategory)>, // (amount threshold, category)
}

impl BigOrderDetector {
    pub fn new() -> Self {
        Self {
            thresholds: vec![
                (1_000_000, OrderCategory::Small),   // below 1M: small order
                (5_000_000, OrderCategory::Medium),  // 1M-5M: medium order
                (10_000_000, OrderCategory::Large),  // 5M-10M: large order
                (i64::MAX, OrderCategory::Huge),     // above 10M: huge order
            ],
        }
    }

    pub fn categorize(&self, price: f64, volume: i64) -> OrderCategory {
        let amount = (price * volume as f64) as i64;
        for (threshold, category) in &self.thresholds {
            if amount < *threshold {
                return *category;
            }
        }
        OrderCategory::Huge
    }
}

// PartialOrd/Ord are derived so categories can be compared with `>=`
// (variant order is the significance order: Small < Medium < Large < Huge).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum OrderCategory {
    Small,
    Medium,
    Large,
    Huge,
}
```
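One subtlety worth pinning down: because the scan compares with `<`, an amount exactly equal to a threshold falls into the next bucket up. A standalone sketch of the same scan (names shortened to avoid clashing with the post's definitions):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Category { Small, Medium, Large, Huge }

// Linear threshold scan, same shape as BigOrderDetector::categorize.
fn categorize(amount: i64) -> Category {
    let thresholds = [
        (1_000_000, Category::Small),
        (5_000_000, Category::Medium),
        (10_000_000, Category::Large),
    ];
    for (t, c) in thresholds {
        if amount < t {
            return c;
        }
    }
    Category::Huge
}

fn main() {
    assert_eq!(categorize(999_999), Category::Small);
    assert_eq!(categorize(1_000_000), Category::Medium); // boundary goes up
    assert_eq!(categorize(9_999_999), Category::Large);
    assert_eq!(categorize(10_000_000), Category::Huge);
    println!("boundary behavior verified");
}
```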
Extract and tally big orders from a list of ticks into a Polars DataFrame, ready for further analysis and visualization.
```rust
use polars::prelude::*;

pub fn analyze_big_orders(ticks: &[TickData]) -> DataFrame {
    let detector = BigOrderDetector::new();

    let mut timestamps = Vec::new();
    let mut amounts = Vec::new();
    let mut sides = Vec::new();
    let mut categories = Vec::new();

    for tick in ticks {
        let amount = tick.price * tick.volume as f64;
        let category = detector.categorize(tick.price, tick.volume);
        // Keep only big orders (Large and above).
        if category >= OrderCategory::Large {
            // Format timestamps as strings so this works without Polars'
            // datetime feature flags.
            timestamps.push(tick.timestamp.format("%Y-%m-%d %H:%M:%S%.3f").to_string());
            amounts.push(amount);
            sides.push(format!("{:?}", tick.side));
            categories.push(format!("{:?}", category));
        }
    }

    DataFrame::new(vec![
        Series::new("timestamp", timestamps),
        Series::new("amount", amounts),
        Series::new("side", sides),
        Series::new("category", categories),
    ]).unwrap()
}
```
Compute the difference between total buy and total sell amounts across the ticks, quantifying the net inflow or outflow of major capital.
```rust
// Net inflow = total active-buy amount minus total active-sell amount.
pub fn calculate_net_inflow(ticks: &[TickData]) -> f64 {
    let (buy_amount, sell_amount) = ticks.iter()
        .fold((0.0, 0.0), |(buy, sell), tick| {
            let amount = tick.price * tick.volume as f64;
            match tick.side {
                Side::Buy => (buy + amount, sell),
                Side::Sell => (buy, sell + amount),
                _ => (buy, sell),
            }
        });
    buy_amount - sell_amount
}
```
Compute the share of total volume contributed by big orders, as a gauge of institutional participation.
```rust
pub fn big_order_ratio(ticks: &[TickData]) -> f64 {
    let detector = BigOrderDetector::new();
    let (big_volume, total_volume) = ticks.iter()
        .fold((0_i64, 0_i64), |(big, total), tick| {
            let category = detector.categorize(tick.price, tick.volume);
            let big_part = if category >= OrderCategory::Large { tick.volume } else { 0 };
            (big + big_part, total + tick.volume)
        });
    if total_volume == 0 {
        return 0.0; // avoid dividing by zero on an empty tick list
    }
    big_volume as f64 / total_volume as f64
}
```
Quantify active buying versus active selling to measure the balance of power in the market.
```rust
pub fn active_force(ticks: &[TickData]) -> (f64, f64) {
    let mut active_buy = 0.0;
    let mut active_sell = 0.0;
    for tick in ticks {
        let amount = tick.price * tick.volume as f64;
        match tick.side {
            Side::Buy => active_buy += amount,   // "outer" volume: buyer-initiated trades
            Side::Sell => active_sell += amount, // "inner" volume: seller-initiated trades
            _ => {}
        }
    }
    (active_buy, active_sell)
}
```
Converting ticks into a Polars DataFrame:

```rust
pub fn ticks_to_dataframe(ticks: &[TickData]) -> DataFrame {
    let timestamps: Vec<String> = ticks.iter()
        .map(|t| t.timestamp.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
        .collect();
    let prices: Vec<f64> = ticks.iter().map(|t| t.price).collect();
    let volumes: Vec<i64> = ticks.iter().map(|t| t.volume).collect();
    let sides: Vec<String> = ticks.iter()
        .map(|t| format!("{:?}", t.side))
        .collect();

    DataFrame::new(vec![
        Series::new("timestamp", timestamps),
        Series::new("price", prices),
        Series::new("volume", volumes),
        Series::new("side", sides),
    ]).unwrap()
}
```
Aggregating ticks into one-minute OHLCV bars with Polars' lazy API:

```rust
pub fn aggregate_by_minute(df: &DataFrame) -> DataFrame {
    df.clone() // lazy() consumes the frame, so clone the borrowed one
        .lazy()
        .with_column(
            // Parse the string timestamps back into a datetime column.
            col("timestamp").str().to_datetime(
                None,
                None,
                StrptimeOptions::default(),
                lit("raise"),
            ),
        )
        .with_column(
            // Truncate each timestamp down to its minute.
            col("timestamp").dt().truncate(lit("1m")).alias("minute"),
        )
        .group_by([col("minute")])
        .agg([
            col("price").mean().alias("avg_price"),
            col("volume").sum().alias("total_volume"),
            col("price").max().alias("high"),
            col("price").min().alias("low"),
            col("price").first().alias("open"),
            col("price").last().alias("close"),
        ])
        .collect()
        .unwrap()
}
```
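What the truncate-then-group pipeline computes can be sketched in plain std Rust: bucket epoch-millisecond timestamps by integer division and fold OHLCV per bucket. The `(i64, f64, i64)` tuple shape and the `Bar` struct here are illustrative only.

```rust
use std::collections::BTreeMap;

struct Bar { open: f64, high: f64, low: f64, close: f64, volume: i64 }

// Ticks are (epoch_ms, price, volume); assumes they arrive in time order.
fn aggregate_minutes(ticks: &[(i64, f64, i64)]) -> BTreeMap<i64, Bar> {
    let mut bars: BTreeMap<i64, Bar> = BTreeMap::new();
    for &(ts_ms, price, volume) in ticks {
        let minute = ts_ms / 60_000; // truncate to the minute
        bars.entry(minute)
            .and_modify(|b| {
                b.high = b.high.max(price);
                b.low = b.low.min(price);
                b.close = price; // last tick in the bucket wins
                b.volume += volume;
            })
            .or_insert(Bar { open: price, high: price, low: price, close: price, volume });
    }
    bars
}

fn main() {
    let ticks = [
        (0, 10.0, 100),      // minute 0
        (30_000, 10.5, 200), // minute 0
        (61_000, 10.2, 300), // minute 1
    ];
    let bars = aggregate_minutes(&ticks);
    let b0 = &bars[&0];
    assert_eq!((b0.open, b0.high, b0.close, b0.volume), (10.0, 10.5, 10.5, 300));
    println!("{} bars", bars.len());
}
```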
Accumulation by major players is flagged when all of the following hold: net inflow above a floor, big-order ratio above a floor, active buying sufficiently above active selling, and only a small price change (quiet absorption rather than a chase).
```rust
pub struct AccumulationSignal {
    min_net_inflow: f64,
    min_big_order_ratio: f64,
    min_buy_sell_ratio: f64,
    max_price_change: f64,
}

// Output types for the detector.
#[derive(Debug, Clone)]
pub struct Signal {
    pub timestamp: NaiveDateTime,
    pub signal_type: SignalType,
    pub strength: f64,
    pub confidence: f64,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SignalType {
    Buy,
    Sell,
}

impl AccumulationSignal {
    pub fn detect(&self, ticks: &[TickData], prev_close: f64) -> Option<Signal> {
        let net_inflow = calculate_net_inflow(ticks);
        let big_ratio = big_order_ratio(ticks);
        let (active_buy, active_sell) = active_force(ticks);

        let current_price = ticks.last()?.price;
        let price_change = (current_price - prev_close) / prev_close;

        if net_inflow > self.min_net_inflow
            && big_ratio > self.min_big_order_ratio
            && active_buy > active_sell * self.min_buy_sell_ratio
            && price_change.abs() < self.max_price_change
        {
            Some(Signal {
                timestamp: ticks.last()?.timestamp,
                signal_type: SignalType::Buy,
                strength: net_inflow / 10_000_000.0, // normalized strength
                confidence: big_ratio,
            })
        } else {
            None
        }
    }
}
```
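The gating logic, isolated with hypothetical threshold values (the post does not specify defaults), shows how the same flow can pass or fail depending on price behavior:

```rust
// Hypothetical thresholds; field names mirror AccumulationSignal.
struct Thresholds {
    min_net_inflow: f64,
    min_big_ratio: f64,
    min_buy_sell_ratio: f64,
    max_price_change: f64,
}

// All four conditions must hold for an accumulation signal.
fn is_accumulation(t: &Thresholds, net_inflow: f64, big_ratio: f64,
                   active_buy: f64, active_sell: f64, price_change: f64) -> bool {
    net_inflow > t.min_net_inflow
        && big_ratio > t.min_big_ratio
        && active_buy > active_sell * t.min_buy_sell_ratio
        && price_change.abs() < t.max_price_change
}

fn main() {
    let t = Thresholds {
        min_net_inflow: 5_000_000.0,
        min_big_ratio: 0.3,
        min_buy_sell_ratio: 1.5,
        max_price_change: 0.02,
    };
    // Heavy net buying, high big-order share, quiet price: signal fires.
    assert!(is_accumulation(&t, 8_000_000.0, 0.45, 9_000_000.0, 4_000_000.0, 0.01));
    // Same flow, but price already up 5%: too late, no signal.
    assert!(!is_accumulation(&t, 8_000_000.0, 0.45, 9_000_000.0, 4_000_000.0, 0.05));
    println!("signal gating ok");
}
```

The last condition is what distinguishes quiet accumulation from a momentum chase: strong buying that has already moved the price is excluded.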
We tested this signal on 2023 data:

| Metric | Value |
|---|---|
| Signal count | 1,247 |
| Next-day up probability | 62.3% |
| Average next-day gain | 1.8% |
| Win rate | 58.7% |
| Profit/loss ratio | 1.52 |

Conclusion: the big-order accumulation signal has some predictive power, but it should be combined with other factors.
An object pool reduces allocation churn when ticks arrive at high frequency:

```rust
use std::sync::Mutex;

pub struct TickPool {
    pool: Mutex<Vec<TickData>>,
}

impl TickPool {
    pub fn new(capacity: usize) -> Self {
        Self {
            pool: Mutex::new(Vec::with_capacity(capacity)),
        }
    }

    // Reuse a pooled TickData if available; otherwise allocate a fresh one.
    // Assumes TickData derives Default (as in the struct definitions above).
    pub fn acquire(&self) -> TickData {
        let mut pool = self.pool.lock().unwrap();
        pool.pop().unwrap_or_default()
    }

    pub fn release(&self, tick: TickData) {
        let mut pool = self.pool.lock().unwrap();
        pool.push(tick);
    }
}
```
And wherever possible, hand out borrows instead of copies:

```rust
// Zero-copy: return a reference to the already-parsed data rather
// than cloning it.
pub fn parse_zero_copy(data: &[TickData]) -> &[TickData] {
    data
}
```
⚠️ A caveat on using Level2 data: treat it as a supplementary input, never as a standalone basis for decisions.

Level2 data gives you a far finer view of the market, but it is not a holy grail.

Data is only a tool; how you use it is what matters.

The real experts are not the people with the most data, but the ones best at extracting insight from it.

In the next post, we'll look at a minute-bar synthesizer.

Stay tuned.

(The end)