4 Commits

SHA1        Message                   Date
006a83afac  fix untrimmed size parse  2024-09-29 14:33:37 +09:00
26ca153f57  wip blockdevice           2024-09-29 14:27:21 +09:00
426300f2fa  wip big                   2024-09-29 08:13:01 +09:00
ad827c88e6  for backup                2024-09-21 17:13:45 +09:00
4 changed files with 310 additions and 245 deletions

View File

@@ -1,3 +1,3 @@
 pub const REFRESH_TOKEN: &str = "AMf-vBwuDNdrMsPlvESgMqZWGCdVlBxMvrQVNvHOWb-FdDRV0Ozeq26POxH2tzy463DGlZTZpPYYhWCSi-KI0-8pSYEXtuCG_8DlVRyqwm4POeobYWkrw3dMgiEDNjFCfXIN4-k65CsizmCFbWQz6ASsi-XGAwMn_mXyFj9JirB7vyzTTr2ugbA";
-pub const CHUNK_SIZE: usize = 1024 * 1024 * 10; // 10MB
+pub const CHUNK_SIZE: usize = 1024 * 1024 * 100; // 100MB
 pub const APP_VERSION: &str = "v21.11.10";
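
Note on the constant change: multipart uploads are typically capped at a fixed number of parts, so CHUNK_SIZE bounds the largest uploadable object. A minimal standalone sketch of that arithmetic, assuming the S3 limit of 10,000 parts (an assumption, not something this diff states):

// Sketch only: relates chunk size to the maximum object size under a
// 10,000-part multipart limit (borrowed from S3).
const CHUNK_SIZE: usize = 1024 * 1024 * 100; // 100MB, as in this commit
const MAX_PARTS: usize = 10_000;

fn main() {
    let max_object = CHUNK_SIZE * MAX_PARTS;
    // 10MB chunks top out near ~98GiB; 100MB chunks raise that to ~977GiB.
    println!("max object size: {} GiB", max_object / (1024 * 1024 * 1024));
}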

View File

@@ -207,20 +207,34 @@ impl RakutenDriveClient {
     pub async fn upload_from_path(
         &self,
         file: TargetFile,
+        custom_file_name: Option<&str>,
         prefix: Option<&str>,
         fake_size: Option<u64>,
+        offset: Option<u64>,
+        length: Option<u64>,
         pb: Option<indicatif::ProgressBar>,
     ) -> anyhow::Result<()> {
+        if offset.is_some() != length.is_some() {
+            println!("offset and length must both be set");
+            return Err(anyhow::anyhow!("offset and length must both be set"));
+        }
         let prefix = if prefix.unwrap_or("") == "/" {
             ""
         } else {
             prefix.unwrap_or("")
         };
+        let file_name = match custom_file_name {
+            Some(n) => n,
+            None => file.file.file_name().unwrap().to_str().unwrap(),
+        };
         let req = types::request::CheckUploadRequest {
             host_id: self.client.host_id.clone(),
             path: prefix.to_string(),
             file: vec![types::request::CheckUploadRequestFile {
-                path: file.file.to_str().unwrap().to_string(),
+                path: file_name.to_string(),
                 size: fake_size.unwrap_or(file.file.metadata().unwrap().len() as u64) as i64,
                 hash: None,
                 host_id: None,
@@ -229,6 +243,7 @@ impl RakutenDriveClient {
         };
         println!("upload from path");
+        println!("{:#?}", req);

         // println!("prefix: {:?}", prefix.unwrap_or(""));
@@ -261,6 +276,8 @@ impl RakutenDriveClient {
                 &check_upload_res.region,
                 &check_upload_res.upload_id,
                 file,
+                offset,
+                length,
                 pb.clone(),
             )
             .await
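
For reference, a hypothetical call site for the widened signature (argument values are illustrative, not from this diff): offset/length select a byte range of the source file, and custom_file_name renames the remote object.

client
    .upload_from_path(
        target_file.clone(),
        Some("disk.img.1"),            // custom_file_name: remote name for this piece
        Some("backup/"),               // prefix: parent folder, must end with '/'
        None,                          // fake_size: advertise the real size
        Some(0),                       // offset: start of the byte range
        Some(16 * 1024 * 1024 * 1024), // length: 16GiB window
        None,                          // pb: no progress bar
    )
    .await?;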

View File

@@ -2,7 +2,9 @@ use std::{
     cmp::{max, min},
     collections::{HashMap, HashSet},
     io::{stdout, Write},
+    os::unix::fs::FileTypeExt,
     path::{Path, PathBuf},
+    str::FromStr,
     sync::Arc,
 };
@@ -69,6 +71,26 @@ enum Commands {
         #[clap(short, long)]
         stream: bool,
     },
+    #[clap(about = "Upload ultra big file")]
+    UploadBig {
+        file: PathBuf,
+        /// Parent folder path
+        #[clap(short, long)]
+        prefix: Option<String>,
+        /// Send fake file size to server (bytes)
+        #[clap(short, long)]
+        fake_size: Option<u64>,
+        /// Do not check file existence
+        #[clap(long)]
+        force: bool,
+        /// Set division size in bytes
+        #[clap(long)]
+        length: Option<u64>,
+    },
     #[clap(about = "Download file")]
     Download {
         path: String,
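
With clap's derive naming, this variant surfaces as an upload-big subcommand. A hypothetical invocation (binary name assumed) is `<binary> upload-big ./disk.img --prefix backup/ --length 17179869184`, which splits the file into 16GiB pieces uploaded as disk.img.1, disk.img.2, and so on (see the chunk loop later in this file).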
@@ -188,44 +210,19 @@ async fn main() -> anyhow::Result<()> {
             let mut files = Vec::<TargetFile>::new();

-            if file.is_dir() && recursive {
-                // upload folder
-                let mut dirs = Vec::<PathBuf>::new();
-                dirs.push(file.clone());
-                while let Some(dir) = dirs.pop() {
-                    let entries = std::fs::read_dir(dir).unwrap();
-                    for entry in entries {
-                        let entry = entry.unwrap();
-                        let path = entry.path();
-                        if path.is_dir() {
-                            dirs.push(path);
-                        } else {
-                            files.push(TargetFile {
-                                file: path.clone(),
-                                path: (prefix.clone().unwrap_or("".to_string())
-                                    + path
-                                        .strip_prefix(&file)
-                                        .unwrap()
-                                        .to_str()
-                                        .expect("Invalid File Name")),
-                            });
-                        }
-                    }
-                }
-                // for file in files {
-                //     println!("{:?}", file);
-                // }
-            } else {
-                // file check
-                if !file.exists() {
-                    println!("File not found: {:?}", file);
-                    return Err(anyhow::anyhow!("File not found: {:?}", file));
-                }
-                files.push(TargetFile {
-                    file: file.clone(),
-                    path: file.file_name().unwrap().to_str().unwrap().to_string(),
-                });
-            }
+            // file check
+            if !file.exists() {
+                println!("File not found: {:?}", file);
+                return Err(anyhow::anyhow!("File not found: {:?}", file));
+            }
+            let target_file = TargetFile {
+                file: file.clone(),
+                path: file.file_name().unwrap().to_str().unwrap().to_string(),
+            };
+            let path = match prefix.clone() {
+                Some(p) => p + target_file.clone().path.as_str(),
+                None => target_file.clone().path,
+            };

             if cfg!(windows) {
                 // replace \ to /
@@ -236,9 +233,8 @@ async fn main() -> anyhow::Result<()> {
             if !force {
                 println!("Checking file existence...");
-                for file in &files {
-                    println!("Checking: {:?}", file.path);
-                    let res = client.info(file.path.as_str()).await;
+                println!("Checking: {:?}", path);
+                let res = client.info(path.as_str()).await;
                 if res.is_ok() {
                     println!("File already exists.");
                     return Err(anyhow::anyhow!("File already exists."));
@@ -246,75 +242,9 @@ async fn main() -> anyhow::Result<()> {
                     println!("{:?}", res.err().unwrap());
                 }
             }
-            }
-            // println!("{:#?}", files);
-            let mut dirs = files
-                .iter()
-                .map(|f| f.path.clone())
-                .map(|f| f.split('/').collect::<Vec<&str>>()[..f.split('/').count() - 1].join("/"))
-                .collect::<HashSet<String>>()
-                .into_iter()
-                .collect::<Vec<String>>();
-            dirs.sort();
-            println!("{:?}", dirs);
-            let mut done_set = Vec::new();
-            for dir in dirs {
-                let paths = dir.split('/');
-                println!("{}", dir);
-                for (i, path) in paths.clone().enumerate() {
-                    let mut path_prefix = paths.clone().take(i).collect::<Vec<&str>>().join("/");
-                    if path_prefix.is_empty() && path == file.file_name().unwrap().to_str().unwrap()
-                    {
-                        continue;
-                    }
-                    if !path_prefix.is_empty() {
-                        path_prefix.push('/');
-                    }
-                    if done_set
-                        .iter()
-                        .any(|x| x == &(path.to_string(), path_prefix.clone()))
-                    {
-                        continue;
-                    }
-                    println!("path: {:?}", path);
-                    println!("prefix: {:?}", path_prefix);
-                    match client.mkdir(path, Some(&path_prefix)).await {
-                        Ok(_) => {
-                            done_set.push((path.to_string(), path_prefix));
-                        }
-                        Err(e) => {
-                            println!("{:?}", e);
-                            continue;
-                        }
-                    }
-                }
-            }
-            let small_files = files
-                .iter()
-                .filter(|f| f.file.metadata().unwrap().len() < 1024 * 1024 * 10 * 10)
-                .collect::<Vec<&TargetFile>>();
-            let large_files = files
-                .iter()
-                .filter(|f| f.file.metadata().unwrap().len() >= 1024 * 1024 * 10 * 10)
-                .collect::<Vec<&TargetFile>>();
-            println!("small: {}", small_files.len());
-            println!("large: {}", large_files.len());
-            for file in large_files {
-                // wait 50ms
-                // tokio::time::sleep(std::time::Duration::from_millis(50)).await;
-                let file_size = file.file.metadata().unwrap().len();
-                let file_data = File::open(file.file.clone()).await.unwrap();
+            let file_size = target_file.file.metadata().unwrap().len();
+            // let file_data = File::open(target_file.file.clone()).await.unwrap();

             let pb = ProgressBar::new(file_size);
             pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
@@ -322,106 +252,202 @@ async fn main() -> anyhow::Result<()> {
                 .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
                 .progress_chars("#>-"));

-            let file_name = file.file.file_name().unwrap().to_str().unwrap();
-            let file_dir = file.path.split('/').collect::<Vec<&str>>()
-                [..file.path.split('/').count() - 1]
-                .join("/")
-                + "/";
+            // let file_name = target_file.file.file_name().unwrap().to_str().unwrap();
+            // let file_dir = target_file.path.split('/').collect::<Vec<&str>>()
+            //     [..target_file.path.split('/').count() - 1]
+            //     .join("/")
+            //     + "/";

             // println!("file.path: {:?}", file_name);
             // println!("prefix: {:?}", file_dir);

-            if stream {
-                println!("is stream true");
             client
-                .upload_from_path(file.clone(), Some(&file_dir), fake_size, Some(pb))
-                .await
-                .unwrap();
-            } else {
-                let mut file_reader = tokio::io::BufReader::new(file_data);
-                let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
-                file_reader.read_to_end(&mut file_data).await.unwrap();
-                client
-                    .upload(
-                        vec![UploadFile {
-                            data: file_data,
-                            path: file_name.to_string(),
-                        }],
-                        Some(&file_dir),
+                .upload_from_path(
+                    target_file.clone(),
+                    None,
+                    prefix.as_deref(),
                     fake_size,
+                    None,
+                    None,
                     Some(pb),
                 )
                 .await
                 .unwrap();
+            for i in 0..5 {
+                println!("Checking: {:?}", path);
+                let res = client.info(path.as_str()).await;
+                if res.is_ok() {
+                    println!("File exists.");
+                    break;
+                } else {
+                    println!("{:?}", res.err().unwrap());
+                    if i == 4 {
+                        return Err(anyhow::anyhow!("File does not exist."));
+                    }
+                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                    continue;
+                }
+            }
+        }
+        Commands::UploadBig {
+            file,
+            prefix,
+            fake_size,
+            force,
+            length,
+        } => {
+            // is folder
+            if file.is_dir() {
+                println!("Can't upload a folder");
+                return Err(anyhow::anyhow!("Can't upload a folder"));
+            }
+            if let Some(prefix) = prefix.as_ref() {
+                if !prefix.ends_with('/') {
+                    println!("Prefix must end with /");
+                    return Err(anyhow::anyhow!("Prefix must end with /"));
                 }
             }
-            let mut a = HashMap::<String, Vec<TargetFile>>::new();
-            for file in small_files {
-                // per parent path
-                let file_size = file.file.metadata().unwrap().len();
-                let file_data = File::open(file.file.clone()).await.unwrap();
-                let mut file_reader = tokio::io::BufReader::new(file_data);
-                let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
-                file_reader.read_to_end(&mut file_data).await.unwrap();
-                let file_name = file.file.file_name().unwrap().to_str().unwrap();
-                let file_dir = file.path.split('/').collect::<Vec<&str>>()
-                    [..file.path.split('/').count() - 1]
-                    .join("/")
-                    + "/";
-                a.entry(file_dir.clone()).or_default().push(file.clone());
-            }
+            let mut files = Vec::<TargetFile>::new();
+            // file check
+            if !file.exists() {
+                println!("File not found: {:?}", file);
+                return Err(anyhow::anyhow!("File not found: {:?}", file));
+            }
+            let target_file = TargetFile {
+                file: file.clone(),
+                path: file.file_name().unwrap().to_str().unwrap().to_string(),
+            };
+            let path = match prefix.clone() {
+                Some(p) => p + target_file.clone().path.as_str(),
+                None => target_file.clone().path,
+            };

-            let b = a.into_values().collect::<Vec<Vec<TargetFile>>>();
-            for dir_group in b {
-                let mut upload_files = Vec::new();
-                for file in &dir_group {
-                    let file_size = file.file.metadata().unwrap().len();
-                    let file_data = File::open(file.file.clone()).await.unwrap();
-                    let mut file_reader = tokio::io::BufReader::new(file_data);
-                    let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
-                    file_reader.read_to_end(&mut file_data).await.unwrap();
-                    let file_name = file.file.file_name().unwrap().to_str().unwrap();
-                    let file_dir = file.path.split('/').collect::<Vec<&str>>()
-                        [..file.path.split('/').count() - 1]
-                        .join("/")
-                        + "/";
-                    upload_files.push(UploadFile {
-                        data: file_data,
-                        path: file_name.to_string(),
+            if cfg!(windows) {
+                // replace \ to /
+                files.iter_mut().for_each(|f| {
+                    f.path = f.path.replace('\\', "/");
                 });
             }
-                println!("Uploading {} files", upload_files.len());
-                let pb = ProgressBar::new(upload_files.len() as u64);
-                pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})")
+            let is_blockdevice = std::fs::metadata(target_file.file.as_path())
+                .unwrap()
+                .file_type()
+                .is_block_device();
+            let file_size = if is_blockdevice {
+                let sectors: u64 = std::fs::read_to_string(format!(
+                    "/sys/block/{}/size",
+                    target_file.file.file_name().unwrap().to_str().unwrap()
+                ))
+                .unwrap()
+                .trim()
+                .parse()
+                .unwrap();
+                sectors.checked_mul(512).unwrap()
+            } else {
+                target_file.file.metadata().unwrap().len()
+            };
+            let file_chunk_size = length.unwrap_or(1024 * 1024 * 1024); // default 1GiB
+            let mut chunk_count = file_size.checked_div(file_chunk_size).unwrap();
+            let mut size_of_last_chunk = file_size % file_chunk_size;
+            if size_of_last_chunk == 0 {
+                size_of_last_chunk = file_chunk_size;
+                chunk_count -= 1;
+            }
+            for file_chunk_index in 0..chunk_count + 1 {
+                // if !force {
+                //     println!("Checking file existence...");
+                //     println!("Checking: {:?}", path);
+                //     let res = client.info(path.as_str()).await;
+                //     if res.is_ok() {
+                //         println!("File already exists.");
+                //         return Err(anyhow::anyhow!("File already exists."));
+                //     } else {
+                //         println!("{:?}", res.err().unwrap());
+                //     }
+                // }
+                let this_chunk_size = if chunk_count == file_chunk_index {
+                    size_of_last_chunk
+                } else {
+                    file_chunk_size
+                };
+                let offset = file_chunk_index * file_chunk_size;
+                let file_name = target_file
+                    .file
+                    .file_name()
+                    .unwrap()
+                    .to_str()
+                    .unwrap()
+                    .to_string()
+                    + "."
+                    + (file_chunk_index + 1).to_string().as_str();
+                let file_path = match prefix.clone() {
+                    Some(p) => p + file_name.as_str(),
+                    None => file_name.clone(),
+                };
+                // let file_data = File::open(target_file.file.clone()).await.unwrap();
+                loop {
+                    println!(
+                        "Uploading {}/{} {}...",
+                        file_chunk_index + 1,
+                        chunk_count + 1,
+                        file_name
+                    );
+                    let pb = ProgressBar::new(this_chunk_size);
+                    pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
                     .unwrap()
                     .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
                     .progress_chars("#>-"));
-                client
-                    .upload(
-                        upload_files,
-                        Some(
-                            (dir_group[0].path.split('/').collect::<Vec<&str>>()
-                                [..dir_group[0].path.split('/').count() - 1]
-                                .join("/")
-                                + "/")
-                            .as_str(),
-                        ),
+                    match client
+                        .upload_from_path(
+                            target_file.clone(),
+                            Some(file_name.as_str()),
+                            prefix.as_deref(),
                             fake_size,
+                            Some(offset),
+                            Some(this_chunk_size),
                             Some(pb),
                         )
                         .await
-                    .unwrap();
+                    {
+                        Ok(_) => {}
+                        Err(e) => {
+                            println!("ERROR: {:#?}", e);
+                            continue;
+                        }
+                    };
+                    for i in 0..20 {
+                        println!("Checking: {:?}", file_path);
+                        let res = client.info(file_path.as_str()).await;
+                        if res.is_ok() {
+                            println!("File exists.");
+                            break;
+                        } else {
+                            println!("{:?}", res.err().unwrap());
+                            if i == 19 {
+                                return Err(anyhow::anyhow!("File does not exist."));
+                            }
+                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                            continue;
+                        }
+                    }
+                    break;
+                }
             }
         }
         Commands::Download { path, prefix } => {
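
Two details in this hunk are worth pinning down. On Linux, /sys/block/<name>/size reports the device size in 512-byte sectors regardless of the device's logical block size, hence the fixed checked_mul(512). And the last-chunk bookkeeping is easy to get wrong at boundaries; here is a standalone sketch of the same arithmetic with spot checks (assumes file_size > 0):

// Sketch of the chunk-split arithmetic used by UploadBig: `chunk_count + 1`
// pieces of `chunk_size` bytes, with the final piece holding the remainder.
fn split(file_size: u64, chunk_size: u64) -> (u64, u64) {
    let mut chunk_count = file_size / chunk_size;
    let mut size_of_last_chunk = file_size % chunk_size;
    if size_of_last_chunk == 0 {
        // Evenly divisible: the "last chunk" is simply a full chunk.
        size_of_last_chunk = chunk_size;
        chunk_count -= 1;
    }
    (chunk_count, size_of_last_chunk)
}

fn main() {
    // 3GiB in 1GiB chunks: indices 0..=2, last chunk a full 1GiB.
    assert_eq!(split(3u64 << 30, 1u64 << 30), (2, 1 << 30));
    // 2.5GiB in 1GiB chunks: indices 0..=2, last chunk 512MiB.
    assert_eq!(split(5u64 << 29, 1u64 << 30), (2, 1 << 29));
    println!("chunk math ok");
}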

View File

@@ -10,7 +10,7 @@ use aws_sdk_s3::{
     config::Credentials, operation::upload_part, primitives::ByteStream, types::CompletedPart,
 };
 use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
-use aws_smithy_types::byte_stream::Length;
+use aws_smithy_types::byte_stream::{FsBuilder, Length};
 use clap::{Parser, Subcommand};
 use human_bytes::human_bytes;
 use indicatif::{ProgressBar, ProgressState, ProgressStyle};
@@ -251,6 +251,8 @@ pub async fn multipart_upload_from_path(
     region: &str,
     upload_id: &str,
     file: TargetFile,
+    offset: Option<u64>,
+    length: Option<u64>,
     pb: Option<ProgressBar>,
 ) -> anyhow::Result<()> {
     let _ = upload_id;
@@ -259,7 +261,17 @@ pub async fn multipart_upload_from_path(
         return Err(anyhow::anyhow!("File not found: {:?}", file.file));
     }

-    let file_size = file.file.metadata().unwrap().len() as u64;
+    if offset.is_some() != length.is_some() {
+        println!("offset and length must both be set");
+        return Err(anyhow::anyhow!("offset and length must both be set"));
+    }
+
+    let file_size = match length {
+        Some(length) => length,
+        None => file.file.metadata().unwrap().len() as u64,
+    };
+    println!("file_size: {}", file_size);

     let cledential = Credentials::new(
         &token_res.access_key_id,
@@ -290,8 +302,12 @@ pub async fn multipart_upload_from_path(
         .bucket(bucket)
         .key(key.clone())
         .send()
-        .await
-        .unwrap();
+        .await;
+
+    let multipart_upload_res = match multipart_upload_res {
+        Ok(res) => res,
+        Err(e) => return Err(anyhow::anyhow!("Can't create multipart upload: {:?}", e)),
+    };

     let upload_id = multipart_upload_res.upload_id().unwrap().to_string();
@@ -307,10 +323,10 @@ pub async fn multipart_upload_from_path(
     let upload_parts = Arc::new(Mutex::new(Vec::<CompletedPart>::new()));

-    let semaphore = Arc::new(tokio::sync::Semaphore::new(20));
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(10));
     let mut handles = Vec::new();

-    for chunk_index in 0..chunk_count {
+    for chunk_index in 0..chunk_count + 1 {
         let bucket = bucket.to_owned();
         let key = key.clone();
         let upload_id = upload_id.clone();
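
The semaphore bound on in-flight part uploads drops from 20 to 10 here. A minimal standalone sketch of the same tokio pattern (names illustrative; the real code binds the acquired permit as `_permit` inside each task, as the next hunk shows):

use std::sync::Arc;

// Sketch only: cap concurrent "part uploads" at 10 with a tokio semaphore.
// Acquire an owned permit before spawning; holding it in the task releases
// it automatically when the task finishes.
#[tokio::main]
async fn main() {
    let semaphore = Arc::new(tokio::sync::Semaphore::new(10));
    let mut handles = Vec::new();
    for part in 0..100u32 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit;
            // ... upload part `part` here ...
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            println!("part {part} done");
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}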
@@ -324,18 +340,15 @@ pub async fn multipart_upload_from_path(
         let handle = tokio::spawn(async move {
             let _permit = semaphore;

-            let this_chunk = if chunk_count - 1 == chunk_index {
+            let this_chunk = if chunk_count == chunk_index {
                 size_of_last_chunk
             } else {
                 chunk_size
             };
             loop {
-                let offset = chunk_index * chunk_size;
-                let length = this_chunk;
-
                 let stream = ByteStream::read_from()
                     .path(file.file.clone())
-                    .offset(chunk_index * chunk_size)
+                    .offset(chunk_index * chunk_size + offset.unwrap_or(0))
                     .length(Length::Exact(this_chunk))
                     .build()
                     .await;
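
The only change to the read path is the added global offset: each S3 part now reads at chunk_index * chunk_size + offset within the source file, where offset marks where this file-chunk begins. A tiny illustration (values hypothetical, not from the diff):

// Sketch: byte position of an S3 part inside the whole file, given the
// file-chunk's global offset.
fn part_read_offset(global_offset: u64, part_index: u64, part_size: u64) -> u64 {
    part_index * part_size + global_offset
}

fn main() {
    // Third 100MiB part of the file-chunk that starts at byte 16GiB:
    let off = part_read_offset(16u64 << 30, 2, 100 << 20);
    assert_eq!(off, (16u64 << 30) + 200 * (1 << 20));
    println!("part starts at byte {off}");
}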
@@ -406,8 +419,12 @@ pub async fn multipart_upload_from_path(
         .upload_id(upload_id)
         .multipart_upload(completed_multipart_upload)
         .send()
-        .await
-        .unwrap();
+        .await;
+
+    match _complete_multipart_upload_res {
+        Ok(_) => {}
+        Err(e) => return Err(anyhow::anyhow!("Can't complete multipart upload: {:?}", e)),
+    };

     if let Some(pb) = pb {
         pb.finish_with_message("Uploaded");
@@ -451,29 +468,29 @@ pub async fn list_files(
     files.append(&mut res.file);

-    if !res.last_page {
-        let mut cursor = res.file.len() as i64;
-        loop {
-            let req = types::request::ListFilesRequest {
-                from: cursor,
-                host_id: client.host_id.clone(),
-                path: prefix.unwrap_or("").to_string(),
-                sort_type: types::request::ListFilesRequestSortType::Path,
-                reverse: false,
-                thumbnail_size: 130,
-                to: pagination_size + cursor,
-            };
-
-            let mut next_res = client.list_files(req).await?;
-            files.append(&mut next_res.file);
-
-            if next_res.last_page {
-                break;
-            } else {
-                cursor += next_res.file.len() as i64;
-            }
-        }
-    }
+    // if !res.last_page {
+    //     let mut cursor = res.file.len() as i64;
+    //     loop {
+    //         let req = types::request::ListFilesRequest {
+    //             from: cursor,
+    //             host_id: client.host_id.clone(),
+    //             path: prefix.unwrap_or("").to_string(),
+    //             sort_type: types::request::ListFilesRequestSortType::Path,
+    //             reverse: false,
+    //             thumbnail_size: 130,
+    //             to: pagination_size + cursor,
+    //         };
+    //
+    //         let mut next_res = client.list_files(req).await?;
+    //         files.append(&mut next_res.file);
+    //
+    //         if next_res.last_page {
+    //             break;
+    //         } else {
+    //             cursor += next_res.file.len() as i64;
+    //         }
+    //     }
+    // }

     res.file = files;
     // files.iter().find(|f| f.path == "/").unwrap();
@@ -486,7 +503,12 @@ pub async fn check_job(key: &str, client: &client::Client) -> anyhow::Result<()>
     let req = types::request::CheckActionRequest {
         key: key.to_string(),
     };
-    let res = client.check_action(req).await.unwrap();
+    let res = client.check_action(req).await;
+
+    let res = match res {
+        Ok(res) => res,
+        Err(e) => return Err(anyhow::anyhow!("Error: {:?}", e)),
+    };

     if res.state == "complete" {
         return Ok(());
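
check_job performs a single status probe; callers presumably poll it. A hypothetical caller-side loop (the tail of this hunk is cut off, so the assumption that a still-pending job surfaces as Err here is exactly that, an assumption):

// Hypothetical wrapper, not from this diff: poll check_job with a fixed
// attempt cap and 1s spacing, assuming a pending job returns Err.
async fn wait_for_job(key: &str, client: &client::Client) -> anyhow::Result<()> {
    let mut last_err = None;
    for _ in 0..30 {
        match check_job(key, client).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                last_err = Some(e);
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        }
    }
    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("job never completed")))
}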