
Rust AWS multipart upload using rusoto, multithreaded (rayon), panics with 'there is no reactor running...'
Stack Overflow user
Asked 2021-03-10 03:40:35 · 1 answer · 690 views · score 1

I'm trying to upload a file to AWS in Rust, using rusoto_s3's S3 Rust client. I managed to get the multipart upload code working when the parts are sent from a single thread. However, that is not what I want: I want to upload large files, so I'd like to be able to send the parts from multiple threads. To do that, I did a bit of googling and came across rayon.

For reference, a multipart upload works as follows:

  1. Initiate the multipart upload -> AWS returns an upload ID
  2. Using this ID, send the individual parts, passing the file chunk and a part number -> AWS returns an ETag for each part
  3. After all parts have been sent, send a complete-upload request containing all the completed parts, as an array of ETags and part numbers.

I'm new to Rust, coming from a C++ and Java background. Here is my code:

#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = "24_time_test.dcm";

    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 7_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);

    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };

    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();

    // Create upload parts
    let create_upload_part = |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            upload_id: upload_id.to_owned(),
            part_number: part_number,
            ..Default::default()
        }
    };

    let completed_parts = Arc::new(Mutex::new(vec![]));

    rayon::scope(|scope| {
        let mut part_number = 1;
        loop {
            let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
            println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
            file.by_ref()
                .take(maximum_bytes_to_read as u64)
                .read_to_end(&mut buffer)
                .unwrap();

            println!("length: {}", buffer.len());
            println!("part_number: {}", part_number);
            if buffer.len() == 0 {
                // The file has ended.
                break;
            }

            let next_buffer = Vec::with_capacity(CHUNK_SIZE);
            let data_to_send = buffer;
            let completed_parts_cloned = completed_parts.clone();
            scope.spawn(move |_| {
                let part = create_upload_part(data_to_send.to_vec(), part_number);
                {
                    let part_number = part.part_number;
                    let client = executor::block_on(super::get_client());
                    let response = executor::block_on(client.s3.upload_part(part));

                    completed_parts_cloned.lock().unwrap().push(CompletedPart {
                        e_tag: response
                            .expect("Couldn't complete multipart upload")
                            .e_tag
                            .clone(),
                        part_number: Some(part_number),
                    });
                }
            });

            buffer = next_buffer;
            part_number = part_number + 1;
        }
    });

    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts.lock().unwrap().to_vec()),
    };

    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };
    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}

Here is a sample of the output and the error I get:

maximum_bytes_to_read: 7000000
length: 7000000
part_number: 1
maximum_bytes_to_read: 7000000
length: 7000000
part_number: 2
maximum_bytes_to_read: 7000000
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
length: 7000000

I googled this error, but I don't have a clear understanding of what it actually means:

there is no reactor running, must be called from the context of a Tokio 1.x runtime

Here is what I found: another question with the same error,

and yet another question.

It seems to be some kind of compatibility issue, because s3 may be using a version of tokio that is incompatible with the one I have.

Here are the relevant dependencies:

tokio = { version = "1", features = ["full"] }
tokio-compat-02 = "0.1.2"
rusoto_s3 = "0.46.0"
rusoto_core = "0.46.0"
rusoto_credential = "0.46.0"
rayon = "1.5.0"

I think the main issue is wanting to run async code inside rayon. I tried changing my async code to blocking code using executor::block_on. I also spent some time trying to make the compiler happy: I have multiple threads that all want to write to let completed_parts = Arc::new(Mutex::new(vec![]));, so I did some cloning here to satisfy the compiler.

Also, in case the crates I used matter, here they are:

extern crate dotenv;
extern crate tokio;
use bytes::Bytes;
use dotenv::dotenv;
use futures::executor;
use futures::*;
use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials};
use rusoto_s3::util::{PreSignedRequest, PreSignedRequestOption};
use rusoto_s3::PutObjectRequest;
use rusoto_s3::StreamingBody;
use rusoto_s3::{
    CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
    CreateMultipartUploadRequest, UploadPartRequest, S3,
};

use std::io::Read;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
use tokio::fs;

New to Rust, so there are a lot of moving pieces to get this right!

1 Answer

Stack Overflow user
Accepted answer
Answered 2021-03-11 08:13:13

Thanks to @Jmb for the discussion. I got rid of the threads and spawned tokio tasks as follows.

Create a vector to hold the futures so we can await them:

let mut multiple_parts_futures = Vec::new();

Spawn the async tasks:

loop { // loop over the file chunks
    // ...
    let send_part_task_future = tokio::task::spawn(async move {
        // upload part
        // ...
    });
    multiple_parts_futures.push(send_part_task_future);
}

Then await all the futures:

let _results = futures::future::join_all(multiple_parts_futures).await;

It's worth mentioning that the completed parts need to be sorted:

let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
completed_parts_vector.sort_by_key(|part| part.part_number);

The whole code is:

#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = generate_unique_name();
    let destination_filename_clone = destination_filename.clone();
    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 6_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);

    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };

    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();

    let upload_id_clone = upload_id.clone();
    // Create upload parts
    let create_upload_part = move |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename_clone.to_owned(),
            upload_id: upload_id_clone.to_owned(),
            part_number: part_number,
            ..Default::default()
        }
    };

    let create_upload_part_arc = Arc::new(create_upload_part);
    let completed_parts = Arc::new(Mutex::new(vec![]));

    let mut part_number = 1;

    let mut multiple_parts_futures = Vec::new();
    loop {
        let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
        println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
        file.by_ref()
            .take(maximum_bytes_to_read as u64)
            .read_to_end(&mut buffer)
            .unwrap();
        println!("length: {}", buffer.len());
        println!("part_number: {}", part_number);
        if buffer.len() == 0 {
            // The file has ended.
            break;
        }
        let next_buffer = Vec::with_capacity(CHUNK_SIZE);
        let data_to_send = buffer;
        let completed_parts_cloned = completed_parts.clone();
        let create_upload_part_arc_cloned = create_upload_part_arc.clone();
        let send_part_task_future = tokio::task::spawn(async move {
            let part = create_upload_part_arc_cloned(data_to_send.to_vec(), part_number);
            {
                let part_number = part.part_number;
                let client = super::get_client().await;
                let response = client.s3.upload_part(part).await;
                completed_parts_cloned.lock().unwrap().push(CompletedPart {
                    e_tag: response
                        .expect("Couldn't complete multipart upload")
                        .e_tag
                        .clone(),
                    part_number: Some(part_number),
                });
            }
        });
        multiple_parts_futures.push(send_part_task_future);
        buffer = next_buffer;
        part_number = part_number + 1;
    }
    let client = super::get_client().await;
    println!("waiting for futures");
    let _results = futures::future::join_all(multiple_parts_futures).await;

    let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
    completed_parts_vector.sort_by_key(|part| part.part_number);
    println!("futures done");
    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts_vector),
    };

    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };

    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}
Score: 1
Original content from Stack Overflow: https://stackoverflow.com/questions/66558012
