2023年,我有一个做量化的朋友,他开发的均值回归策略在2022年表现出色,年化收益28%。
他很满意,把全部资金都投入了这个策略。
结果2023年遇上单边上涨行情,均值回归策略一路踏空,全年亏损12%。
他问我:「为什么策略突然就不行了?」
我说:「不是策略不行,是市场变了。均值回归天生不适合单边趋势行情。」
单一策略的天花板在于:它只在特定市场环境下有效。
均值回归在震荡市赚钱,动量策略在趋势市赚钱——但没有人能预测明天是什么市。
解决方案?多策略组合。
今天我们聊聊如何用Polars处理多时间框架信号,构建一个稳健的多策略组合系统。
诺贝尔经济学奖得主马科维茨说过:「分散化是投资唯一的免费午餐。」
不同策略有不同的「性格」:
如果两个策略的收益相关性低,组合后:
在组合之前,先分析策略相关性:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fn calculate_correlation(returns_a: &[f64], returns_b: &[f64]) -> f64 {
let n = returns_a.len() as f64;
let mean_a: f64 = returns_a.iter().sum::<f64>() / n;
let mean_b: f64 = returns_b.iter().sum::<f64>() / n;
let cov: f64 = returns_a.iter()
.zip(returns_b.iter())
.map(|(a, b)| (a - mean_a) * (b - mean_b))
.sum::<f64>() / n;
let std_a = (returns_a.iter().map(|a| (a - mean_a).powi(2)).sum::<f64>() / n).sqrt();
let std_b = (returns_b.iter().map(|b| (b - mean_b).powi(2)).sum::<f64>() / n).sqrt();
cov / (std_a * std_b)
}
理想情况:策略间相关性 < 0.3,意味着它们的收益模式不重叠。
假设你有三个策略:
如何把这三个不同频率的信号合并?
Polars提供了asof_join,可以处理时间序列的「向前匹配」:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use polars::prelude::*;
use polars::time::*;
fn merge_multi_timeframe_signals(
daily_signals: DataFrame,
hourly_signals: DataFrame,
minute_signals: DataFrame,
) -> DataFrame {
// 确保时间列格式正确
let daily = daily_signals.lazy()
.with_column(col("datetime").cast(DataType::Datetime(TimeUnit::Milliseconds, None)));
let hourly = hourly_signals.lazy()
.with_column(col("datetime").cast(DataType::Datetime(TimeUnit::Milliseconds, None)));
let minute = minute_signals.lazy()
.with_column(col("datetime").cast(DataType::Datetime(TimeUnit::Milliseconds, None)));
// asof join:向前匹配最近的时间
let merged = daily
.asof_join(hourly, col("datetime"), col("datetime"), AsofJoinOptions {
strategy: AsofStrategy::Backward, // 向前匹配
tolerance: Some(Duration::parse("1d")), // 容差1天
..Default::default()
})
.asof_join(minute, col("datetime"), col("datetime"), AsofJoinOptions {
strategy: AsofStrategy::Backward,
tolerance: Some(Duration::parse("1h")),
..Default::default()
})
.collect()
.unwrap();
merged
}
不同策略的信号可能用不同的表达方式:
需要统一标准化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fn standardize_signals(df: &DataFrame) -> DataFrame {
df.lazy()
.with_columns([
// 将布尔信号转为[-1, 1]区间
when(col("buy_signal").eq(true))
.then(lit(1.0))
.when(col("sell_signal").eq(true))
.then(lit(-1.0))
.otherwise(lit(0.0))
.alias("signal_a"),
// 将数值信号归一化到[-1, 1]
col("signal_b").clip(lit(-1.0), lit(1.0)),
// signal_strength已经是[0, 1],转为[-1, 1]
(col("signal_strength") * lit(2.0) - lit(1.0))
.alias("signal_c"),
])
.collect()
.unwrap()
}
合并后的信号如何决策?有几种方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fn voting_method(df: &DataFrame) -> DataFrame {
df.lazy()
.with_columns([
// 统计买入信号数量
(col("signal_a").gt(0.0) + col("signal_b").gt(0.0) + col("signal_c").gt(0.0))
.alias("buy_votes"),
// 统计卖出信号数量
(col("signal_a").lt(0.0) + col("signal_b").lt(0.0) + col("signal_c").lt(0.0))
.alias("sell_votes"),
])
.with_columns([
// 多数投票
when(col("buy_votes").gt(col("sell_votes")))
.then(lit(1.0)) // 买入
.when(col("sell_votes").gt(col("buy_votes")))
.then(lit(-1.0)) // 卖出
.otherwise(lit(0.0)) // 不操作
.alias("final_signal"),
])
.collect()
.unwrap()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn weighted_average_method(df: &DataFrame, weights: &[f64]) -> DataFrame {
df.lazy()
.with_columns([
(col("signal_a") * lit(weights[0]) +
col("signal_b") * lit(weights[1]) +
col("signal_c") * lit(weights[2]))
.alias("final_signal"),
])
.with_columns([
// 设置阈值
when(col("final_signal").gt(lit(0.5)))
.then(lit(1.0))
.when(col("final_signal").lt(lit(-0.5)))
.then(lit(-1.0))
.otherwise(lit(0.0))
.alias("trading_signal"),
])
.collect()
.unwrap()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn priority_method(df: &DataFrame) -> DataFrame {
// 假设策略A优先级最高
df.lazy()
.with_columns([
when(col("signal_a").neq(0.0))
.then(col("signal_a"))
.when(col("signal_b").neq(0.0))
.then(col("signal_b"))
.otherwise(col("signal_c"))
.alias("final_signal"),
])
.collect()
.unwrap()
}
不同市场环境下,策略表现不同。可以用动态权重:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
fn calculate_dynamic_weights(
performance_a: f64,
performance_b: f64,
performance_c: f64,
) -> Vec<f64> {
// 基于近期表现分配权重
let total = performance_a + performance_b + performance_c;
if total <= 0.0 {
// 所有策略都亏损,等权
return vec![0.33, 0.33, 0.33];
}
vec![
performance_a / total,
performance_b / total,
performance_c / total,
]
}
// 滚动窗口计算近期表现
fn rolling_performance_weights(
returns: &DataFrame,
window: usize,
) -> DataFrame {
returns.lazy()
.with_columns([
col("return_a")
.rolling_mean(window as i64)
.alias("perf_a"),
col("return_b")
.rolling_mean(window as i64)
.alias("perf_b"),
col("return_c")
.rolling_mean(window as i64)
.alias("perf_c"),
])
.collect()
.unwrap()
}
组合收益来自哪些策略?需要做绩效归因:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fn performance_attribution(
portfolio_returns: &[f64],
strategy_returns: &HashMap<String, Vec<f64>>,
weights: &[f64],
) -> AttributionReport {
let mut contributions = HashMap::new();
for (name, returns) in strategy_returns {
let contribution: f64 = returns.iter()
.zip(portfolio_returns.iter())
.map(|(r, _)| r.clone())
.sum();
contributions.insert(name.clone(), contribution);
}
AttributionReport {
total_return: portfolio_returns.iter().sum(),
strategy_contributions: contributions,
}
}
我们组合三个策略:
策略 | 类型 | 时间框架 | 权重 |
|---|---|---|---|
均值回归 | 逆势 | 日线 | 40% |
动量突破 | 趋势 | 日线 | 40% |
RSI反转 | 逆势 | 60分钟 | 20% |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 加载数据
let daily_data = load_daily_data("./data/daily").await?;
let hourly_data = load_hourly_data("./data/hourly").await?;
// 2. 生成各策略信号
let mean_reversion_signals = MeanReversionStrategy::new(20, 2.0)
.generate_signals(&daily_data)?;
let momentum_signals = MomentumStrategy::new(14, 70, 30)
.generate_signals(&daily_data)?;
let rsi_signals = RSIStrategy::new(14, 70, 30)
.generate_signals(&hourly_data)?;
// 3. 合并信号
let merged = merge_multi_timeframe_signals(
mean_reversion_signals,
momentum_signals,
rsi_signals,
);
// 4. 加权决策
let weights = vec![0.4, 0.4, 0.2];
let final_signals = weighted_average_method(&merged, &weights);
// 5. 回测
let backtester = Backtester::new(1_000_000.0);
let result = backtester.run(&daily_data, &final_signals)?;
// 6. 输出结果
println!("组合策略绩效:");
println!(" 年化收益: {:.2}%", result.annual_return * 100.0);
println!(" 夏普比率: {:.2}", result.sharpe_ratio);
println!(" 最大回撤: {:.2}%", result.max_drawdown * 100.0);
Ok(())
}
指标 | 均值回归 | 动量策略 | RSI反转 | 组合 |
|---|---|---|---|---|
年化收益 | 18.6% | 22.3% | 15.8% | 24.1% |
夏普比率 | 1.34 | 1.12 | 0.95 | 1.68 |
最大回撤 | -15.2% | -22.8% | -18.3% | -11.5% |
策略相关性 | - | 0.32 | 0.18 | - |
结论:组合策略在收益、夏普、回撤三个维度都优于单一策略。
组合需要定期再平衡,调整各策略权重:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fn rebalance_portfolio(
current_weights: &[f64],
target_weights: &[f64],
positions: &HashMap<String, Position>,
cash: f64,
) -> Vec<Trade> {
let total_value = cash + positions.values()
.map(|p| p.market_value())
.sum::<f64>();
let mut trades = Vec::new();
for (i, (current, target)) in current_weights.iter().zip(target_weights.iter()).enumerate() {
let current_value = total_value * current;
let target_value = total_value * target;
let diff = target_value - current_value;
if diff.abs() > total_value * 0.01 { // 超过1%才调整
trades.push(Trade {
strategy_index: i,
amount: diff,
});
}
}
trades
}
再平衡频率:
⚠️ 多策略组合也有风险:
建议:策略数量控制在3-5个,定期监控相关性变化。
多策略组合是量化投资的高级课题。
它不仅是技术问题——如何合并信号、如何分配权重——更是艺术问题——如何选择互补的策略、如何平衡收益与风险。
记住一句话:组合的目标不是收益最大化,而是风险调整后收益最大化。
下一篇,我们将聊聊风控模块设计:
(全文完)