improve library arguments

Author: sim1222
Date: 2024-07-19 23:15:48 +09:00
Parent: e47b29da21
Commit: 9e6ac3881f
Signed by: sim1222 (GPG Key ID: D1AE30E316E44E5D)

3 changed files with 150 additions and 125 deletions

@@ -1,7 +1,7 @@
 use aws_config::Region;
 use aws_sdk_s3::config::Credentials;
 use constants::APP_VERSION;
-use std::path::PathBuf;
+use std::{io::BufReader, path::PathBuf, sync::Arc};
 use util::{check_job, file_detail, list_files, multipart_upload, TargetFile};
 mod client;
@@ -34,93 +34,26 @@ impl RakutenDriveClient {
     }
     pub async fn upload(
         &self,
-        file: &PathBuf,
+        file_path: &str,
+        file_data: &[u8],
         prefix: Option<&str>,
-        recursive: bool,
         fake_size: Option<u64>,
+        pb: Option<indicatif::ProgressBar>,
     ) -> anyhow::Result<()> {
-        // is folder
-        if file.is_dir() && !recursive {
-            println!("Use --recursive option for folder upload");
-            return Err(anyhow::anyhow!("Use --recursive option for folder upload"));
-        }
-        let mut files = Vec::<TargetFile>::new();
-        if file.is_dir() && recursive {
-            // upload folder
-            let mut dirs = Vec::<PathBuf>::new();
-            dirs.push(file.clone());
-            while let Some(dir) = dirs.pop() {
-                let entries = std::fs::read_dir(dir).unwrap();
-                for entry in entries {
-                    let entry = entry.unwrap();
-                    let path = entry.path();
-                    if path.is_dir() {
-                        dirs.push(path);
-                    } else {
-                        files.push(TargetFile {
-                            file: path.clone(),
-                            path: path
-                                .strip_prefix(file)
-                                .unwrap()
-                                .to_str()
-                                .expect("Invalid File Name")
-                                .to_string(),
-                        });
-                    }
-                }
-            }
-            // for file in files {
-            //     println!("{:?}", file);
-            // }
-        } else {
-            // file check
-            if !file.exists() {
-                println!("File not found: {:?}", file);
-                return Err(anyhow::anyhow!("File not found: {:?}", file));
-            }
-            files.push(TargetFile {
-                file: file.clone(),
-                path: file.file_name().unwrap().to_str().unwrap().to_string(),
-            });
-        }
-        if cfg!(windows) {
-            // replase \ to /
-            files.iter_mut().for_each(|f| {
-                f.path = f.path.replace('\\', "/");
-            });
-        }
-        for file in &files {
-            if (file_detail(&file.path, &self.client).await).is_ok() {
-                println!("File already exists.");
-                return Err(anyhow::anyhow!("File already exists."));
-            }
-        }
         let req = types::request::CheckUploadRequest {
             host_id: self.client.host_id.clone(),
             path: prefix.unwrap_or("").to_string(),
             upload_id: "".to_string(),
-            file: files
-                .iter()
-                .map(|f| types::request::CheckUploadRequestFile {
-                    path: f.path.clone(),
-                    size: fake_size.unwrap_or(f.file.metadata().unwrap().len()) as i64,
-                })
-                .collect(),
+            file: vec![types::request::CheckUploadRequestFile {
+                path: file_path.to_string(),
+                size: fake_size.unwrap_or(file_data.len() as u64) as i64,
+            }],
         };
         let check_upload_res = self.client.check_upload(req).await.unwrap();
-        // println!("{:#?}", check_upload_res);
         let token_res = self.client.get_upload_token().await.unwrap();
-        // println!("{:#?}", token_res);
         let cledential = Credentials::new(
             token_res.access_key_id.clone(),
             token_res.secret_access_key.clone(),
@@ -135,21 +68,22 @@ impl RakutenDriveClient {
             .force_path_style(true)
             .build();
-        // if file_size > CHUNK_SIZE as u64 {
-        for (i, file) in files.iter().enumerate() {
-            println!("Multi Uploading: {:?}", file.file);
-            multipart_upload(
-                &token_res,
-                &check_upload_res.bucket,
-                &check_upload_res.file[i],
-                &check_upload_res.prefix,
-                &check_upload_res.region,
-                &check_upload_res.upload_id,
-                file.clone(),
-            )
-            .await
-            .unwrap();
+        multipart_upload(
+            &token_res,
+            &check_upload_res.bucket,
+            &check_upload_res.file[0],
+            &check_upload_res.prefix,
+            &check_upload_res.region,
+            &check_upload_res.upload_id,
+            file_data,
+            pb,
+        )
+        .await
+        .unwrap();
+        // if file_size > CHUNK_SIZE as u64 {
+        // for (i, file) in files.iter().enumerate() {
+        //     println!("Multi Uploading: {:?}", file.file);
         // }
         // } else {
         // for (i, file) in files.iter().enumerate() {
@@ -173,7 +107,7 @@ impl RakutenDriveClient {
         //     .unwrap();
         //     }
         // }
-        }
+        // }
         match check_job(&check_upload_res.upload_id, &self.client).await {
             Ok(_) => Ok(()),
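
The library half of the change is summarized by the new signature: upload() no longer walks directories or reads from disk, and the caller supplies the remote file name, the file's bytes, and an optional caller-owned progress bar. A minimal sketch of the new call shape; upload_one is a hypothetical helper, assuming an already-constructed RakutenDriveClient:

use std::path::Path;
use tokio::{fs::File, io::AsyncReadExt};

// Hypothetical wrapper showing the reworked upload() contract: the caller
// owns file I/O and progress reporting, the library only sees bytes.
async fn upload_one(
    client: &rakuten_drive_cui::RakutenDriveClient,
    local: &Path,
    remote_name: &str,
) -> anyhow::Result<()> {
    // Read the whole file into memory, as the CLI now does in main.rs.
    let mut data = Vec::new();
    File::open(local).await?.read_to_end(&mut data).await?;

    let pb = indicatif::ProgressBar::new(data.len() as u64);
    client
        .upload(
            remote_name, // file_path: name to create on the remote side
            &data,       // file_data: raw bytes instead of a PathBuf
            None,        // prefix: no destination folder
            None,        // fake_size: report the real length
            Some(pb),    // pb: progress is now optional and caller-owned
        )
        .await
}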

@@ -16,7 +16,11 @@ use constants::REFRESH_TOKEN;
 use human_bytes::human_bytes;
 use indicatif::{ProgressBar, ProgressState, ProgressStyle};
 use rakuten_drive_cui::RakutenDriveClient;
-use tokio::{fs::File, io::BufReader, sync::Mutex};
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, BufReader},
+    sync::Mutex,
+};
 use types::response::ListFilesResponseFile;
 use util::*;
@@ -105,7 +109,9 @@ enum Commands {
         name: String,
     },
     #[clap(about = "Print file detail")]
-    Info { path: String },
+    Info {
+        path: String,
+    },
     Auth {},
 }
@@ -125,12 +131,97 @@ async fn main() -> anyhow::Result<()> {
             recursive,
             fake_size,
         } => {
-            client
-                .upload(file, prefix.as_deref(), *recursive, *fake_size)
-                .await.unwrap();
+            // is folder
+            if file.is_dir() && !*recursive {
+                println!("Use --recursive option for folder upload");
+                return Err(anyhow::anyhow!("Use --recursive option for folder upload"));
+            }
+            let mut files = Vec::<TargetFile>::new();
+            if file.is_dir() && *recursive {
+                // upload folder
+                let mut dirs = Vec::<PathBuf>::new();
+                dirs.push(file.clone());
+                while let Some(dir) = dirs.pop() {
+                    let entries = std::fs::read_dir(dir).unwrap();
+                    for entry in entries {
+                        let entry = entry.unwrap();
+                        let path = entry.path();
+                        if path.is_dir() {
+                            dirs.push(path);
+                        } else {
+                            files.push(TargetFile {
+                                file: path.clone(),
+                                path: path
+                                    .strip_prefix(file)
+                                    .unwrap()
+                                    .to_str()
+                                    .expect("Invalid File Name")
+                                    .to_string(),
+                            });
+                        }
+                    }
+                }
+                // for file in files {
+                //     println!("{:?}", file);
+                // }
+            } else {
+                // file check
+                if !file.exists() {
+                    println!("File not found: {:?}", file);
+                    return Err(anyhow::anyhow!("File not found: {:?}", file));
+                }
+                files.push(TargetFile {
+                    file: file.clone(),
+                    path: file.file_name().unwrap().to_str().unwrap().to_string(),
+                });
+            }
+            if cfg!(windows) {
+                // replase \ to /
+                files.iter_mut().for_each(|f| {
+                    f.path = f.path.replace('\\', "/");
+                });
+            }
+            for file in &files {
+                if client.info(file.path.as_str()).await.is_ok() {
+                    println!("File already exists.");
+                    return Err(anyhow::anyhow!("File already exists."));
+                }
+            }
+            for file in &files {
+                let file_size = file.file.metadata().unwrap().len();
+                let file_data = File::open(file.file.clone()).await?;
+                let mut file_reader = tokio::io::BufReader::new(file_data);
+                let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
+                file_reader.read_to_end(&mut file_data).await?;
+                let pb = ProgressBar::new(file_size);
+                pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
+                    .unwrap()
+                    .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
+                    .progress_chars("#>-"));
+                client
+                    .upload(
+                        &file.path,
+                        &file_data,
+                        prefix.as_deref(),
+                        *fake_size,
+                        Some(pb),
+                    )
+                    .await
+                    .unwrap();
+            }
         }
         Commands::Download { path, prefix } => {
-            client.download(path.as_str(), prefix.as_deref()).await.unwrap();
+            client
+                .download(path.as_str(), prefix.as_deref())
+                .await
+                .unwrap();
         }
         Commands::Move { path, dest } => {
             client.move_file(path, dest).await.unwrap();
@@ -169,7 +260,8 @@ async fn main() -> anyhow::Result<()> {
             .ok_or_else(|| anyhow::anyhow!("Code not found in URL"))?;
         let rid_token_auth_res = client::rid_token_auth(rid_code.as_str()).await?;
-        let token_verify_res = client::get_refresh_token(&rid_token_auth_res.custom_token).await?;
+        let token_verify_res =
+            client::get_refresh_token(&rid_token_auth_res.custom_token).await?;
         println!("Refresh token: {}", token_verify_res.refresh_token);
     }
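
On the CLI side, the directory recursion that used to live in the library is now inlined in the Upload arm: an explicit stack of PathBufs replaces recursion, and each regular file is recorded together with its path relative to the upload root. The same traversal as a self-contained sketch; the collect_files name and the ?-based error handling are illustrative, not crate code:

use std::path::{Path, PathBuf};

// Depth-first walk with an explicit stack, mirroring the loop in main.rs:
// returns each file's absolute path plus its path relative to `root`.
fn collect_files(root: &Path) -> anyhow::Result<Vec<(PathBuf, String)>> {
    let mut files = Vec::new();
    let mut dirs = vec![root.to_path_buf()];
    while let Some(dir) = dirs.pop() {
        for entry in std::fs::read_dir(dir)? {
            let path = entry?.path();
            if path.is_dir() {
                dirs.push(path);
            } else {
                let rel = path
                    .strip_prefix(root)?
                    .to_str()
                    .ok_or_else(|| anyhow::anyhow!("Invalid file name: {:?}", path))?
                    // Match the cfg!(windows) fix-up: remote paths use '/'.
                    .replace('\\', "/");
                files.push((path, rel));
            }
        }
    }
    Ok(files)
}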

@@ -16,11 +16,11 @@ use human_bytes::human_bytes;
 use indicatif::{ProgressBar, ProgressState, ProgressStyle};
 use tokio::{fs::File, io::BufReader, sync::Mutex};
-use crate::{constants::CHUNK_SIZE, types};
 use crate::{
     client::{self},
     types::response::ListFilesResponseFile,
 };
+use crate::{constants::CHUNK_SIZE, types};
 #[derive(Debug, Clone)]
 pub struct TargetFile {
@@ -35,15 +35,16 @@ pub async fn multipart_upload(
     prefix: &str,
     region: &str,
     upload_id: &str,
-    file: TargetFile,
+    file: &[u8],
+    pb: Option<ProgressBar>,
 ) -> anyhow::Result<()> {
     let _ = upload_id;
-    if !file.file.exists() {
-        println!("File not found: {:?}", file.file);
-        return Err(anyhow::anyhow!("File not found: {:?}", file.file));
-    }
-    let file_size = file.file.metadata().unwrap().len();
+    // if !file.file.exists() {
+    //     println!("File not found: {:?}", file.file);
+    //     return Err(anyhow::anyhow!("File not found: {:?}", file.file));
+    // }
+    let file_size = file.len() as u64;
     let cledential = Credentials::new(
         &token_res.access_key_id,
@@ -83,6 +84,7 @@ pub async fn multipart_upload(
     let mut chunk_count = file_size / chunk_size;
     let mut size_of_last_chunk = file_size % chunk_size;
     if size_of_last_chunk == 0 {
         size_of_last_chunk = chunk_size;
         chunk_count -= 1;
@@ -90,12 +92,6 @@ pub async fn multipart_upload(
     let upload_parts = Arc::new(Mutex::new(Vec::<CompletedPart>::new()));
-    let pb = ProgressBar::new(file_size);
-    pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
-        .unwrap()
-        .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
-        .progress_chars("#>-"));
     let semaphore = Arc::new(tokio::sync::Semaphore::new(20));
     let mut handles = Vec::new();
@@ -105,7 +101,7 @@ pub async fn multipart_upload(
         let upload_id = upload_id.clone();
         let s3_client = s3_client.clone();
         let pb = pb.clone();
-        let file = file.clone();
+        let file = file.to_owned();
         let upload_parts = upload_parts.clone();
         let semaphore = semaphore.clone().acquire_owned().await.unwrap();
@@ -119,19 +115,18 @@ pub async fn multipart_upload(
                 chunk_size
             };
             loop {
-                let stream = ByteStream::read_from()
-                    .path(file.file.clone())
-                    .offset(chunk_index * chunk_size)
-                    .length(Length::Exact(this_chunk))
-                    .build()
-                    .await;
-                let stream = match stream {
-                    Ok(stream) => stream,
-                    Err(e) => {
-                        eprintln!("Error: {:?}", e);
-                        continue;
-                    }
-                };
+                let offset = chunk_index * chunk_size;
+                let length = this_chunk;
+
+                let bytes = file[offset as usize..(offset + length) as usize].to_vec();
+                let stream = ByteStream::from(bytes);
+                // let stream = match stream {
+                //     Ok(stream) => stream,
+                //     Err(e) => {
+                //         eprintln!("Error: {:?}", e);
+                //         continue;
+                //     }
+                // };
                 //Chunk index needs to start at 0, but part numbers start at 1.
                 let part_number = (chunk_index as i32) + 1;
                 let upload_part_res = s3_client
@@ -156,7 +151,9 @@ pub async fn multipart_upload(
                     .part_number(part_number)
                     .build(),
                 );
+                if let Some(pb) = &pb {
                     pb.inc(this_chunk);
+                }
                 break;
             }
         });
@@ -186,7 +183,9 @@ pub async fn multipart_upload(
         .await
         .unwrap();
+    if let Some(pb) = pb {
         pb.finish_with_message("Uploaded");
+    }
     Ok(())
 }
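
With multipart_upload taking &[u8], each task now slices its part out of the in-memory buffer and wraps it in ByteStream::from instead of re-reading the file with ByteStream::read_from. The part-size arithmetic is unchanged; a self-contained sketch of it, where the function and demo are illustrative and, like the original, assume a non-empty buffer:

// Split a buffer using the same chunk_count / size_of_last_chunk arithmetic
// as multipart_upload. Panics on an empty buffer, as the original would.
fn chunks(data: &[u8], chunk_size: u64) -> Vec<&[u8]> {
    let file_size = data.len() as u64;
    let mut chunk_count = file_size / chunk_size;
    let mut size_of_last_chunk = file_size % chunk_size;
    if size_of_last_chunk == 0 {
        size_of_last_chunk = chunk_size;
        chunk_count -= 1;
    }
    // chunk_count is the index of the last part, so parts run 0..=chunk_count.
    (0..=chunk_count)
        .map(|i| {
            let offset = (i * chunk_size) as usize;
            let len = (if i == chunk_count { size_of_last_chunk } else { chunk_size }) as usize;
            &data[offset..offset + len]
        })
        .collect()
}

fn main() {
    // 10 bytes in 4-byte parts -> 4, 4, 2 (part numbers start at 1).
    let data = [0u8; 10];
    for (i, part) in chunks(&data, 4).iter().enumerate() {
        println!("part {}: {} bytes", i + 1, part.len());
    }
}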