
All messages in the queue are dispatched to a single subscriber

Stack Overflow user
Asked 2021-10-15 19:31:29
Answers: 1, Views: 66, Followers: 0, Votes: 1

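I have published about 10,000 messages to a durable RabbitMQ queue, and all of them are in the "Ready" state. The code below sleeps for 10 seconds while handling each message. As soon as I run the Rust application that calls the function below, the state of all the messages in the queue changes from "Ready" to "Unacked" almost immediately. The code then goes on to process the messages one at a time.

If I start a second instance running the same code, no messages are dispatched to it unless I terminate the first application.

The behavior I want is for a subscriber to receive at most one message at a time, and to get the next message only after it has acknowledged the current one.

Which configuration should I change?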

Code language: rust
use lapin::{options::*, Connection, ConnectionProperties, Result};
use futures_util::stream::StreamExt;
//use std::future::Future;
use tracing::info;
use slog::Drain;

pub fn lapin_test_consumer()->std::result::Result<i64, Box<std::io::Error>> {
    //env_logger::init();
    let log_file_name:&str="/tmp/lapin_test_consumer.log";
    let log_file_path=std::path::Path::new(&log_file_name);
    let dir_file_path=log_file_path.parent().unwrap();
    std::fs::create_dir_all(dir_file_path).unwrap();
    
    let log_file_handler_option = std::fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(log_file_name)
        //.unwrap()
    ;
    let log_file_handler=match log_file_handler_option
    {
        Ok(f)=>f
        ,Err(err)=>{
            println!("{:?}", err);
            panic!("Unable to open the log file '{}', '{:?}'",log_file_name,err);
        }
    };
    
    let my_log_drain = slog_async::Async::new(
        slog::Duplicate::new(
            slog::Filter::new(
                slog_term::FullFormat::new(
                    slog_term::PlainSyncDecorator::new(log_file_handler,)
                )
                .use_file_location()
                .build()
                ,
                |record: &slog::Record|
                {
                    record.level().is_at_least(slog::Level::Debug)
                }    
            )
            //,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
            ,slog::Duplicate::new(
                slog::Filter::new(
                    slog_term::FullFormat::new(
                        slog_term::PlainSyncDecorator::new(std::io::stderr(),)
                    )
                    .use_file_location()
                    .build()
                    ,
                    //|record: &slog::Record| record.level().is_at_least(slog::Level::Warning)
                    |record: &slog::Record|
                    {
                        record.level().is_at_least(slog::Level::Debug)
                    }    
                    
                )
                //,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
                ,slog_term::FullFormat::new(slog_term::TermDecorator::new().build()).use_file_location().build()
            )
        ).fuse()
    )
    .build()
    .fuse()
    ;
    let my_slog_logger=slog::Logger::root(my_log_drain, slog::o!("n" => env!("CARGO_PKG_NAME"),"v" => env!("CARGO_PKG_VERSION")));
    
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }


    let addr:String = std::env::var("AMQP_ADDR").unwrap_or_else(
        |_|{
            format!("amqp://{}:{}@{}:{}/{}?heartbeat=0"
                ,"abcd"//aMQPJobUser
                ,"abcd"//aMQPJobPasswd
                ,"somewhere.com"//aMQPJobHost
                ,5672//aMQPJobPort
                ,"lapin_test.test"//aMQPJobVirtualHost
            ).into()
        }
    );
    let amqp_conn_url:&str=&addr.as_str();
    

    //see "https://docs.rs/lapin/1.8.0/lapin/struct.Consumer.html"
    let res: std::result::Result<i64, Box<std::io::Error>> = async_global_executor::block_on(async {
            let sleep_duration_ms:u64=10000u64;
            let conn_result:std::result::Result<lapin::Connection, lapin::Error> = Connection::connect(
                &amqp_conn_url,
                ConnectionProperties::default().with_default_executor(2),//set the number of threads
            )
            .await;
            let conn:lapin::Connection=match conn_result{
                Err(err)=>{
                    let bt=backtrace::Backtrace::new();
                    let log_message=format!(">>>>>At lapin_test_publisher(), pos 1b, some error has been encountered while trying to establish AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
                    slog::error!(my_slog_logger,"{}",log_message);
                    let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
                    return std::result::Result::Err(Box::new(custom_error));
                }
                Ok(conn2)=>{info!("CONNECTED");conn2}
            };
            
            
            let mut message_cnt:i64=0i64;let _some_i64:i64=message_cnt;
            let channel_a_result:Result<lapin::Channel>=conn.create_channel().await;
            let channel_a:lapin::Channel=match channel_a_result{
                Err(err)=>{
                    let bt=backtrace::Backtrace::new();
                    let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a channel from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
                    slog::error!(my_slog_logger,"{}",log_message);
                    let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
                    return std::result::Result::Err(Box::new(custom_error));
                }
                Ok(channel)=>{channel}
            };
            
            channel_a
                .exchange_declare(
                    "my_direct_exchange"
                    ,lapin::ExchangeKind::Direct
                    ,lapin::options::ExchangeDeclareOptions{
                        passive:false
                        ,durable:true
                        ,auto_delete:false
                        ,internal:false
                        ,nowait:false
                    }
                    ,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
                )
                .await
                .expect("exchange_declare")
            ;
            
            let queue = channel_a
                .queue_declare(
                    "hello.persistent"//:&str queue name
                    ,lapin::options::QueueDeclareOptions{
                        passive:false,
                        durable:true,
                        exclusive:false,
                        auto_delete:false,
                        nowait:false,
                    }
                    ,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
                )
                .await
                .expect("queue_declare")
            ;
            channel_a
                .queue_bind(
                    "hello.persistent"
                    ,"my_direct_exchange"
                    ,"hello.persistent"
                    , lapin::options::QueueBindOptions{
                        nowait:false
                    }
                    ,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
                )
                .await
                .expect("queue_bind")
            ;
            
            let consumer_a_result:Result<lapin::Consumer>=channel_a
                .basic_consume(
                    "hello.persistent",
                    "my_consumer",
                    lapin::options::BasicConsumeOptions{
                        no_local: true,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-local"
                        no_ack: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-ack" "If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application."
                        exclusive: false,
                        nowait: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-wait" "If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception."
                    },
                    lapin::types::FieldTable::default(),
                )
                .await;
            let mut consumer_a:lapin::Consumer=match consumer_a_result{
                Err(err)=>{
                    let bt=backtrace::Backtrace::new();
                    let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a consumer from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
                    slog::error!(my_slog_logger,"{}",log_message);
                    let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
                    return std::result::Result::Err(Box::new(custom_error));
                    //return Err(err);
                }
                Ok(consumer)=>{consumer}
            };
        
            while let Some(delivery) = consumer_a.next().await {
                let (channel2, delivery2) = delivery.expect("error in consumer");
                message_cnt+=1;
                slog::info!(my_slog_logger,"------------------------------------------------------------------, message_cnt is:{}",&message_cnt);
                let s:String = match String::from_utf8(delivery2.data.to_owned()) {//delivery.data is of type Vec<u8>
                    Ok(v) => v,
                    Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
                };          
                let log_message:String=format!("message_cnt is:{}, delivery_tag is:{}, exchange is:{}, routing_key is:{}, redelivered is:{}, properties is:'{:?}', received data is:'{:?}'"
                    ,&message_cnt
                    ,&delivery2.delivery_tag
                    ,&delivery2.exchange
                    ,&delivery2.routing_key
                    ,&delivery2.redelivered
                    ,&delivery2.properties
                    ,&s
                );
                slog::info!(my_slog_logger,"{}",log_message);
                std::thread::sleep(std::time::Duration::from_millis(sleep_duration_ms));
                slog::info!(my_slog_logger,"After {}ms sleep.",sleep_duration_ms);
                channel2
                    .basic_ack(delivery2.delivery_tag, BasicAckOptions::default())
                    .await
                    .expect("ack")
                ;
            }
            Ok(message_cnt)
        }
    );
    res
}

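1 Answer

Stack Overflow user

Answered 2021-10-15 20:54:42

Solved. Each consumer now picks up only one message at a time, and the number of unacknowledged messages equals the number of consumers, which is exactly what I was looking for. The fix is to set basic_qos on the channel: I added the code below right after obtaining the AMQP channel.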

Code language: rust
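            channel_a
                .basic_qos(
                    1//prefetch_count: at most 1 unacknowledged message at a time
                    ,BasicQosOptions{global:true}
                )
                .await//wait for the broker to confirm the setting before consuming
                .expect("basic_qos");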
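
To make the effect of the prefetch limit concrete, here is a small standard-library sketch of the dispatch behavior described in the question and in this answer. It is only a toy model, not lapin code and not how RabbitMQ is implemented: `ToyQueue`, `ToyConsumer`, and `simulate` are hypothetical names invented for illustration, and `None` stands for a channel on which basic_qos was never set.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a broker queue (hypothetical, not RabbitMQ's implementation).
struct ToyQueue {
    ready: VecDeque<u32>,
}

/// Toy consumer; `prefetch: None` models a channel without basic_qos.
struct ToyConsumer {
    prefetch: Option<usize>,
    unacked: Vec<u32>,
}

impl ToyConsumer {
    fn new(prefetch: Option<usize>) -> Self {
        ToyConsumer { prefetch, unacked: Vec::new() }
    }

    /// True while the consumer may be handed another message.
    fn has_capacity(&self) -> bool {
        match self.prefetch {
            None => true,
            Some(limit) => self.unacked.len() < limit,
        }
    }

    /// Acknowledge the oldest outstanding message, freeing one prefetch slot.
    fn ack_oldest(&mut self) -> Option<u32> {
        if self.unacked.is_empty() {
            None
        } else {
            Some(self.unacked.remove(0))
        }
    }
}

impl ToyQueue {
    /// Hand ready messages round-robin to every consumer that still has capacity.
    fn dispatch(&mut self, consumers: &mut [ToyConsumer]) {
        loop {
            let mut delivered = false;
            for consumer in consumers.iter_mut() {
                if self.ready.is_empty() {
                    return;
                }
                if consumer.has_capacity() {
                    let message = self.ready.pop_front().unwrap();
                    consumer.unacked.push(message);
                    delivered = true;
                }
            }
            if !delivered {
                return;
            }
        }
    }
}

/// Returns (unacked on first consumer, unacked on second consumer, still ready).
fn simulate(prefetch: Option<usize>, total: u32) -> (usize, usize, usize) {
    let mut queue = ToyQueue { ready: (1..=total).collect() };
    // The first instance subscribes alone, as in the question.
    let mut consumers = vec![ToyConsumer::new(prefetch)];
    queue.dispatch(&mut consumers);
    // A second instance starts later.
    consumers.push(ToyConsumer::new(prefetch));
    queue.dispatch(&mut consumers);
    (consumers[0].unacked.len(), consumers[1].unacked.len(), queue.ready.len())
}

fn main() {
    println!("no prefetch:  {:?}", simulate(None, 10_000)); // (10000, 0, 0)
    println!("prefetch = 1: {:?}", simulate(Some(1), 10_000)); // (1, 1, 9998)

    // Acking frees the slot, so the next dispatch hands over one more message.
    let mut queue = ToyQueue { ready: (1..=3).collect() };
    let mut consumers = vec![ToyConsumer::new(Some(1))];
    queue.dispatch(&mut consumers);
    let acked = consumers[0].ack_oldest();
    queue.dispatch(&mut consumers);
    println!("acked {:?}, now holding {:?}", acked, consumers[0].unacked);
}
```

Without a limit, the first consumer in this model drains the whole queue into its unacked set and the late joiner gets nothing, which matches the symptom in the question. With a limit of 1, each consumer holds exactly one message and the rest stay ready. As far as the RabbitMQ documentation describes it, `global: false` applies the limit to each consumer separately while `global: true` shares one limit across all consumers on the channel; with a single consumer per channel, as here, both should give one unacked message per consumer.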
Votes: 1
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/69589745
