From dc2be288741a613dad06cb8ecace9d847deade3f Mon Sep 17 00:00:00 2001 From: sim1222 Date: Sat, 3 Aug 2024 02:09:28 +0900 Subject: [PATCH] impl file stream upload --- src/lib.rs | 288 ++++++++++++++++++++++++++++++++++-------- src/main.rs | 267 ++++++++++++++++++++++++++++++++++----- src/types/response.rs | 6 +- src/util.rs | 238 +++++++++++++++++++++++++++++++++- 4 files changed, 710 insertions(+), 89 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 554cd17..e19e767 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,10 @@ use aws_config::Region; use aws_sdk_s3::config::Credentials; -use constants::APP_VERSION; +use constants::{APP_VERSION, CHUNK_SIZE}; use std::{io::BufReader, path::PathBuf, sync::Arc}; -use util::{check_job, file_detail, list_files, multipart_upload, TargetFile}; +use util::{ + check_job, file_detail, list_files, multipart_upload, multipart_upload_from_path, single_upload, +}; mod client; mod constants; @@ -13,6 +15,17 @@ pub struct RakutenDriveClient { client: client::Client, } +pub struct UploadFile { + pub data: Vec, + pub path: String, +} + +#[derive(Debug, Clone)] +pub struct TargetFile { + pub file: PathBuf, + pub path: String, +} + impl RakutenDriveClient { pub async fn try_new(refresh_token_str: String) -> anyhow::Result { let client = client::Client::try_new(refresh_token_str).await?; @@ -34,8 +47,7 @@ impl RakutenDriveClient { } pub async fn upload( &self, - file_path: &str, - file_data: &[u8], + files: Vec, prefix: Option<&str>, fake_size: Option, pb: Option, @@ -44,12 +56,17 @@ impl RakutenDriveClient { host_id: self.client.host_id.clone(), path: prefix.unwrap_or("").to_string(), upload_id: "".to_string(), - file: vec![types::request::CheckUploadRequestFile { - path: file_path.to_string(), - size: fake_size.unwrap_or(file_data.len() as u64) as i64, - }], + file: files + .iter() + .map(|file| types::request::CheckUploadRequestFile { + path: file.path.clone(), + size: fake_size.unwrap_or(file.data.len() as u64) as i64, + }) + .collect(), }; + println!("prefix: {:?}", prefix.unwrap_or("")); + let check_upload_res = self.client.check_upload(req).await.unwrap(); let token_res = self.client.get_upload_token().await.unwrap(); @@ -68,46 +85,211 @@ impl RakutenDriveClient { .force_path_style(true) .build(); - multipart_upload( - &token_res, - &check_upload_res.bucket, - &check_upload_res.file[0], - &check_upload_res.prefix, - &check_upload_res.region, - &check_upload_res.upload_id, - file_data, - pb, - ) - .await - .unwrap(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(5)); + let mut handles = Vec::new(); + + for (i, file) in files.iter().enumerate() { + let token_res = token_res.clone(); + let check_upload_res = check_upload_res.clone(); + let file_data = file.data.clone(); + let pb = pb.clone(); + let semaphore = semaphore.clone().acquire_owned().await.unwrap(); + if file_data.len() > CHUNK_SIZE * 10 { + multipart_upload( + &token_res, + &check_upload_res.bucket, + &check_upload_res.file[i], + &check_upload_res.prefix, + &check_upload_res.region, + &check_upload_res.upload_id, + &file_data, + pb.clone(), + ) + .await + .unwrap(); + } else { + handles.push(tokio::spawn(async move { + let _permit = semaphore; + loop { + match single_upload( + &token_res, + &check_upload_res.bucket, + &check_upload_res.file[i], + &check_upload_res.prefix, + &check_upload_res.region, + &check_upload_res.upload_id, + &file_data, + ) + .await + { + Ok(_) => { + // println!("Uploaded: {:?}", i); + break; + } + Err(e) => { + // println!("Error"); + continue; + } + } + } + 
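// One tick on the shared progress bar per completed file (the grouped
// small-file uploads count files, not bytes).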
if let Some(pb) = pb { + pb.inc(1); + } + })); + } + } + + for handle in handles { + handle.await.unwrap(); + } + + // multipart_upload( + // &token_res, + // &check_upload_res.bucket, + // &check_upload_res.file[0], + // &check_upload_res.prefix, + // &check_upload_res.region, + // &check_upload_res.upload_id, + // file_data, + // pb, + // ) + // .await + // .unwrap(); // if file_size > CHUNK_SIZE as u64 { // for (i, file) in files.iter().enumerate() { // println!("Multi Uploading: {:?}", file.file); - // } - // } else { - // for (i, file) in files.iter().enumerate() { - // println!("Uploading: {:?}", file.file); - // let stream = ByteStream::read_from() - // .path(file.file.clone()) - // .offset(0) - // .length(Length::Exact(file_size)) - // .build() - // .await - // .unwrap(); - // let key = - // check_upload_res.prefix.to_owned() + check_upload_res.file[i].path.as_str(); - // let _upload_res = s3_client - // .put_object() - // .bucket(check_upload_res.bucket.clone()) - // .key(key) - // .body(stream) - // .send() - // .await - // .unwrap(); - // } - // } + // } + // } else { + // for (i, file) in files.iter().enumerate() { + // println!("Uploading: {:?}", file.file); + // let stream = ByteStream::read_from() + // .path(file.file.clone()) + // .offset(0) + // .length(Length::Exact(file_size)) + // .build() + // .await + // .unwrap(); + // let key = + // check_upload_res.prefix.to_owned() + check_upload_res.file[i].path.as_str(); + // let _upload_res = s3_client + // .put_object() + // .bucket(check_upload_res.bucket.clone()) + // .key(key) + // .body(stream) + // .send() + // .await + // .unwrap(); + // } // } + // } + + println!("Checking for upload complete..."); + + match check_job(&check_upload_res.upload_id, &self.client).await { + Ok(_) => Ok(()), + Err(e) => { + println!("Error: {:?}", e); + Err(anyhow::anyhow!("Error: {:?}", e)) + } + } + } + + pub async fn upload_from_path( + &self, + file: TargetFile, + prefix: Option<&str>, + fake_size: Option, + pb: Option, + ) -> anyhow::Result<()> { + let req = types::request::CheckUploadRequest { + host_id: self.client.host_id.clone(), + path: prefix.unwrap_or("").to_string(), + upload_id: "".to_string(), + file: vec![types::request::CheckUploadRequestFile { + path: file.file.to_str().unwrap().to_string(), + size: fake_size.unwrap_or(file.file.metadata().unwrap().len() as u64) as i64, + }], + }; + + println!("prefix: {:?}", prefix.unwrap_or("")); + + let check_upload_res = self.client.check_upload(req).await.unwrap(); + + let token_res = self.client.get_upload_token().await.unwrap(); + + let cledential = Credentials::new( + token_res.access_key_id.clone(), + token_res.secret_access_key.clone(), + Some(token_res.session_token.clone()), + None, + "2021-06-01", + ); + let _config = aws_sdk_s3::Config::builder() + .behavior_version_latest() + .region(Region::new(check_upload_res.region.clone())) + .credentials_provider(cledential) + .force_path_style(true) + .build(); + + let token_res = token_res.clone(); + let check_upload_res = check_upload_res.clone(); + let pb = pb.clone(); + multipart_upload_from_path( + &token_res, + &check_upload_res.bucket, + &check_upload_res.file[1], + &check_upload_res.prefix, + &check_upload_res.region, + &check_upload_res.upload_id, + file, + pb.clone(), + ) + .await + .unwrap(); + + // multipart_upload( + // &token_res, + // &check_upload_res.bucket, + // &check_upload_res.file[0], + // &check_upload_res.prefix, + // &check_upload_res.region, + // &check_upload_res.upload_id, + // file_data, + // pb, + // ) + 
// .await + // .unwrap(); + // if file_size > CHUNK_SIZE as u64 { + // for (i, file) in files.iter().enumerate() { + // println!("Multi Uploading: {:?}", file.file); + + // } + // } else { + // for (i, file) in files.iter().enumerate() { + // println!("Uploading: {:?}", file.file); + // let stream = ByteStream::read_from() + // .path(file.file.clone()) + // .offset(0) + // .length(Length::Exact(file_size)) + // .build() + // .await + // .unwrap(); + // let key = + // check_upload_res.prefix.to_owned() + check_upload_res.file[i].path.as_str(); + // let _upload_res = s3_client + // .put_object() + // .bucket(check_upload_res.bucket.clone()) + // .key(key) + // .body(stream) + // .send() + // .await + // .unwrap(); + // } + // } + // } + + println!("Checking for upload complete..."); match check_job(&check_upload_res.upload_id, &self.client).await { Ok(_) => Ok(()), @@ -166,9 +348,15 @@ impl RakutenDriveClient { pub async fn mkdir(&self, name: &str, path: Option<&str>) -> anyhow::Result<()> { if name.contains('/') { - println!("Please use --path option for set parent directory"); + // println!("Please use --path option for set parent directory"); return Err(anyhow::anyhow!( - "Please use --path option for set parent directory" + "Can not use / in folder name. Use --path option for set parent directory." + )); + } + if name.len() > 255 { + println!("Folder name should be less than 255 characters."); + return Err(anyhow::anyhow!( + "Folder name should be less than 255 characters." )); } let req = types::request::CreateFolderRequest { @@ -190,11 +378,11 @@ impl RakutenDriveClient { } pub async fn rename(&self, path: &str, name: &str) -> anyhow::Result<()> { - if name.contains('/') { - println!("Can't use / in file name"); - println!("Name should be file name only."); - return Err(anyhow::anyhow!("Can't use / in file name")); - } + // if name.contains('/') { + // println!("Can't use / in file name"); + // println!("Name should be file name only."); + // return Err(anyhow::anyhow!("Can't use / in file name")); + // } let file_path = path.split('/').collect::>()[0..path.split('/').count() - 1].join("/") + "/"; diff --git a/src/main.rs b/src/main.rs index 82d0f4c..b48fd18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use std::{ cmp::{max, min}, + collections::{HashMap, HashSet}, io::{stdout, Write}, path::{Path, PathBuf}, sync::Arc, @@ -15,7 +16,7 @@ use clap::{Parser, Subcommand}; use constants::REFRESH_TOKEN; use human_bytes::human_bytes; use indicatif::{ProgressBar, ProgressState, ProgressStyle}; -use rakuten_drive_cui::RakutenDriveClient; +use rakuten_drive_cui::{RakutenDriveClient, TargetFile, UploadFile}; use tokio::{ fs::File, io::{AsyncReadExt, BufReader}, @@ -59,6 +60,14 @@ enum Commands { /// Send fake file size to server (byte) #[clap(short, long)] fake_size: Option, + + /// Do not check file existence + #[clap(long)] + force: bool, + + /// Stream Upload + #[clap(short, long)] + stream: bool, }, #[clap(about = "Download file")] Download { @@ -119,27 +128,66 @@ enum Commands { async fn main() -> anyhow::Result<()> { let args = Args::parse(); - let client = RakutenDriveClient::try_new(REFRESH_TOKEN.to_string()).await?; + let client = Arc::new(RakutenDriveClient::try_new(REFRESH_TOKEN.to_string()).await?); - match &args.command { + match args.command { Commands::List { prefix } => { - client.list(prefix.as_deref()).await.unwrap(); + let res = client.list(prefix.as_deref()).await.unwrap(); + res.file.iter().for_each(|f| { + let dir_str = if f.is_folder { "d" } else { " " }; + 
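// One listing row per entry: folder flag, human-readable size, last-modified time, path.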
println!( + "{}\t{}\t{}\t{}", + dir_str, + human_bytes(f.size as f64), + f.last_modified, + f.path, + ) + }); } Commands::Upload { file, prefix, recursive, fake_size, + force, + stream, } => { // is folder - if file.is_dir() && !*recursive { + if file.is_dir() && !recursive { println!("Use --recursive option for folder upload"); return Err(anyhow::anyhow!("Use --recursive option for folder upload")); } + if stream && recursive { + println!("Can't use Stream Upload and Recursive Upload both."); + return Err(anyhow::anyhow!( + "Can't use Stream Upload and Recursive Upload both." + )); + } + if let Some(prefix) = prefix.as_ref() { + if !prefix.ends_with('/') { + println!("Prefix must end with /"); + return Err(anyhow::anyhow!("Prefix must end with /")); + } + } + if file.is_dir() { + println!("name: {:?}", file.file_name().unwrap().to_str().unwrap()); + println!("prefix: {:?}", prefix.as_deref()); + client + .mkdir( + file.file_name().unwrap().to_str().unwrap(), + prefix.as_deref(), + ) + .await?; + } + let prefix = if file.is_dir() && prefix.is_none() { + Some(file.file_name().unwrap().to_str().unwrap().to_string() + "/") + } else { + prefix + }; let mut files = Vec::::new(); - if file.is_dir() && *recursive { + if file.is_dir() && recursive { // upload folder let mut dirs = Vec::::new(); dirs.push(file.clone()); @@ -153,12 +201,12 @@ async fn main() -> anyhow::Result<()> { } else { files.push(TargetFile { file: path.clone(), - path: path - .strip_prefix(file) - .unwrap() - .to_str() - .expect("Invalid File Name") - .to_string(), + path: (prefix.clone().unwrap_or("".to_string()) + + path + .strip_prefix(&file) + .unwrap() + .to_str() + .expect("Invalid File Name")), }); } } @@ -185,19 +233,87 @@ async fn main() -> anyhow::Result<()> { }); } - for file in &files { - if client.info(file.path.as_str()).await.is_ok() { - println!("File already exists."); - return Err(anyhow::anyhow!("File already exists.")); + if !force { + println!("Checking file existence..."); + for file in &files { + println!("Checking: {:?}", file.path); + let res = client.info(file.path.as_str()).await; + if res.is_ok() { + println!("File already exists."); + return Err(anyhow::anyhow!("File already exists.")); + } else { + println!("{:?}", res.err().unwrap()); + } } } - for file in &files { + // println!("{:#?}", files); + + let mut dirs = files + .iter() + .map(|f| f.path.clone()) + .map(|f| f.split('/').collect::>()[..f.split('/').count() - 1].join("/")) + .collect::>() + .into_iter() + .collect::>(); + dirs.sort(); + + println!("{:?}", dirs); + + let mut done_set = Vec::new(); + + for dir in dirs { + let paths = dir.split('/'); + println!("{}", dir); + for (i, path) in paths.clone().enumerate() { + let mut path_prefix = paths.clone().take(i).collect::>().join("/"); + if path_prefix.is_empty() && path == file.file_name().unwrap().to_str().unwrap() + { + continue; + } + if !path_prefix.is_empty() { + path_prefix.push('/'); + } + + if done_set + .iter() + .any(|x| x == &(path.to_string(), path_prefix.clone())) + { + continue; + } + + println!("path: {:?}", path); + println!("prefix: {:?}", path_prefix); + match client.mkdir(path, Some(&path_prefix)).await { + Ok(_) => { + done_set.push((path.to_string(), path_prefix)); + } + Err(e) => { + println!("{:?}", e); + continue; + } + } + } + } + + let small_files = files + .iter() + .filter(|f| f.file.metadata().unwrap().len() < 1024 * 1024 * 10 * 10) + .collect::>(); + let large_files = files + .iter() + .filter(|f| f.file.metadata().unwrap().len() >= 1024 * 1024 * 10 * 10) + 
.collect::>(); + + println!("small: {}", small_files.len()); + println!("large: {}", large_files.len()); + + for file in large_files { + // wait 50ms + // tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let file_size = file.file.metadata().unwrap().len(); - let file_data = File::open(file.file.clone()).await?; - let mut file_reader = tokio::io::BufReader::new(file_data); - let mut file_data: Vec = Vec::with_capacity(file_size as usize); - file_reader.read_to_end(&mut file_data).await?; + let file_data = File::open(file.file.clone()).await.unwrap(); let pb = ProgressBar::new(file_size); pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})") @@ -205,12 +321,101 @@ async fn main() -> anyhow::Result<()> { .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) .progress_chars("#>-")); + let file_name = file.file.file_name().unwrap().to_str().unwrap(); + let file_dir = file.path.split('/').collect::>() + [..file.path.split('/').count() - 1] + .join("/") + + "/"; + + // println!("file.path: {:?}", file_name); + // println!("prefix: {:?}", file_dir); + + if stream { + client + .upload_from_path(file.clone(), Some(&file_dir), fake_size, Some(pb)) + .await + .unwrap(); + } else { + let mut file_reader = tokio::io::BufReader::new(file_data); + let mut file_data: Vec = Vec::with_capacity(file_size as usize); + file_reader.read_to_end(&mut file_data).await.unwrap(); + + client + .upload( + vec![UploadFile { + data: file_data, + path: file_name.to_string(), + }], + Some(&file_dir), + fake_size, + Some(pb), + ) + .await + .unwrap(); + } + } + + let mut a = HashMap::>::new(); + + for file in small_files { + // per parent path + let file_size = file.file.metadata().unwrap().len(); + let file_data = File::open(file.file.clone()).await.unwrap(); + let mut file_reader = tokio::io::BufReader::new(file_data); + let mut file_data: Vec = Vec::with_capacity(file_size as usize); + file_reader.read_to_end(&mut file_data).await.unwrap(); + + let file_name = file.file.file_name().unwrap().to_str().unwrap(); + let file_dir = file.path.split('/').collect::>() + [..file.path.split('/').count() - 1] + .join("/") + + "/"; + + a.entry(file_dir.clone()).or_default().push(file.clone()); + } + + let b = a.into_values().collect::>>(); + + for dir_group in b { + let mut upload_files = Vec::new(); + + for file in &dir_group { + let file_size = file.file.metadata().unwrap().len(); + let file_data = File::open(file.file.clone()).await.unwrap(); + let mut file_reader = tokio::io::BufReader::new(file_data); + let mut file_data: Vec = Vec::with_capacity(file_size as usize); + file_reader.read_to_end(&mut file_data).await.unwrap(); + + let file_name = file.file.file_name().unwrap().to_str().unwrap(); + let file_dir = file.path.split('/').collect::>() + [..file.path.split('/').count() - 1] + .join("/") + + "/"; + upload_files.push(UploadFile { + data: file_data, + path: file_name.to_string(), + }); + } + + println!("Uploading {} files", upload_files.len()); + + let pb = ProgressBar::new(upload_files.len() as u64); + pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})") + .unwrap() + .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) + .progress_chars("#>-")); + client .upload( - &file.path, - &file_data, - 
prefix.as_deref(), - *fake_size, + upload_files, + Some( + (dir_group[0].path.split('/').collect::>() + [..dir_group[0].path.split('/').count() - 1] + .join("/") + + "/") + .as_str(), + ), + fake_size, Some(pb), ) .await @@ -224,22 +429,22 @@ async fn main() -> anyhow::Result<()> { .unwrap(); } Commands::Move { path, dest } => { - client.move_file(path, dest).await.unwrap(); + client.move_file(&path, &dest).await.unwrap(); } Commands::Delete { path, recursive } => { - client.delete(path, recursive).await.unwrap(); + client.delete(&path, &recursive).await.unwrap(); } Commands::Mkdir { name, path } => { - client.mkdir(name, path.as_deref()).await.unwrap(); + client.mkdir(&name, path.as_deref()).await.unwrap(); } Commands::Copy { src, dest } => { - client.copy(src, dest).await.unwrap(); + client.copy(&src, &dest).await.unwrap(); } Commands::Rename { path, name } => { - client.rename(path, name).await.unwrap(); + client.rename(&path, &name).await.unwrap(); } Commands::Info { path } => { - client.info(path).await.unwrap(); + client.info(&path).await.unwrap(); } Commands::Auth {} => { println!("Click the link below to authorize the app:\n"); diff --git a/src/types/response.rs b/src/types/response.rs index 06de32f..1b1d88a 100644 --- a/src/types/response.rs +++ b/src/types/response.rs @@ -75,7 +75,7 @@ pub struct CheckActionResponse { pub message: Option, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "snake_case")] pub struct CheckUploadResponse { pub bucket: String, @@ -85,7 +85,7 @@ pub struct CheckUploadResponse { pub upload_id: String, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct CheckUploadResponseFile { pub last_modified: String, // 1970-01-20T22:07:12.804Z @@ -94,7 +94,7 @@ pub struct CheckUploadResponseFile { pub version_id: String, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "PascalCase")] pub struct GetFileLinkTokenResponse { pub access_key_id: String, diff --git a/src/util.rs b/src/util.rs index e86ebbc..0b3a33f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -19,13 +19,64 @@ use tokio::{fs::File, io::BufReader, sync::Mutex}; use crate::{ client::{self}, types::response::ListFilesResponseFile, + TargetFile, }; use crate::{constants::CHUNK_SIZE, types}; -#[derive(Debug, Clone)] -pub struct TargetFile { - pub file: PathBuf, - pub path: String, +pub async fn single_upload( + token_res: &types::response::GetFileLinkTokenResponse, + bucket: &str, + target_file: &types::response::CheckUploadResponseFile, + prefix: &str, + region: &str, + upload_id: &str, + file: &[u8], +) -> anyhow::Result<()> { + let _ = upload_id; + // if !file.file.exists() { + // println!("File not found: {:?}", file.file); + // return Err(anyhow::anyhow!("File not found: {:?}", file.file)); + // } + + let file_size = file.len() as u64; + + let cledential = Credentials::new( + &token_res.access_key_id, + &token_res.secret_access_key, + Some(token_res.session_token.clone()), + // 2024-07-18T07:14:42Z + Some( + chrono::DateTime::parse_from_rfc3339(&token_res.expiration) + .unwrap() + .into(), + ), + "2021-06-01", + ); + + let config = aws_sdk_s3::Config::builder() + .behavior_version_latest() + .credentials_provider(cledential) + .region(Region::new(region.to_owned())) + // .endpoint_url("https://sendy-cloud.s3.ap-northeast-1.amazonaws.com") + .build(); + + let s3_client = aws_sdk_s3::Client::from_conf(config); + + let key = 
prefix.to_owned() + target_file.path.as_str(); + + let stream = ByteStream::from(file.to_vec()); + + let _put_object_res = s3_client + .put_object() + .bucket(bucket) + .key(key) + .body(stream) + .send() + .await?; + + // println!("Uploaded"); + + Ok(()) } pub async fn multipart_upload( @@ -185,6 +236,183 @@ pub async fn multipart_upload( if let Some(pb) = pb { pb.finish_with_message("Uploaded"); + } else { + println!("Uploaded"); + } + + Ok(()) +} + +pub async fn multipart_upload_from_path( + token_res: &types::response::GetFileLinkTokenResponse, + bucket: &str, + target_file: &types::response::CheckUploadResponseFile, + prefix: &str, + region: &str, + upload_id: &str, + file: TargetFile, + pb: Option, +) -> anyhow::Result<()> { + let _ = upload_id; + if !file.file.exists() { + println!("File not found: {:?}", file.file); + return Err(anyhow::anyhow!("File not found: {:?}", file.file)); + } + + let file_size = file.file.metadata().unwrap().len() as u64; + + let cledential = Credentials::new( + &token_res.access_key_id, + &token_res.secret_access_key, + Some(token_res.session_token.clone()), + // 2024-07-18T07:14:42Z + Some( + chrono::DateTime::parse_from_rfc3339(&token_res.expiration) + .unwrap() + .into(), + ), + "2021-06-01", + ); + + let config = aws_sdk_s3::Config::builder() + .behavior_version_latest() + .credentials_provider(cledential) + .region(Region::new(region.to_owned())) + // .endpoint_url("https://sendy-cloud.s3.ap-northeast-1.amazonaws.com") + .build(); + + let s3_client = aws_sdk_s3::Client::from_conf(config); + + let key = prefix.to_owned() + target_file.path.as_str(); + + let multipart_upload_res = s3_client + .create_multipart_upload() + .bucket(bucket) + .key(key.clone()) + .send() + .await + .unwrap(); + + let upload_id = multipart_upload_res.upload_id().unwrap().to_string(); + + let chunk_size = max(CHUNK_SIZE as u64, file_size / 10000); + + let mut chunk_count = file_size / chunk_size; + let mut size_of_last_chunk = file_size % chunk_size; + + if size_of_last_chunk == 0 { + size_of_last_chunk = chunk_size; + chunk_count -= 1; + } + + let upload_parts = Arc::new(Mutex::new(Vec::::new())); + + let semaphore = Arc::new(tokio::sync::Semaphore::new(20)); + let mut handles = Vec::new(); + + for chunk_index in 0..chunk_count { + let bucket = bucket.to_owned(); + let key = key.clone(); + let upload_id = upload_id.clone(); + let s3_client = s3_client.clone(); + let pb = pb.clone(); + let file = file.to_owned(); + let upload_parts = upload_parts.clone(); + + let semaphore = semaphore.clone().acquire_owned().await.unwrap(); + + let handle = tokio::spawn(async move { + let _permit = semaphore; + + let this_chunk = if chunk_count - 1 == chunk_index { + size_of_last_chunk + } else { + chunk_size + }; + loop { + let offset = chunk_index * chunk_size; + let length = this_chunk; + + let stream = ByteStream::read_from() + .path(file.file.clone()) + .offset(chunk_index * chunk_size) + .length(Length::Exact(this_chunk)) + .build() + .await; + let stream = match stream { + Ok(stream) => stream, + Err(e) => { + eprintln!("Error: {:?}", e); + continue; + } + }; + // let stream = match stream { + // Ok(stream) => stream, + // Err(e) => { + // eprintln!("Error: {:?}", e); + // continue; + // } + // }; + //Chunk index needs to start at 0, but part numbers start at 1. 
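// S3 caps a multipart upload at 10,000 parts, which is why chunk_size above
// is max(CHUNK_SIZE, file_size / 10000).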
+ let part_number = (chunk_index as i32) + 1; + let upload_part_res = s3_client + .upload_part() + .key(&key) + .bucket(bucket.clone()) + .upload_id(upload_id.clone()) + .body(stream) + .part_number(part_number) + .send() + .await; + let upload_part_res = match upload_part_res { + Ok(upload_part_res) => upload_part_res, + Err(e) => { + eprintln!("Error: {:?}", e); + continue; + } + }; + upload_parts.lock().await.push( + CompletedPart::builder() + .e_tag(upload_part_res.e_tag.unwrap_or_default()) + .part_number(part_number) + .build(), + ); + if let Some(pb) = &pb { + pb.inc(this_chunk); + } + break; + } + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + upload_parts + .lock() + .await + .sort_by(|a, b| a.part_number.cmp(&b.part_number)); + + let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder() + .set_parts(Some(upload_parts.lock().await.clone())) + .build(); + + let _complete_multipart_upload_res = s3_client + .complete_multipart_upload() + .bucket(bucket) + .key(key) + .upload_id(upload_id) + .multipart_upload(completed_multipart_upload) + .send() + .await + .unwrap(); + + if let Some(pb) = pb { + pb.finish_with_message("Uploaded"); + } else { + println!("Uploaded"); } Ok(()) @@ -267,6 +495,6 @@ pub async fn check_job(key: &str, client: &client::Client) -> anyhow::Result<()> return Err(anyhow::anyhow!("Error: {:?}", res)); } - std::thread::sleep(std::time::Duration::from_millis(200)); + // std::thread::sleep(std::time::Duration::from_millis(100)); } }
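A minimal usage sketch of the new streaming entry point, assuming the crate is consumed as a library; the refresh token, local file, and destination prefix below are placeholders, not values taken from this patch:

    use rakuten_drive_cui::{RakutenDriveClient, TargetFile};
    use std::path::PathBuf;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Placeholder token; the CLI reads its token from constants::REFRESH_TOKEN.
        let client = RakutenDriveClient::try_new("refresh-token".to_string()).await?;

        // Stream-upload one local file into "backup/" without buffering the whole
        // file in memory: multipart parts are read from disk as they are sent.
        let file = TargetFile {
            file: PathBuf::from("./large.bin"),
            path: "large.bin".to_string(),
        };
        client
            .upload_from_path(file, Some("backup/"), None, None)
            .await
    }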