我已经向一个持久的RabbitMQ队列发送了大约10000条消息,所有这些消息都处于“就绪”状态。下面的代码在处理每条消息时休眠10秒。只要我运行调用下面函数的rust应用程序,队列中所有这些消息的状态几乎立即从"Ready“变为"Unacked”。代码继续一次处理一条消息。
如果我启动另一个运行下面代码的实例,除非我终止第一个应用程序,否则不会向该实例调度任何消息。
我想要的行为是订阅者一次最多只能接收一条消息,并且在确认当前消息后才能获得下一条消息。
我应该更改什么配置?
use lapin::{options::*, Connection, ConnectionProperties, Result};
use futures_util::stream::StreamExt;
//use std::future::Future;
use tracing::info;
use slog::Drain;
pub fn lapin_test_consumer()->std::result::Result<i64, Box<std::io::Error>> {
//env_logger::init();
let log_file_name:&str="/tmp/lapin_test_consumer.log";
let log_file_path=std::path::Path::new(&log_file_name);
let dir_file_path=log_file_path.parent().unwrap();
std::fs::create_dir_all(dir_file_path).unwrap();
let log_file_handler_option = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(log_file_name)
//.unwrap()
;
let log_file_handler=match log_file_handler_option
{
Ok(f)=>f
,Err(err)=>{
println!("{:?}", err);
panic!("Unable to open the log file '{}', '{:?}'",log_file_name,err);
}
};
let my_log_drain = slog_async::Async::new(
slog::Duplicate::new(
slog::Filter::new(
slog_term::FullFormat::new(
slog_term::PlainSyncDecorator::new(log_file_handler,)
)
.use_file_location()
.build()
,
|record: &slog::Record|
{
record.level().is_at_least(slog::Level::Debug)
}
)
//,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
,slog::Duplicate::new(
slog::Filter::new(
slog_term::FullFormat::new(
slog_term::PlainSyncDecorator::new(std::io::stderr(),)
)
.use_file_location()
.build()
,
//|record: &slog::Record| record.level().is_at_least(slog::Level::Warning)
|record: &slog::Record|
{
record.level().is_at_least(slog::Level::Debug)
}
)
//,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
,slog_term::FullFormat::new(slog_term::TermDecorator::new().build()).use_file_location().build()
)
).fuse()
)
.build()
.fuse()
;
let my_slog_logger=slog::Logger::root(my_log_drain, slog::o!("n" => env!("CARGO_PKG_NAME"),"v" => env!("CARGO_PKG_VERSION")));
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
let addr:String = std::env::var("AMQP_ADDR").unwrap_or_else(
|_|{
format!("amqp://{}:{}@{}:{}/{}?heartbeat=0"
,"abcd"//aMQPJobUser
,"abcd"//aMQPJobPasswd
,"somewhere.com"//aMQPJobHost
,5672//aMQPJobPort
,"lapin_test.test"//aMQPJobVirtualHost
).into()
}
);
let amqp_conn_url:&str=&addr.as_str();
//see "https://docs.rs/lapin/1.8.0/lapin/struct.Consumer.html"
let res: std::result::Result<i64, Box<std::io::Error>> = async_global_executor::block_on(async {
let sleep_duration_ms:u64=10000u64;
let conn_result:std::result::Result<lapin::Connection, lapin::Error> = Connection::connect(
&amqp_conn_url,
ConnectionProperties::default().with_default_executor(2),//set the number of threads
)
.await;
let conn:lapin::Connection=match conn_result{
Err(err)=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At lapin_test_publisher(), pos 1b, some error has been encountered while trying to establish AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
}
Ok(conn2)=>{info!("CONNECTED");conn2}
};
let mut message_cnt:i64=0i64;let _some_i64:i64=message_cnt;
let channel_a_result:Result<lapin::Channel>=conn.create_channel().await;
let channel_a:lapin::Channel=match channel_a_result{
Err(err)=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a channel from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
}
Ok(channel)=>{channel}
};
channel_a
.exchange_declare(
"my_direct_exchange"
,lapin::ExchangeKind::Direct
,lapin::options::ExchangeDeclareOptions{
passive:false
,durable:true
,auto_delete:false
,internal:false
,nowait:false
}
,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
)
;
let queue = channel_a
.queue_declare(
"hello.persistent"//:&str queue name
,lapin::options::QueueDeclareOptions{
passive:false,
durable:true,
exclusive:false,
auto_delete:false,
nowait:false,
}
,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
)
.await
.expect("queue_declare")
;
channel_a
.queue_bind(
"hello.persistent"
,"my_direct_exchange"
,"hello.persistent"
, lapin::options::QueueBindOptions{
nowait:false
}
,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
)
;
let consumer_a_result:Result<lapin::Consumer>=channel_a
.basic_consume(
"hello.persistent",
"my_consumer",
lapin::options::BasicConsumeOptions{
no_local: true,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-local"
no_ack: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-ack" "If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application."
exclusive: false,
nowait: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-wait" "If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception."
},
lapin::types::FieldTable::default(),
)
.await;
let mut consumer_a:lapin::Consumer=match consumer_a_result{
Err(err)=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a consumer from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
//return Err(err);
}
Ok(consumer)=>{consumer}
};
while let Some(delivery) = consumer_a.next().await {
let (channel2, delivery2) = delivery.expect("error in consumer");
message_cnt+=1;
slog::info!(my_slog_logger,"------------------------------------------------------------------, message_cnt is:{}",&message_cnt);
let s:String = match String::from_utf8(delivery2.data.to_owned()) {//delivery.data is of type Vec<u8>
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
let log_message:String=format!("message_cnt is:{}, delivery_tag is:{}, exchange is:{}, routing_key is:{}, redelivered is:{}, properties is:'{:?}', received data is:'{:?}'"
,&message_cnt
,&delivery2.delivery_tag
,&delivery2.exchange
,&delivery2.routing_key
,&delivery2.redelivered
,&delivery2.properties
,&s
);
slog::info!(my_slog_logger,"{}",log_message);
std::thread::sleep(std::time::Duration::from_millis(sleep_duration_ms));
slog::info!(my_slog_logger,"After {}ms sleep.",sleep_duration_ms);
channel2
.basic_ack(delivery2.delivery_tag, BasicAckOptions::default())
.await
.expect("ack")
;
}
Ok(message_cnt)
}
);
res
}发布于 2021-10-15 20:54:42
解决了。现在,每个消费者一次只挑选一条消息。未确认消息的数量等于消费者的数量。这就是我一直在寻找的。解决方案在于设置通道的basic_qos,我在获取AMQP通道后包含了下面的代码。
channel_a.basic_qos(
1
,BasicQosOptions{global:true}
);https://stackoverflow.com/questions/69589745
复制相似问题