我有一个案例,我有一个带有axum的http服务器,它以非常高的吞吐量接收有效载荷(每秒可以达到20m )。我需要从请求中提取这些字节,并对它们进行一些繁重的计算。内存达到不可预测的高(可能达到5 5Gb)的问题。这是我试图获得它的当前设置:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel::<WriteRequest>(32);
tokio::spawn(async move {
while let Some(payload) = rx.recv().await {
tokio::task::spawn_blocking(move || {
// Run heavy computation here...
heavy_computation(payload)
}).await;
}
});
// build our application with a route
let app = Router::new()
.route("/write", post(move |req: Bytes| async move {
let data: WriteRequest = Message::decode(req);
// send data here
let _ = tx.send(data).await;
"ok"
}));
let addr = ([0, 0, 0, 0], 8080).into();
let server = axum::Server::bind(&addr)
.serve(app.into_make_service());
if let Err(e) = server.await {
error!("server error: {}", e);
}
Ok(())
}我认为是bounded channel上的反压力使请求不断堆积,直到它们可以被发送到另一个task进行处理,从而导致高内存。因为即使我尝试用一个简单的关于200ms的sleep替换heavy_copmutation,结果也是一样的。如果我去掉了heavy_computation部分,内存仍然很低。
解决这个问题的正确方法是什么?或者在这种高吞吐量的情况下,这里什么也做不了?
非常感谢!
发布于 2021-09-16 09:19:18
感觉就像在heavy_computation忙碌的时候,数以百万计的待处理请求堆积起来。需要限制一次处理的已接受连接/请求的数量。要达到5 5Gb的使用率,只需要25K挂起的请求和200Kb的有效负载,甚至不是数百万。
这是一个已知问题,但那里的人建议使用来自hyper doesn't have a max connections setting的ConcurrencyLimit中间件,并将其配置到服务器中,或者创建一个自定义的接受/处理循环。
也可以通过axum将此中间件传递到塔中,但如果您也可以这样做,您可以尝试直接访问塔,甚至是裸露的hyper,并使用适用于此工作负载的原语实现它。
https://stackoverflow.com/questions/69204162
复制相似问题