From ad827c88e61074e3cc7d336577cbdfdc77796168 Mon Sep 17 00:00:00 2001
From: sim1222
Date: Sat, 21 Sep 2024 17:13:45 +0900
Subject: [PATCH] for backup

---
 src/lib.rs  |   3 +-
 src/main.rs | 265 ++++++++++------------------------------------
 src/util.rs |  25 +++--
 3 files changed, 72 insertions(+), 221 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index d466328..be50f41 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -220,7 +220,7 @@ impl RakutenDriveClient {
             host_id: self.client.host_id.clone(),
             path: prefix.to_string(),
             file: vec![types::request::CheckUploadRequestFile {
-                path: file.file.to_str().unwrap().to_string(),
+                path: file.file.file_name().unwrap().to_str().unwrap().to_string(),
                 size: fake_size.unwrap_or(file.file.metadata().unwrap().len() as u64) as i64,
                 hash: None,
                 host_id: None,
@@ -229,6 +229,7 @@
         };
 
         println!("upload from path");
+        println!("{:#?}", req);
 
         // println!("prefix: {:?}", prefix.unwrap_or(""));
 
diff --git a/src/main.rs b/src/main.rs
index d782325..99bcafc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,6 +3,7 @@ use std::{
     collections::{HashMap, HashSet},
     io::{stdout, Write},
     path::{Path, PathBuf},
+    str::FromStr,
     sync::Arc,
 };
 
@@ -188,44 +189,19 @@ async fn main() -> anyhow::Result<()> {
 
             let mut files = Vec::<TargetFile>::new();
 
-            if file.is_dir() && recursive {
-                // upload folder
-                let mut dirs = Vec::<PathBuf>::new();
-                dirs.push(file.clone());
-                while let Some(dir) = dirs.pop() {
-                    let entries = std::fs::read_dir(dir).unwrap();
-                    for entry in entries {
-                        let entry = entry.unwrap();
-                        let path = entry.path();
-                        if path.is_dir() {
-                            dirs.push(path);
-                        } else {
-                            files.push(TargetFile {
-                                file: path.clone(),
-                                path: (prefix.clone().unwrap_or("".to_string())
-                                    + path
-                                        .strip_prefix(&file)
-                                        .unwrap()
-                                        .to_str()
-                                        .expect("Invalid File Name")),
-                            });
-                        }
-                    }
-                }
-                // for file in files {
-                //     println!("{:?}", file);
-                // }
-            } else {
-                // file check
-                if !file.exists() {
-                    println!("File not found: {:?}", file);
-                    return Err(anyhow::anyhow!("File not found: {:?}", file));
-                }
-                files.push(TargetFile {
-                    file: file.clone(),
-                    path: file.file_name().unwrap().to_str().unwrap().to_string(),
-                });
+            // file check
+            if !file.exists() {
+                println!("File not found: {:?}", file);
+                return Err(anyhow::anyhow!("File not found: {:?}", file));
             }
+            let target_file = TargetFile {
+                file: file.clone(),
+                path: file.file_name().unwrap().to_str().unwrap().to_string(),
+            };
+            let path = match prefix.clone() {
+                Some(p) => p + target_file.clone().path.as_str(),
+                None => target_file.clone().path,
+            };
 
             if cfg!(windows) {
                 // replase \ to /
@@ -236,192 +212,53 @@
             if !force {
                 println!("Checking file existence...");
-                for file in &files {
-                    println!("Checking: {:?}", file.path);
-                    let res = client.info(file.path.as_str()).await;
-                    if res.is_ok() {
-                        println!("File already exists.");
-                        return Err(anyhow::anyhow!("File already exists."));
-                    } else {
-                        println!("{:?}", res.err().unwrap());
-                    }
-                }
-            }
-
-            // println!("{:#?}", files);
-
-            let mut dirs = files
-                .iter()
-                .map(|f| f.path.clone())
-                .map(|f| f.split('/').collect::<Vec<&str>>()[..f.split('/').count() - 1].join("/"))
-                .collect::<HashSet<String>>()
-                .into_iter()
-                .collect::<Vec<String>>();
-            dirs.sort();
-
-            println!("{:?}", dirs);
-
-            let mut done_set = Vec::new();
-
-            for dir in dirs {
-                let paths = dir.split('/');
-                println!("{}", dir);
-                for (i, path) in paths.clone().enumerate() {
-                    let mut path_prefix = paths.clone().take(i).collect::<Vec<&str>>().join("/");
-                    if path_prefix.is_empty() && path == file.file_name().unwrap().to_str().unwrap()
-                    {
-                        continue;
-                    }
-                    if !path_prefix.is_empty() {
-                        path_prefix.push('/');
-                    }
-
-                    if done_set
-                        .iter()
-                        .any(|x| x == &(path.to_string(), path_prefix.clone()))
-                    {
-                        continue;
-                    }
-
-                    println!("path: {:?}", path);
-                    println!("prefix: {:?}", path_prefix);
-                    match client.mkdir(path, Some(&path_prefix)).await {
-                        Ok(_) => {
-                            done_set.push((path.to_string(), path_prefix));
-                        }
-                        Err(e) => {
-                            println!("{:?}", e);
-                            continue;
-                        }
-                    }
-                }
-            }
-
-            let small_files = files
-                .iter()
-                .filter(|f| f.file.metadata().unwrap().len() < 1024 * 1024 * 10 * 10)
-                .collect::<Vec<&TargetFile>>();
-            let large_files = files
-                .iter()
-                .filter(|f| f.file.metadata().unwrap().len() >= 1024 * 1024 * 10 * 10)
-                .collect::<Vec<&TargetFile>>();
-
-            println!("small: {}", small_files.len());
-            println!("large: {}", large_files.len());
-
-            for file in large_files {
-                // wait 50ms
-                // tokio::time::sleep(std::time::Duration::from_millis(50)).await;
-
-                let file_size = file.file.metadata().unwrap().len();
-                let file_data = File::open(file.file.clone()).await.unwrap();
-
-                let pb = ProgressBar::new(file_size);
-                pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
-                    .unwrap()
-                    .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
-                    .progress_chars("#>-"));
-
-                let file_name = file.file.file_name().unwrap().to_str().unwrap();
-                let file_dir = file.path.split('/').collect::<Vec<&str>>()
-                    [..file.path.split('/').count() - 1]
-                    .join("/")
-                    + "/";
-
-                // println!("file.path: {:?}", file_name);
-                // println!("prefix: {:?}", file_dir);
-
-                if stream {
-                    println!("is stream true");
-                    client
-                        .upload_from_path(file.clone(), Some(&file_dir), fake_size, Some(pb))
-                        .await
-                        .unwrap();
+                println!("Checking: {:?}", path);
+                let res = client.info(path.as_str()).await;
+                if res.is_ok() {
+                    println!("File already exists.");
+                    return Err(anyhow::anyhow!("File already exists."));
                 } else {
-                    let mut file_reader = tokio::io::BufReader::new(file_data);
-                    let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
-                    file_reader.read_to_end(&mut file_data).await.unwrap();
-
-                    client
-                        .upload(
-                            vec![UploadFile {
-                                data: file_data,
-                                path: file_name.to_string(),
-                            }],
-                            Some(&file_dir),
-                            fake_size,
-                            Some(pb),
-                        )
-                        .await
-                        .unwrap();
+                    println!("{:?}", res.err().unwrap());
                 }
             }
 
-            let mut a = HashMap::<String, Vec<TargetFile>>::new();
+            let file_size = target_file.file.metadata().unwrap().len();
+            // let file_data = File::open(target_file.file.clone()).await.unwrap();
 
-            for file in small_files {
-                // per parent path
-                let file_size = file.file.metadata().unwrap().len();
-                let file_data = File::open(file.file.clone()).await.unwrap();
-                let mut file_reader = tokio::io::BufReader::new(file_data);
-                let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
-                file_reader.read_to_end(&mut file_data).await.unwrap();
-
-                let file_name = file.file.file_name().unwrap().to_str().unwrap();
-                let file_dir = file.path.split('/').collect::<Vec<&str>>()
-                    [..file.path.split('/').count() - 1]
-                    .join("/")
-                    + "/";
-
-                a.entry(file_dir.clone()).or_default().push(file.clone());
-            }
-
-            let b = a.into_values().collect::<Vec<Vec<TargetFile>>>();
-
-            for dir_group in b {
-                let mut upload_files = Vec::new();
-
-                for file in &dir_group {
-                    let file_size = file.file.metadata().unwrap().len();
-                    let file_data = File::open(file.file.clone()).await.unwrap();
-                    let mut file_reader = tokio::io::BufReader::new(file_data);
-                    let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
-                    file_reader.read_to_end(&mut file_data).await.unwrap();
-
-                    let file_name = file.file.file_name().unwrap().to_str().unwrap();
-                    let file_dir = file.path.split('/').collect::<Vec<&str>>()
-                        [..file.path.split('/').count() - 1]
-                        .join("/")
-                        + "/";
-                    upload_files.push(UploadFile {
-                        data: file_data,
-                        path: file_name.to_string(),
-                    });
-                }
-
-                println!("Uploading {} files", upload_files.len());
-
-                let pb = ProgressBar::new(upload_files.len() as u64);
-                pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})")
+            let pb = ProgressBar::new(file_size);
+            pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
                 .unwrap()
                 .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
                 .progress_chars("#>-"));
 
-                client
-                    .upload(
-                        upload_files,
-                        Some(
-                            (dir_group[0].path.split('/').collect::<Vec<&str>>()
-                                [..dir_group[0].path.split('/').count() - 1]
-                                .join("/")
-                                + "/")
-                                .as_str(),
-                        ),
-                        fake_size,
-                        Some(pb),
-                    )
-                    .await
-                    .unwrap();
+            // let file_name = target_file.file.file_name().unwrap().to_str().unwrap();
+            // let file_dir = target_file.path.split('/').collect::<Vec<&str>>()
+            //     [..target_file.path.split('/').count() - 1]
+            //     .join("/")
+            //     + "/";
+
+            // println!("file.path: {:?}", file_name);
+            // println!("prefix: {:?}", file_dir);
+
+            client
+                .upload_from_path(target_file.clone(), prefix.as_deref(), fake_size, Some(pb))
+                .await
+                .unwrap();
+
+            for i in 0..5 {
+                println!("Checking: {:?}", path);
+                let res = client.info(path.as_str()).await;
+                if res.is_ok() {
+                    println!("File exists.");
+                    break;
+                } else {
+                    println!("{:?}", res.err().unwrap());
+                    if i >= 4 {
+                        return Err(anyhow::anyhow!("File does not exist."));
+                    }
+                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                    continue;
+                }
             }
         }
         Commands::Download { path, prefix } => {
diff --git a/src/util.rs b/src/util.rs
index 0048131..fcf5419 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -290,8 +290,12 @@ pub async fn multipart_upload_from_path(
         .bucket(bucket)
         .key(key.clone())
         .send()
-        .await
-        .unwrap();
+        .await;
+
+    let multipart_upload_res = match multipart_upload_res {
+        Ok(res) => res,
+        Err(e) => return Err(anyhow::anyhow!("Can't create multipart upload: {:?}", e)),
+    };
 
     let upload_id = multipart_upload_res.upload_id().unwrap().to_string();
 
@@ -307,7 +311,7 @@ pub async fn multipart_upload_from_path(
 
     let upload_parts = Arc::new(Mutex::new(Vec::<CompletedPart>::new()));
 
-    let semaphore = Arc::new(tokio::sync::Semaphore::new(20));
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(5));
 
     let mut handles = Vec::new();
     for chunk_index in 0..chunk_count {
@@ -406,8 +410,12 @@ pub async fn multipart_upload_from_path(
         .upload_id(upload_id)
         .multipart_upload(completed_multipart_upload)
         .send()
-        .await
-        .unwrap();
+        .await;
+
+    match _complete_multipart_upload_res {
+        Ok(_) => {}
+        Err(e) => return Err(anyhow::anyhow!("Can't complete multipart upload: {:?}", e)),
+    };
 
     if let Some(pb) = pb {
         pb.finish_with_message("Uploaded");
@@ -486,7 +494,12 @@ pub async fn check_job(key: &str, client: &client::Client) -> anyhow::Result<()>
     let req = types::request::CheckActionRequest {
         key: key.to_string(),
    };
-    let res = client.check_action(req).await.unwrap();
+    let res = client.check_action(req).await;
+
+    let res = match res {
+        Ok(res) => res,
+        Err(e) => return Err(anyhow::anyhow!("Error: {:?}", e)),
+    };
 
     if res.state == "complete" {
         return Ok(());