在高频交易中,延迟是生命。
每一微秒都至关重要。而内存分配是延迟的主要来源之一:
今天我们聊聊如何用Rust实现**零分配(Zero Allocation)**设计。
1
2
3
4
5
// 每次处理一个tick都创建新对象
for tick in ticks {
let order = Order::new(tick.price, tick.volume);
process_order(&order);
} // order在这里被销毁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use std::sync::Mutex;
use std::ops::{Deref, DerefMut};
pub struct ObjectPool<T> {
pool: Mutex<Vec<T>>,
factory: fn() -> T,
}
impl<T: Send> ObjectPool<T> {
pub fn new(capacity: usize, factory: fn() -> T) -> Self {
let mut pool = Vec::with_capacity(capacity);
for _ in 0..capacity {
pool.push(factory());
}
Self {
pool: Mutex::new(pool),
factory,
}
}
// 修复生命周期标注
pub fn acquire(&self) -> PooledObject<'_, T> {
let mut pool = self.pool.lock().expect("Mutex poisoned");
let obj = pool.pop().unwrap_or_else(|| (self.factory)());
PooledObject { obj: Some(obj), pool: self }
}
pub fn release(&self, obj: T) {
let mut pool = self.pool.lock().expect("Mutex poisoned");
pool.push(obj);
}
}
pub struct PooledObject<'a, T: Send> {
obj: Option<T>,
pool: &'a ObjectPool<T>,
}
// 实现 Deref 使其可用
impl<'a, T: Send> Deref for PooledObject<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.obj.as_ref().unwrap()
}
}
impl<'a, T: Send> DerefMut for PooledObject<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.obj.as_mut().unwrap()
}
}
impl<'a, T: Send> Drop for PooledObject<'a, T> {
fn drop(&mut self) {
if let Some(obj) = self.obj.take() {
self.pool.release(obj);
}
}
}
Send?在 Rust 中,Send 是一个 Marker Trait(标记特质)。
Send,意味着这个类型的所有权可以安全地从一个线程**转移(Move)**到另一个线程。i32, String, Vec)都是 Send 的。Rc(引用计数)。因为它内部的计数器不是原子操作,多线程同时修改会导致内存错误。Mutex 的“潜规则”你在代码里使用了 Mutex<Vec<T>>。为了让 ObjectPool<T> 能在多线程间共享(比如放在 Arc 里),ObjectPool 必须实现 Sync。
这里的连锁反应是这样的:
ObjectPool<T> 是 Sync 的(可以被多个线程同时访问引用)。Sync 或能在线程间安全转移。Mutex<U> 来说,只有当 U 是 Send 的时候,Mutex<U> 才是 Sync 的。U 是 Vec<T>。而 Vec<T> 只有在 T 是 Send 时才是 Send。结论: 如果
T不满足Send,Mutex<Vec<T>>就无法在线程间同步,高并发代码在编译阶段就会报错。
1
2
3
4
5
// 每个对象单独分配
let mut orders = Vec::new();
for _ in 0..1000 {
orders.push(Box::new(Order::default()));
}
1
2
3
4
5
6
7
8
9
use bumpalo::Bump;
let arena = Bump::new();
// 所有对象在Arena中连续分配
let orders: Vec<&mut Order> = (0..1000)
.map(|_| arena.alloc(Order::default()))
.collect();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
pub struct MarketDataProcessor {
// 预分配缓冲区
buffer: Vec<u8>,
orders: Vec<Order>,
}
impl MarketDataProcessor {
pub fn new() -> Self {
Self {
buffer: vec![0u8; 1024 * 1024], // 1MB缓冲区
orders: Vec::with_capacity(1000),
}
}
pub fn process(&mut self, data: &[u8]) {
if data.len() > self.buffer.capacity() {
// 记录警告或处理异常,而不是让 Vec 自动扩容导致丢帧
eprintln!("Warning: Data size exceeds buffer capacity!");
}
self.buffer.extend_from_slice(data);
// 重置订单列表,不重新分配
self.orders.clear();
}
}
方法 | 处理100万条数据 | 内存分配次数 |
|---|---|---|
普通创建 | 850ms | 200万次 |
对象池 | 120ms | 0次 |
Arena | 95ms | 1次 |
在高频交易中,控制内存分配就是控制延迟。
Rust的所有权系统让我们可以实现真正的零分配设计。
下一篇,纳秒级计时:Rust量化系统的性能剖析与热点优化
敬请期待。
(全文完)