diff --git a/src/constants.rs b/src/constants.rs
index d0bc1ce..e3d0a9b 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -1,3 +1,3 @@
 pub const REFRESH_TOKEN: &str = "AMf-vBwuDNdrMsPlvESgMqZWGCdVlBxMvrQVNvHOWb-FdDRV0Ozeq26POxH2tzy463DGlZTZpPYYhWCSi-KI0-8pSYEXtuCG_8DlVRyqwm4POeobYWkrw3dMgiEDNjFCfXIN4-k65CsizmCFbWQz6ASsi-XGAwMn_mXyFj9JirB7vyzTTr2ugbA";
-pub const CHUNK_SIZE: usize = 1024 * 1024 * 10; // 10MB
+pub const CHUNK_SIZE: usize = 1024 * 1024 * 100; // 100MB
 pub const APP_VERSION: &str = "v21.11.10";
diff --git a/src/lib.rs b/src/lib.rs
index be50f41..eb2479b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -207,20 +207,34 @@ impl RakutenDriveClient {
     pub async fn upload_from_path(
         &self,
         file: TargetFile,
+        custom_file_name: Option<&str>,
         prefix: Option<&str>,
         fake_size: Option<u64>,
+        offset: Option<u64>,
+        length: Option<u64>,
         pb: Option<ProgressBar>,
     ) -> anyhow::Result<()> {
+        if offset.is_some() && length.is_none() || offset.is_none() && length.is_some() {
+            println!("offset and length must both be set");
+            return Err(anyhow::anyhow!("offset and length must both be set"));
+        }
+
         let prefix = if prefix.unwrap_or("") == "/" {
             ""
         } else {
             prefix.unwrap_or("")
         };
+
+        let file_name = match custom_file_name {
+            Some(n) => n,
+            None => file.file.file_name().unwrap().to_str().unwrap(),
+        };
+
         let req = types::request::CheckUploadRequest {
             host_id: self.client.host_id.clone(),
             path: prefix.to_string(),
             file: vec![types::request::CheckUploadRequestFile {
-                path: file.file.file_name().unwrap().to_str().unwrap().to_string(),
+                path: file_name.to_string(),
                 size: fake_size.unwrap_or(file.file.metadata().unwrap().len() as u64) as i64,
                 hash: None,
                 host_id: None,
@@ -262,6 +276,8 @@ impl RakutenDriveClient {
             &check_upload_res.region,
             &check_upload_res.upload_id,
             file,
+            offset,
+            length,
             pb.clone(),
         )
         .await
diff --git a/src/main.rs b/src/main.rs
index 99bcafc..dec8992 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -70,6 +70,26 @@ enum Commands {
         #[clap(short, long)]
         stream: bool,
     },
+    #[clap(about = "Upload ultra big file")]
+    UploadBig {
+        file: PathBuf,
+
+        /// Parent folder path
+        #[clap(short, long)]
+        prefix: Option<String>,
+
+        /// Send fake file size to server (bytes)
+        #[clap(short, long)]
+        fake_size: Option<u64>,
+
+        /// Do not check file existence
+        #[clap(long)]
+        force: bool,
+
+        /// Set division size in bytes
+        #[clap(long)]
+        length: Option<u64>,
+    },
     #[clap(about = "Download file")]
     Download {
         path: String,
@@ -241,7 +261,15 @@ async fn main() -> anyhow::Result<()> {
             // println!("prefix: {:?}", file_dir);

             client
-                .upload_from_path(target_file.clone(), prefix.as_deref(), fake_size, Some(pb))
+                .upload_from_path(
+                    target_file.clone(),
+                    None,
+                    prefix.as_deref(),
+                    fake_size,
+                    None,
+                    None,
+                    Some(pb),
+                )
                 .await
                 .unwrap();
@@ -261,6 +289,149 @@ async fn main() -> anyhow::Result<()> {
                 }
             }
         }
+        Commands::UploadBig {
+            file,
+            prefix,
+            fake_size,
+            force,
+            length,
+        } => {
+            // is folder
+            if file.is_dir() {
+                println!("Can't upload a folder");
+                return Err(anyhow::anyhow!("Can't upload a folder"));
+            }
+            if let Some(prefix) = prefix.as_ref() {
+                if !prefix.ends_with('/') {
+                    println!("Prefix must end with /");
+                    return Err(anyhow::anyhow!("Prefix must end with /"));
+                }
+            }
+
+            let mut files = Vec::<TargetFile>::new();
+
+            // file check
+            if !file.exists() {
+                println!("File not found: {:?}", file);
+                return Err(anyhow::anyhow!("File not found: {:?}", file));
+            }
+            let target_file = TargetFile {
+                file: file.clone(),
+                path: file.file_name().unwrap().to_str().unwrap().to_string(),
+            };
+            let path = match prefix.clone() {
+                Some(p) => p + target_file.clone().path.as_str(),
+                None => target_file.clone().path,
+            };
+
+            if cfg!(windows) {
+                // replace \ with /
+                files.iter_mut().for_each(|f| {
+                    f.path = f.path.replace('\\', "/");
+                });
+            }
+
+            let file_size = target_file.file.metadata().unwrap().len();
+
+            let file_chunk_size = length.unwrap_or(1024 * 1024 * 1024); // 1GB
+
+            let mut chunk_count = file_size / file_chunk_size;
+            let mut size_of_last_chunk = file_size % file_chunk_size;
+
+            if size_of_last_chunk == 0 {
+                size_of_last_chunk = file_chunk_size;
+                chunk_count -= 1;
+            }
+
+            for file_chunk_index in 0..chunk_count + 1 {
+                // if !force {
+                //     println!("Checking file existence...");
+                //     println!("Checking: {:?}", path);
+                //     let res = client.info(path.as_str()).await;
+                //     if res.is_ok() {
+                //         println!("File already exists.");
+                //         return Err(anyhow::anyhow!("File already exists."));
+                //     } else {
+                //         println!("{:?}", res.err().unwrap());
+                //     }
+                // }
+
+                let this_chunk_size = if chunk_count == file_chunk_index {
+                    size_of_last_chunk
+                } else {
+                    file_chunk_size
+                };
+
+                let offset = file_chunk_index * file_chunk_size;
+
+                let file_name = target_file
+                    .file
+                    .file_name()
+                    .unwrap()
+                    .to_str()
+                    .unwrap()
+                    .to_string()
+                    + "."
+                    + (file_chunk_index + 1).to_string().as_str();
+
+                let file_path = match prefix.clone() {
+                    Some(p) => p + file_name.as_str(),
+                    None => file_name.clone(),
+                };
+
+                // let file_data = File::open(target_file.file.clone()).await.unwrap();
+
+                loop {
+                    println!(
+                        "Uploading {}/{} {}...",
+                        file_chunk_index + 1,
+                        chunk_count + 1,
+                        file_name
+                    );
+                    let pb = ProgressBar::new(this_chunk_size);
+                    pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
+                        .unwrap()
+                        .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
+                        .progress_chars("#>-"));
+
+                    match client
+                        .upload_from_path(
+                            target_file.clone(),
+                            Some(file_name.as_str()),
+                            prefix.as_deref(),
+                            fake_size,
+                            Some(offset),
+                            Some(this_chunk_size),
+                            Some(pb),
+                        )
+                        .await
+                    {
+                        Ok(_) => {}
+                        Err(e) => {
+                            println!("ERROR: {:#?}", e);
+                            continue;
+                        }
+                    };
+
+                    for i in 0..20 {
+                        println!("Checking: {:?}", file_path);
+                        let res = client.info(file_path.as_str()).await;
+                        if res.is_ok() {
+                            println!("File exists.");
+                            break;
+                        } else {
+                            println!("{:?}", res.err().unwrap());
+                            // give up after the final retry
+                            if i == 19 {
+                                return Err(anyhow::anyhow!("File does not exist."));
+                            }
+                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                            continue;
+                        }
+                    }
+                    break;
+                }
+            }
+        }
         Commands::Download { path, prefix } => {
             client
                 .download(path.as_str(), prefix.as_deref())
diff --git a/src/util.rs b/src/util.rs
index fcf5419..e2b8f7d 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -10,7 +10,7 @@ use aws_sdk_s3::{
     config::Credentials, operation::upload_part, primitives::ByteStream, types::CompletedPart,
 };
 use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
-use aws_smithy_types::byte_stream::Length;
+use aws_smithy_types::byte_stream::{FsBuilder, Length};
 use clap::{Parser, Subcommand};
 use human_bytes::human_bytes;
 use indicatif::{ProgressBar, ProgressState, ProgressStyle};
@@ -251,6 +251,8 @@ pub async fn multipart_upload_from_path(
     region: &str,
     upload_id: &str,
     file: TargetFile,
+    offset: Option<u64>,
+    length: Option<u64>,
     pb: Option<ProgressBar>,
 ) -> anyhow::Result<()> {
     let _ = upload_id;
@@ -259,7 +261,17 @@ pub async fn multipart_upload_from_path(
         return Err(anyhow::anyhow!("File not found: {:?}", file.file));
     }

-    let file_size = file.file.metadata().unwrap().len() as u64;
+    if offset.is_some() && length.is_none() || offset.is_none() && length.is_some() {
+        println!("offset and length must both be set");
+        return Err(anyhow::anyhow!("offset and length must both be set"));
+    }
+
+    let file_size = match length {
+        Some(length) => length,
+        None => file.file.metadata().unwrap().len() as u64,
+    };
+
+    println!("file_size: {}", file_size);

     let cledential = Credentials::new(
         &token_res.access_key_id,
@@ -311,10 +323,10 @@ pub async fn multipart_upload_from_path(

     let upload_parts = Arc::new(Mutex::new(Vec::<CompletedPart>::new()));

-    let semaphore = Arc::new(tokio::sync::Semaphore::new(5));
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(10));
     let mut handles = Vec::new();

-    for chunk_index in 0..chunk_count {
+    for chunk_index in 0..chunk_count + 1 {
         let bucket = bucket.to_owned();
         let key = key.clone();
         let upload_id = upload_id.clone();
@@ -328,18 +340,15 @@ pub async fn multipart_upload_from_path(
         let handle = tokio::spawn(async move {
             let _permit = semaphore;

-            let this_chunk = if chunk_count - 1 == chunk_index {
+            let this_chunk = if chunk_count == chunk_index {
                 size_of_last_chunk
             } else {
                 chunk_size
             };
             loop {
-                let offset = chunk_index * chunk_size;
-                let length = this_chunk;
-
                 let stream = ByteStream::read_from()
                     .path(file.file.clone())
-                    .offset(chunk_index * chunk_size)
+                    .offset(chunk_index * chunk_size + offset.unwrap_or(0))
                     .length(Length::Exact(this_chunk))
                     .build()
                     .await;
@@ -459,29 +468,29 @@ pub async fn list_files(

     files.append(&mut res.file);

-    if !res.last_page {
-        let mut cursor = res.file.len() as i64;
-        loop {
-            let req = types::request::ListFilesRequest {
-                from: cursor,
-                host_id: client.host_id.clone(),
-                path: prefix.unwrap_or("").to_string(),
-                sort_type: types::request::ListFilesRequestSortType::Path,
-                reverse: false,
-                thumbnail_size: 130,
-                to: pagination_size + cursor,
-            };
-
-            let mut next_res = client.list_files(req).await?;
-            files.append(&mut next_res.file);
-
-            if next_res.last_page {
-                break;
-            } else {
-                cursor += next_res.file.len() as i64;
-            }
-        }
-    }
+    // if !res.last_page {
+    //     let mut cursor = res.file.len() as i64;
+    //     loop {
+    //         let req = types::request::ListFilesRequest {
+    //             from: cursor,
+    //             host_id: client.host_id.clone(),
+    //             path: prefix.unwrap_or("").to_string(),
+    //             sort_type: types::request::ListFilesRequestSortType::Path,
+    //             reverse: false,
+    //             thumbnail_size: 130,
+    //             to: pagination_size + cursor,
+    //         };
+    //
+    //         let mut next_res = client.list_files(req).await?;
+    //         files.append(&mut next_res.file);
+    //
+    //         if next_res.last_page {
+    //             break;
+    //         } else {
+    //             cursor += next_res.file.len() as i64;
+    //         }
+    //     }
+    // }

     res.file = files;

     // files.iter().find(|f| f.path == "/").unwrap();
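
Reviewer note: `UploadBig` in src/main.rs and `multipart_upload_from_path` in src/util.rs now rely on the same remainder arithmetic to split a file into fixed-size pieces: the last piece absorbs the remainder, and when the size is an exact multiple the would-be empty tail is folded into the previous piece. A minimal standalone sketch of that arithmetic follows; the `chunk_ranges` helper is hypothetical and not part of this crate.

```rust
/// Hypothetical helper mirroring the division logic in this diff:
/// returns (offset, length) pairs covering `file_size` bytes in
/// pieces of at most `chunk_size` bytes each.
fn chunk_ranges(file_size: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    // The diff's arithmetic underflows on an empty file, so guard here.
    assert!(file_size > 0 && chunk_size > 0);
    let mut chunk_count = file_size / chunk_size;
    let mut size_of_last_chunk = file_size % chunk_size;
    // Exact multiple: no remainder piece, so the last chunk is a full one.
    if size_of_last_chunk == 0 {
        size_of_last_chunk = chunk_size;
        chunk_count -= 1;
    }
    (0..=chunk_count)
        .map(|i| {
            let len = if i == chunk_count {
                size_of_last_chunk
            } else {
                chunk_size
            };
            (i * chunk_size, len)
        })
        .collect()
}

fn main() {
    // A 10 GiB file with a 4 GiB division size yields 4 + 4 + 2 GiB.
    const GIB: u64 = 1024 * 1024 * 1024;
    for (offset, length) in chunk_ranges(10 * GIB, 4 * GIB) {
        println!("offset={offset} length={length}");
    }
}
```

Inside `multipart_upload_from_path`, each part's read position is additionally shifted by the caller-supplied `offset` (`chunk_index * chunk_size + offset.unwrap_or(0)`), so every `UploadBig` division is itself multipart-uploaded starting from the correct byte of the source file.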