diff --git a/src/lib.rs b/src/lib.rs
index 733763d..554cd17 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,7 @@
 use aws_config::Region;
 use aws_sdk_s3::config::Credentials;
 use constants::APP_VERSION;
-use std::path::PathBuf;
+use std::{io::BufReader, path::PathBuf, sync::Arc};
 use util::{check_job, file_detail, list_files, multipart_upload, TargetFile};
 
 mod client;
@@ -34,93 +34,26 @@ impl RakutenDriveClient {
     }
     pub async fn upload(
         &self,
-        file: &PathBuf,
+        file_path: &str,
+        file_data: &[u8],
         prefix: Option<&str>,
-        recursive: bool,
         fake_size: Option<u64>,
+        pb: Option<ProgressBar>,
     ) -> anyhow::Result<()> {
-        // is folder
-        if file.is_dir() && !recursive {
-            println!("Use --recursive option for folder upload");
-            return Err(anyhow::anyhow!("Use --recursive option for folder upload"));
-        }
-
-        let mut files = Vec::<TargetFile>::new();
-
-        if file.is_dir() && recursive {
-            // upload folder
-            let mut dirs = Vec::<PathBuf>::new();
-            dirs.push(file.clone());
-            while let Some(dir) = dirs.pop() {
-                let entries = std::fs::read_dir(dir).unwrap();
-                for entry in entries {
-                    let entry = entry.unwrap();
-                    let path = entry.path();
-                    if path.is_dir() {
-                        dirs.push(path);
-                    } else {
-                        files.push(TargetFile {
-                            file: path.clone(),
-                            path: path
-                                .strip_prefix(file)
-                                .unwrap()
-                                .to_str()
-                                .expect("Invalid File Name")
-                                .to_string(),
-                        });
-                    }
-                }
-            }
-            // for file in files {
-            //     println!("{:?}", file);
-            // }
-        } else {
-            // file check
-            if !file.exists() {
-                println!("File not found: {:?}", file);
-                return Err(anyhow::anyhow!("File not found: {:?}", file));
-            }
-            files.push(TargetFile {
-                file: file.clone(),
-                path: file.file_name().unwrap().to_str().unwrap().to_string(),
-            });
-        }
-
-        if cfg!(windows) {
-            // replase \ to /
-            files.iter_mut().for_each(|f| {
-                f.path = f.path.replace('\\', "/");
-            });
-        }
-
-        for file in &files {
-            if (file_detail(&file.path, &self.client).await).is_ok() {
-                println!("File already exists.");
-                return Err(anyhow::anyhow!("File already exists."));
-            }
-        }
-
         let req = types::request::CheckUploadRequest {
             host_id: self.client.host_id.clone(),
             path: prefix.unwrap_or("").to_string(),
             upload_id: "".to_string(),
-            file: files
-                .iter()
-                .map(|f| types::request::CheckUploadRequestFile {
-                    path: f.path.clone(),
-                    size: fake_size.unwrap_or(f.file.metadata().unwrap().len()) as i64,
-                })
-                .collect(),
+            file: vec![types::request::CheckUploadRequestFile {
+                path: file_path.to_string(),
+                size: fake_size.unwrap_or(file_data.len() as u64) as i64,
+            }],
         };
 
         let check_upload_res = self.client.check_upload(req).await.unwrap();
-        // println!("{:#?}", check_upload_res);
-
         let token_res = self.client.get_upload_token().await.unwrap();
-        // println!("{:#?}", token_res);
-
         let cledential = Credentials::new(
             token_res.access_key_id.clone(),
             token_res.secret_access_key.clone(),
@@ -135,21 +68,22 @@ impl RakutenDriveClient {
             .force_path_style(true)
             .build();
 
+        multipart_upload(
+            &token_res,
+            &check_upload_res.bucket,
+            &check_upload_res.file[0],
+            &check_upload_res.prefix,
+            &check_upload_res.region,
+            &check_upload_res.upload_id,
+            file_data,
+            pb,
+        )
+        .await
+        .unwrap();
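+        // The payload now arrives as an in-memory byte slice (`file_data`);
+        // multipart_upload slices each part out of this buffer instead of
+        // re-reading byte ranges from disk.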
         // if file_size > CHUNK_SIZE as u64 {
-        for (i, file) in files.iter().enumerate() {
-            println!("Multi Uploading: {:?}", file.file);
+        // for (i, file) in files.iter().enumerate() {
+        //     println!("Multi Uploading: {:?}", file.file);
 
-            multipart_upload(
-                &token_res,
-                &check_upload_res.bucket,
-                &check_upload_res.file[i],
-                &check_upload_res.prefix,
-                &check_upload_res.region,
-                &check_upload_res.upload_id,
-                file.clone(),
-            )
-            .await
-            .unwrap(); // }
         // } else {
         //     for (i, file) in files.iter().enumerate() {
@@ -173,7 +107,7 @@ impl RakutenDriveClient {
         //             .unwrap();
         //     }
         // }
-        }
+        // }
 
         match check_job(&check_upload_res.upload_id, &self.client).await {
             Ok(_) => Ok(()),
diff --git a/src/main.rs b/src/main.rs
index 6e26073..82d0f4c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,7 +16,11 @@
 use constants::REFRESH_TOKEN;
 use human_bytes::human_bytes;
 use indicatif::{ProgressBar, ProgressState, ProgressStyle};
 use rakuten_drive_cui::RakutenDriveClient;
-use tokio::{fs::File, io::BufReader, sync::Mutex};
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, BufReader},
+    sync::Mutex,
+};
 use types::response::ListFilesResponseFile;
 use util::*;
@@ -105,7 +109,9 @@ enum Commands {
         name: String,
     },
     #[clap(about = "Print file detail")]
-    Info { path: String },
+    Info {
+        path: String,
+    },
     Auth {},
 }
@@ -125,12 +131,97 @@ async fn main() -> anyhow::Result<()> {
             recursive,
             fake_size,
         } => {
-            client
-                .upload(file, prefix.as_deref(), *recursive, *fake_size)
-                .await.unwrap();
+            // is folder
+            if file.is_dir() && !*recursive {
+                println!("Use --recursive option for folder upload");
+                return Err(anyhow::anyhow!("Use --recursive option for folder upload"));
+            }
+
+            let mut files = Vec::<TargetFile>::new();
+
+            if file.is_dir() && *recursive {
+                // upload folder
+                let mut dirs = Vec::<PathBuf>::new();
+                dirs.push(file.clone());
+                while let Some(dir) = dirs.pop() {
+                    let entries = std::fs::read_dir(dir).unwrap();
+                    for entry in entries {
+                        let entry = entry.unwrap();
+                        let path = entry.path();
+                        if path.is_dir() {
+                            dirs.push(path);
+                        } else {
+                            files.push(TargetFile {
+                                file: path.clone(),
+                                path: path
+                                    .strip_prefix(file)
+                                    .unwrap()
+                                    .to_str()
+                                    .expect("Invalid File Name")
+                                    .to_string(),
+                            });
+                        }
+                    }
+                }
+                // for file in files {
+                //     println!("{:?}", file);
+                // }
+            } else {
+                // file check
+                if !file.exists() {
+                    println!("File not found: {:?}", file);
+                    return Err(anyhow::anyhow!("File not found: {:?}", file));
+                }
+                files.push(TargetFile {
+                    file: file.clone(),
+                    path: file.file_name().unwrap().to_str().unwrap().to_string(),
+                });
+            }
+
+            if cfg!(windows) {
+                // replace \ to /
+                files.iter_mut().for_each(|f| {
+                    f.path = f.path.replace('\\', "/");
+                });
+            }
+
+            for file in &files {
+                if client.info(file.path.as_str()).await.is_ok() {
+                    println!("File already exists.");
+                    return Err(anyhow::anyhow!("File already exists."));
+                }
+            }
+
+            for file in &files {
+                let file_size = file.file.metadata().unwrap().len();
+                let file_data = File::open(file.file.clone()).await?;
+                let mut file_reader = tokio::io::BufReader::new(file_data);
+                let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
+                file_reader.read_to_end(&mut file_data).await?;
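+                // Note: the entire file is buffered in memory before the
+                // upload starts; for very large files a streaming reader
+                // would be gentler on RAM.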
+
+                let pb = ProgressBar::new(file_size);
+                pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
+                    .unwrap()
+                    .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
+                    .progress_chars("#>-"));
+
+                client
+                    .upload(
+                        &file.path,
+                        &file_data,
+                        prefix.as_deref(),
+                        *fake_size,
+                        Some(pb),
+                    )
+                    .await
+                    .unwrap();
+            }
         }
         Commands::Download { path, prefix } => {
-            client.download(path.as_str(), prefix.as_deref()).await.unwrap();
+            client
+                .download(path.as_str(), prefix.as_deref())
+                .await
+                .unwrap();
         }
         Commands::Move { path, dest } => {
             client.move_file(path, dest).await.unwrap();
@@ -169,7 +260,8 @@ async fn main() -> anyhow::Result<()> {
             .ok_or_else(|| anyhow::anyhow!("Code not found in URL"))?;
 
         let rid_token_auth_res = client::rid_token_auth(rid_code.as_str()).await?;
-        let token_verify_res = client::get_refresh_token(&rid_token_auth_res.custom_token).await?;
+        let token_verify_res =
+            client::get_refresh_token(&rid_token_auth_res.custom_token).await?;
 
         println!("Refresh token: {}", token_verify_res.refresh_token);
     }
diff --git a/src/util.rs b/src/util.rs
index 2ac5217..e86ebbc 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -16,11 +16,11 @@
 use human_bytes::human_bytes;
 use indicatif::{ProgressBar, ProgressState, ProgressStyle};
 use tokio::{fs::File, io::BufReader, sync::Mutex};
 
-use crate::{constants::CHUNK_SIZE, types};
 use crate::{
     client::{self},
     types::response::ListFilesResponseFile,
 };
+use crate::{constants::CHUNK_SIZE, types};
 
 #[derive(Debug, Clone)]
 pub struct TargetFile {
@@ -35,15 +35,16 @@ pub async fn multipart_upload(
     prefix: &str,
     region: &str,
     upload_id: &str,
-    file: TargetFile,
+    file: &[u8],
+    pb: Option<ProgressBar>,
 ) -> anyhow::Result<()> {
     let _ = upload_id;
-    if !file.file.exists() {
-        println!("File not found: {:?}", file.file);
-        return Err(anyhow::anyhow!("File not found: {:?}", file.file));
-    }
+    // if !file.file.exists() {
+    //     println!("File not found: {:?}", file.file);
+    //     return Err(anyhow::anyhow!("File not found: {:?}", file.file));
+    // }
 
-    let file_size = file.file.metadata().unwrap().len();
+    let file_size = file.len() as u64;
 
     let cledential = Credentials::new(
         &token_res.access_key_id,
@@ -83,6 +84,7 @@ pub async fn multipart_upload(
     let mut chunk_count = file_size / chunk_size;
     let mut size_of_last_chunk = file_size % chunk_size;
+
     if size_of_last_chunk == 0 {
         size_of_last_chunk = chunk_size;
         chunk_count -= 1;
@@ -90,12 +92,6 @@ pub async fn multipart_upload(
 
     let upload_parts = Arc::new(Mutex::new(Vec::<CompletedPart>::new()));
 
-    let pb = ProgressBar::new(file_size);
-    pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
-        .unwrap()
-        .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
-        .progress_chars("#>-"));
-
     let semaphore = Arc::new(tokio::sync::Semaphore::new(20));
     let mut handles = Vec::new();
 
@@ -105,7 +101,7 @@ pub async fn multipart_upload(
         let upload_id = upload_id.clone();
         let s3_client = s3_client.clone();
         let pb = pb.clone();
-        let file = file.clone();
+        let file = file.to_owned();
        let upload_parts = upload_parts.clone();
 
         let semaphore = semaphore.clone().acquire_owned().await.unwrap();
@@ -119,19 +115,18 @@ pub async fn multipart_upload(
                 chunk_size
             };
             loop {
-                let stream = ByteStream::read_from()
-                    .path(file.file.clone())
-                    .offset(chunk_index * chunk_size)
-                    .length(Length::Exact(this_chunk))
-                    .build()
-                    .await;
-                let stream = match stream {
-                    Ok(stream) => stream,
-                    Err(e) => {
-                        eprintln!("Error: {:?}", e);
-                        continue;
-                    }
-                };
+                let offset = chunk_index * chunk_size;
+                let length = this_chunk;
+
+                let bytes = file[offset as usize..(offset + length) as usize].to_vec();
+                let stream = ByteStream::from(bytes);
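+                // Each task slices its own chunk out of the cloned buffer;
+                // ByteStream::from(Vec<u8>) cannot fail, so the old Ok/Err
+                // retry match below survives only as a comment.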
+                // let stream = match stream {
+                //     Ok(stream) => stream,
+                //     Err(e) => {
+                //         eprintln!("Error: {:?}", e);
+                //         continue;
+                //     }
+                // };
 
                 //Chunk index needs to start at 0, but part numbers start at 1.
                 let part_number = (chunk_index as i32) + 1;
                 let upload_part_res = s3_client
@@ -156,7 +151,9 @@ pub async fn multipart_upload(
                         .part_number(part_number)
                         .build(),
                 );
-                pb.inc(this_chunk);
+                if let Some(pb) = &pb {
+                    pb.inc(this_chunk);
+                }
                 break;
             }
         });
@@ -186,7 +183,9 @@ pub async fn multipart_upload(
         .await
         .unwrap();
 
-    pb.finish_with_message("Uploaded");
+    if let Some(pb) = pb {
+        pb.finish_with_message("Uploaded");
+    }
 
     Ok(())
 }
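For reference, a minimal sketch of a caller against the refactored API (a hypothetical example, not part of the patch; it assumes a RakutenDriveClient is already constructed the way main.rs does, and that the file fits comfortably in memory; "video.mp4" and "backup/" are placeholder values):

    use indicatif::ProgressBar;
    use tokio::io::AsyncReadExt;

    async fn upload_one(client: &rakuten_drive_cui::RakutenDriveClient) -> anyhow::Result<()> {
        // Buffer the whole file, since upload() now takes raw bytes.
        let mut data = Vec::new();
        tokio::fs::File::open("video.mp4")
            .await?
            .read_to_end(&mut data)
            .await?;

        let pb = ProgressBar::new(data.len() as u64);
        // Arguments: file path in the drive, file bytes, optional prefix,
        // optional fake size, optional progress bar.
        client
            .upload("video.mp4", &data, Some("backup/"), None, Some(pb))
            .await
    }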