Compare commits

...

2 Commits
master ... wip

SHA1        Message                  Date
6801f8101d  wip                      2024-09-21 15:32:29 +09:00
dc2be28874  impl file stream upload  2024-08-03 02:09:28 +09:00
7 changed files with 734 additions and 85 deletions

BIN  src/.DS_Store  vendored  Normal file

Binary file not shown.

View File

@@ -89,7 +89,7 @@ impl Client {
let request = client
.get(format!(
"https://forest.sendy.jp/cloud/service/file/v1/filelink/token?host_id={}&path={}",
-self.host_id, "hello"
self.host_id, ""
))
.bearer_auth(&self.token.read().await);

View File

@@ -1,8 +1,10 @@
use aws_config::Region;
use aws_sdk_s3::config::Credentials;
-use constants::APP_VERSION;
use constants::{APP_VERSION, CHUNK_SIZE};
use std::{io::BufReader, path::PathBuf, sync::Arc};
-use util::{check_job, file_detail, list_files, multipart_upload, TargetFile};
use util::{
check_job, file_detail, list_files, multipart_upload, multipart_upload_from_path, single_upload,
};
mod client;
mod constants;
@@ -13,6 +15,17 @@ pub struct RakutenDriveClient {
client: client::Client,
}
pub struct UploadFile {
pub data: Vec<u8>,
pub path: String,
}
#[derive(Debug, Clone)]
pub struct TargetFile {
pub file: PathBuf,
pub path: String,
}
impl RakutenDriveClient {
pub async fn try_new(refresh_token_str: String) -> anyhow::Result<Self> {
let client = client::Client::try_new(refresh_token_str).await?;
@@ -34,22 +47,35 @@ impl RakutenDriveClient {
}
pub async fn upload(
&self,
-file_path: &str,
-file_data: &[u8],
files: Vec<UploadFile>,
prefix: Option<&str>,
fake_size: Option<u64>,
pb: Option<indicatif::ProgressBar>,
) -> anyhow::Result<()> {
let prefix = if prefix.unwrap_or("") == "/" {
""
} else {
prefix.unwrap_or("")
};
let req = types::request::CheckUploadRequest {
host_id: self.client.host_id.clone(),
-path: prefix.unwrap_or("").to_string(),
-upload_id: "".to_string(),
-file: vec![types::request::CheckUploadRequestFile {
-path: file_path.to_string(),
-size: fake_size.unwrap_or(file_data.len() as u64) as i64,
-}],
path: prefix.to_string(),
file: files
.iter()
.map(|file| types::request::CheckUploadRequestFile {
path: file.path.clone(),
size: fake_size.unwrap_or(file.data.len() as u64) as i64,
version_id: None,
host_id: None,
hash: None,
})
.collect(),
};
println!("{:#?}", req);
// println!("prefix: {:?}", prefix.unwrap_or(""));
let check_upload_res = self.client.check_upload(req).await.unwrap();
let token_res = self.client.get_upload_token().await.unwrap();
@@ -68,46 +94,220 @@ impl RakutenDriveClient {
.force_path_style(true)
.build();
-multipart_upload(
let semaphore = Arc::new(tokio::sync::Semaphore::new(5));
let mut handles = Vec::new();
for (i, file) in files.iter().enumerate() {
let token_res = token_res.clone();
let check_upload_res = check_upload_res.clone();
let file_data = file.data.clone();
let pb = pb.clone();
let semaphore = semaphore.clone().acquire_owned().await.unwrap();
if file_data.len() > CHUNK_SIZE * 10 {
multipart_upload(
&token_res,
&check_upload_res.bucket,
&check_upload_res.file[i],
&check_upload_res.prefix,
&check_upload_res.region,
&check_upload_res.upload_id,
&file_data,
pb.clone(),
)
.await
.unwrap();
} else {
handles.push(tokio::spawn(async move {
let _permit = semaphore;
loop {
match single_upload(
&token_res,
&check_upload_res.bucket,
&check_upload_res.file[i],
&check_upload_res.prefix,
&check_upload_res.region,
&check_upload_res.upload_id,
&file_data,
)
.await
{
Ok(_) => {
// println!("Uploaded: {:?}", i);
break;
}
Err(e) => {
// println!("Error");
continue;
}
}
}
if let Some(pb) = pb {
pb.inc(1);
}
}));
}
}
for handle in handles {
handle.await.unwrap();
}
// multipart_upload(
// &token_res,
// &check_upload_res.bucket,
// &check_upload_res.file[0],
// &check_upload_res.prefix,
// &check_upload_res.region,
// &check_upload_res.upload_id,
// file_data,
// pb,
// )
// .await
// .unwrap();
// if file_size > CHUNK_SIZE as u64 {
// for (i, file) in files.iter().enumerate() {
// println!("Multi Uploading: {:?}", file.file);
// }
// } else {
// for (i, file) in files.iter().enumerate() {
// println!("Uploading: {:?}", file.file);
// let stream = ByteStream::read_from()
// .path(file.file.clone())
// .offset(0)
// .length(Length::Exact(file_size))
// .build()
// .await
// .unwrap();
// let key =
// check_upload_res.prefix.to_owned() + check_upload_res.file[i].path.as_str();
// let _upload_res = s3_client
// .put_object()
// .bucket(check_upload_res.bucket.clone())
// .key(key)
// .body(stream)
// .send()
// .await
// .unwrap();
// }
// }
// }
println!("Checking for upload complete...");
match check_job(&check_upload_res.upload_id, &self.client).await {
Ok(_) => Ok(()),
Err(e) => {
println!("Error: {:?}", e);
Err(anyhow::anyhow!("Error: {:?}", e))
}
}
}
pub async fn upload_from_path(
&self,
file: TargetFile,
prefix: Option<&str>,
fake_size: Option<u64>,
pb: Option<indicatif::ProgressBar>,
) -> anyhow::Result<()> {
let prefix = if prefix.unwrap_or("") == "/" {
""
} else {
prefix.unwrap_or("")
};
let req = types::request::CheckUploadRequest {
host_id: self.client.host_id.clone(),
path: prefix.to_string(),
file: vec![types::request::CheckUploadRequestFile {
path: file.file.to_str().unwrap().to_string(),
size: fake_size.unwrap_or(file.file.metadata().unwrap().len() as u64) as i64,
hash: None,
host_id: None,
version_id: None,
}],
};
println!("upload from path");
// println!("prefix: {:?}", prefix.unwrap_or(""));
let check_upload_res = self.client.check_upload(req).await.unwrap();
let token_res = self.client.get_upload_token().await.unwrap();
let cledential = Credentials::new(
token_res.access_key_id.clone(),
token_res.secret_access_key.clone(),
Some(token_res.session_token.clone()),
None,
"2021-06-01",
);
let _config = aws_sdk_s3::Config::builder()
.behavior_version_latest()
.region(Region::new(check_upload_res.region.clone()))
.credentials_provider(cledential)
.force_path_style(true)
.build();
let token_res = token_res.clone();
let check_upload_res = check_upload_res.clone();
let pb = pb.clone();
multipart_upload_from_path(
&token_res,
&check_upload_res.bucket,
&check_upload_res.file[0],
&check_upload_res.prefix,
&check_upload_res.region,
&check_upload_res.upload_id,
-file_data,
file,
-pb,
pb.clone(),
)
.await
.unwrap();
// multipart_upload(
// &token_res,
// &check_upload_res.bucket,
// &check_upload_res.file[0],
// &check_upload_res.prefix,
// &check_upload_res.region,
// &check_upload_res.upload_id,
// file_data,
// pb,
// )
// .await
// .unwrap();
// if file_size > CHUNK_SIZE as u64 {
// for (i, file) in files.iter().enumerate() {
// println!("Multi Uploading: {:?}", file.file);
// }
// } else {
// for (i, file) in files.iter().enumerate() {
// println!("Uploading: {:?}", file.file);
// let stream = ByteStream::read_from()
// .path(file.file.clone())
// .offset(0)
// .length(Length::Exact(file_size))
// .build()
// .await
// .unwrap();
// let key =
// check_upload_res.prefix.to_owned() + check_upload_res.file[i].path.as_str();
// let _upload_res = s3_client
// .put_object()
// .bucket(check_upload_res.bucket.clone())
// .key(key)
// .body(stream)
// .send()
// .await
// .unwrap();
// }
// }
// }
// }
println!("Checking for upload complete...");
match check_job(&check_upload_res.upload_id, &self.client).await {
Ok(_) => Ok(()),
@@ -166,9 +366,15 @@ impl RakutenDriveClient {
pub async fn mkdir(&self, name: &str, path: Option<&str>) -> anyhow::Result<()> {
if name.contains('/') {
-println!("Please use --path option for set parent directory");
// println!("Please use --path option for set parent directory");
return Err(anyhow::anyhow!(
-"Please use --path option for set parent directory"
"Can not use / in folder name. Use --path option for set parent directory."
));
}
if name.len() > 255 {
println!("Folder name should be less than 255 characters.");
return Err(anyhow::anyhow!(
"Folder name should be less than 255 characters."
));
}
let req = types::request::CreateFolderRequest {
@@ -190,11 +396,11 @@
}
pub async fn rename(&self, path: &str, name: &str) -> anyhow::Result<()> {
-if name.contains('/') {
-println!("Can't use / in file name");
-println!("Name should be file name only.");
-return Err(anyhow::anyhow!("Can't use / in file name"));
-}
// if name.contains('/') {
// println!("Can't use / in file name");
// println!("Name should be file name only.");
// return Err(anyhow::anyhow!("Can't use / in file name"));
// }
let file_path =
path.split('/').collect::<Vec<&str>>()[0..path.split('/').count() - 1].join("/") + "/";

View File

@@ -1,5 +1,6 @@
use std::{
cmp::{max, min},
collections::{HashMap, HashSet},
io::{stdout, Write},
path::{Path, PathBuf},
sync::Arc,
@@ -15,7 +16,7 @@ use clap::{Parser, Subcommand};
use constants::REFRESH_TOKEN;
use human_bytes::human_bytes;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
-use rakuten_drive_cui::RakutenDriveClient;
use rakuten_drive_cui::{RakutenDriveClient, TargetFile, UploadFile};
use tokio::{
fs::File,
io::{AsyncReadExt, BufReader},
@@ -59,6 +60,14 @@ enum Commands {
/// Send fake file size to server (byte)
#[clap(short, long)]
fake_size: Option<u64>,
/// Do not check file existence
#[clap(long)]
force: bool,
/// Stream Upload
#[clap(short, long)]
stream: bool,
},
#[clap(about = "Download file")]
Download {
@@ -119,27 +128,67 @@ enum Commands {
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
-let client = RakutenDriveClient::try_new(REFRESH_TOKEN.to_string()).await?;
let client = Arc::new(RakutenDriveClient::try_new(REFRESH_TOKEN.to_string()).await?);
-match &args.command {
match args.command {
Commands::List { prefix } => {
-client.list(prefix.as_deref()).await.unwrap();
let res = client.list(prefix.as_deref()).await.unwrap();
res.file.iter().for_each(|f| {
let dir_str = if f.is_folder { "d" } else { " " };
println!(
"{}\t{}\t{}\t{}",
dir_str,
human_bytes(f.size as f64),
f.last_modified,
f.path,
)
});
}
Commands::Upload {
file,
prefix,
recursive,
fake_size,
force,
stream,
} => {
// is folder
-if file.is_dir() && !*recursive {
if file.is_dir() && !recursive {
println!("Use --recursive option for folder upload");
return Err(anyhow::anyhow!("Use --recursive option for folder upload"));
}
if stream && recursive {
println!("Can't use Stream Upload and Recursive Upload both.");
return Err(anyhow::anyhow!(
"Can't use Stream Upload and Recursive Upload both."
));
}
if let Some(prefix) = prefix.as_ref() {
if !prefix.ends_with('/') {
println!("Prefix must end with /");
return Err(anyhow::anyhow!("Prefix must end with /"));
}
}
println!("is_dir: {}", file.is_dir());
if file.is_dir() {
println!("name: {:?}", file.file_name().unwrap().to_str().unwrap());
println!("prefix: {:?}", prefix.as_deref());
client
.mkdir(
file.file_name().unwrap().to_str().unwrap(),
prefix.as_deref(),
)
.await?;
}
let prefix = if file.is_dir() && prefix.is_none() {
Some(file.file_name().unwrap().to_str().unwrap().to_string() + "/")
} else {
prefix
};
let mut files = Vec::<TargetFile>::new();
-if file.is_dir() && *recursive {
if file.is_dir() && recursive {
// upload folder
let mut dirs = Vec::<PathBuf>::new();
dirs.push(file.clone());
@@ -153,12 +202,12 @@ async fn main() -> anyhow::Result<()> {
} else {
files.push(TargetFile {
file: path.clone(),
-path: path
-.strip_prefix(file)
-.unwrap()
-.to_str()
-.expect("Invalid File Name")
-.to_string(),
path: (prefix.clone().unwrap_or("".to_string())
+ path
.strip_prefix(&file)
.unwrap()
.to_str()
.expect("Invalid File Name")),
});
}
}
@@ -185,19 +234,87 @@ async fn main() -> anyhow::Result<()> {
});
}
-for file in &files {
-if client.info(file.path.as_str()).await.is_ok() {
-println!("File already exists.");
-return Err(anyhow::anyhow!("File already exists."));
if !force {
println!("Checking file existence...");
for file in &files {
println!("Checking: {:?}", file.path);
let res = client.info(file.path.as_str()).await;
if res.is_ok() {
println!("File already exists.");
return Err(anyhow::anyhow!("File already exists."));
} else {
println!("{:?}", res.err().unwrap());
}
}
}
-for file in &files {
// println!("{:#?}", files);
let mut dirs = files
.iter()
.map(|f| f.path.clone())
.map(|f| f.split('/').collect::<Vec<&str>>()[..f.split('/').count() - 1].join("/"))
.collect::<HashSet<String>>()
.into_iter()
.collect::<Vec<String>>();
dirs.sort();
println!("{:?}", dirs);
let mut done_set = Vec::new();
for dir in dirs {
let paths = dir.split('/');
println!("{}", dir);
for (i, path) in paths.clone().enumerate() {
let mut path_prefix = paths.clone().take(i).collect::<Vec<&str>>().join("/");
if path_prefix.is_empty() && path == file.file_name().unwrap().to_str().unwrap()
{
continue;
}
if !path_prefix.is_empty() {
path_prefix.push('/');
}
if done_set
.iter()
.any(|x| x == &(path.to_string(), path_prefix.clone()))
{
continue;
}
println!("path: {:?}", path);
println!("prefix: {:?}", path_prefix);
match client.mkdir(path, Some(&path_prefix)).await {
Ok(_) => {
done_set.push((path.to_string(), path_prefix));
}
Err(e) => {
println!("{:?}", e);
continue;
}
}
}
}
let small_files = files
.iter()
.filter(|f| f.file.metadata().unwrap().len() < 1024 * 1024 * 10 * 10)
.collect::<Vec<&TargetFile>>();
let large_files = files
.iter()
.filter(|f| f.file.metadata().unwrap().len() >= 1024 * 1024 * 10 * 10)
.collect::<Vec<&TargetFile>>();
println!("small: {}", small_files.len());
println!("large: {}", large_files.len());
for file in large_files {
// wait 50ms
// tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let file_size = file.file.metadata().unwrap().len();
-let file_data = File::open(file.file.clone()).await?;
let file_data = File::open(file.file.clone()).await.unwrap();
-let mut file_reader = tokio::io::BufReader::new(file_data);
-let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
-file_reader.read_to_end(&mut file_data).await?;
let pb = ProgressBar::new(file_size);
pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
@@ -205,12 +322,102 @@ async fn main() -> anyhow::Result<()> {
.with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
.progress_chars("#>-"));
let file_name = file.file.file_name().unwrap().to_str().unwrap();
let file_dir = file.path.split('/').collect::<Vec<&str>>()
[..file.path.split('/').count() - 1]
.join("/")
+ "/";
// println!("file.path: {:?}", file_name);
// println!("prefix: {:?}", file_dir);
if stream {
println!("is stream true");
client
.upload_from_path(file.clone(), Some(&file_dir), fake_size, Some(pb))
.await
.unwrap();
} else {
let mut file_reader = tokio::io::BufReader::new(file_data);
let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
file_reader.read_to_end(&mut file_data).await.unwrap();
client
.upload(
vec![UploadFile {
data: file_data,
path: file_name.to_string(),
}],
Some(&file_dir),
fake_size,
Some(pb),
)
.await
.unwrap();
}
}
let mut a = HashMap::<String, Vec<TargetFile>>::new();
for file in small_files {
// per parent path
let file_size = file.file.metadata().unwrap().len();
let file_data = File::open(file.file.clone()).await.unwrap();
let mut file_reader = tokio::io::BufReader::new(file_data);
let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
file_reader.read_to_end(&mut file_data).await.unwrap();
let file_name = file.file.file_name().unwrap().to_str().unwrap();
let file_dir = file.path.split('/').collect::<Vec<&str>>()
[..file.path.split('/').count() - 1]
.join("/")
+ "/";
a.entry(file_dir.clone()).or_default().push(file.clone());
}
let b = a.into_values().collect::<Vec<Vec<TargetFile>>>();
for dir_group in b {
let mut upload_files = Vec::new();
for file in &dir_group {
let file_size = file.file.metadata().unwrap().len();
let file_data = File::open(file.file.clone()).await.unwrap();
let mut file_reader = tokio::io::BufReader::new(file_data);
let mut file_data: Vec<u8> = Vec::with_capacity(file_size as usize);
file_reader.read_to_end(&mut file_data).await.unwrap();
let file_name = file.file.file_name().unwrap().to_str().unwrap();
let file_dir = file.path.split('/').collect::<Vec<&str>>()
[..file.path.split('/').count() - 1]
.join("/")
+ "/";
upload_files.push(UploadFile {
data: file_data,
path: file_name.to_string(),
});
}
println!("Uploading {} files", upload_files.len());
let pb = ProgressBar::new(upload_files.len() as u64);
pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})")
.unwrap()
.with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
.progress_chars("#>-"));
client
.upload(
-&file.path,
-&file_data,
-prefix.as_deref(),
-*fake_size,
upload_files,
Some(
(dir_group[0].path.split('/').collect::<Vec<&str>>()
[..dir_group[0].path.split('/').count() - 1]
.join("/")
+ "/")
.as_str(),
),
fake_size,
Some(pb),
)
.await
@@ -224,22 +431,22 @@ async fn main() -> anyhow::Result<()> {
.unwrap();
}
Commands::Move { path, dest } => {
-client.move_file(path, dest).await.unwrap();
client.move_file(&path, &dest).await.unwrap();
}
Commands::Delete { path, recursive } => {
-client.delete(path, recursive).await.unwrap();
client.delete(&path, &recursive).await.unwrap();
}
Commands::Mkdir { name, path } => {
-client.mkdir(name, path.as_deref()).await.unwrap();
client.mkdir(&name, path.as_deref()).await.unwrap();
}
Commands::Copy { src, dest } => {
-client.copy(src, dest).await.unwrap();
client.copy(&src, &dest).await.unwrap();
}
Commands::Rename { path, name } => {
-client.rename(path, name).await.unwrap();
client.rename(&path, &name).await.unwrap();
}
Commands::Info { path } => {
-client.info(path).await.unwrap();
client.info(&path).await.unwrap();
}
Commands::Auth {} => {
println!("Click the link below to authorize the app:\n");

View File

@@ -76,7 +76,7 @@ pub struct CheckUploadRequest {
pub file: Vec<CheckUploadRequestFile>,
pub host_id: String,
pub path: String,
-pub upload_id: String,
// pub upload_id: String,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -84,6 +84,9 @@ pub struct CheckUploadRequest {
pub struct CheckUploadRequestFile {
pub path: String,
pub size: i64,
pub hash: Option<bool>, // always null
pub version_id: Option<bool>, // always null
pub host_id: Option<bool>, // always null
}
#[derive(Debug, Serialize, Deserialize)]
@@ -100,6 +103,9 @@ pub struct CompleteUploadRequest {
pub struct CompleteUploadRequestFile {
pub path: String,
pub size: i64,
pub hash: Option<bool>, // always null
pub version_id: Option<bool>, // always null
pub host_id: Option<bool>, // always null
}
#[derive(Debug, Serialize, Deserialize)]
@@ -159,3 +165,4 @@ pub struct VerifyCustomTokenRequest {
pub return_secure_token: bool,
pub token: String,
}

View File

@@ -75,7 +75,7 @@ pub struct CheckActionResponse {
pub message: Option<String>,
}
-#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct CheckUploadResponse {
pub bucket: String,
@@ -85,7 +85,7 @@ pub struct CheckUploadResponse {
pub upload_id: String,
}
-#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CheckUploadResponseFile {
pub last_modified: String, // 1970-01-20T22:07:12.804Z
@@ -94,7 +94,7 @@ pub struct CheckUploadResponseFile {
pub version_id: String,
}
-#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct GetFileLinkTokenResponse {
pub access_key_id: String,

View File

@@ -19,13 +19,64 @@ use tokio::{fs::File, io::BufReader, sync::Mutex};
use crate::{
client::{self},
types::response::ListFilesResponseFile,
TargetFile,
};
use crate::{constants::CHUNK_SIZE, types};
-#[derive(Debug, Clone)]
-pub struct TargetFile {
-pub file: PathBuf,
-pub path: String,
pub async fn single_upload(
token_res: &types::response::GetFileLinkTokenResponse,
bucket: &str,
target_file: &types::response::CheckUploadResponseFile,
prefix: &str,
region: &str,
upload_id: &str,
file: &[u8],
) -> anyhow::Result<()> {
let _ = upload_id;
// if !file.file.exists() {
// println!("File not found: {:?}", file.file);
// return Err(anyhow::anyhow!("File not found: {:?}", file.file));
// }
let file_size = file.len() as u64;
let cledential = Credentials::new(
&token_res.access_key_id,
&token_res.secret_access_key,
Some(token_res.session_token.clone()),
// 2024-07-18T07:14:42Z
Some(
chrono::DateTime::parse_from_rfc3339(&token_res.expiration)
.unwrap()
.into(),
),
"2021-06-01",
);
let config = aws_sdk_s3::Config::builder()
.behavior_version_latest()
.credentials_provider(cledential)
.region(Region::new(region.to_owned()))
// .endpoint_url("https://sendy-cloud.s3.ap-northeast-1.amazonaws.com")
.build();
let s3_client = aws_sdk_s3::Client::from_conf(config);
let key = prefix.to_owned() + target_file.path.as_str();
let stream = ByteStream::from(file.to_vec());
let _put_object_res = s3_client
.put_object()
.bucket(bucket)
.key(key)
.body(stream)
.send()
.await?;
// println!("Uploaded");
Ok(())
}
pub async fn multipart_upload(
@@ -185,6 +236,183 @@ pub async fn multipart_upload(
if let Some(pb) = pb {
pb.finish_with_message("Uploaded");
} else {
println!("Uploaded");
}
Ok(())
}
pub async fn multipart_upload_from_path(
token_res: &types::response::GetFileLinkTokenResponse,
bucket: &str,
target_file: &types::response::CheckUploadResponseFile,
prefix: &str,
region: &str,
upload_id: &str,
file: TargetFile,
pb: Option<ProgressBar>,
) -> anyhow::Result<()> {
let _ = upload_id;
if !file.file.exists() {
println!("File not found: {:?}", file.file);
return Err(anyhow::anyhow!("File not found: {:?}", file.file));
}
let file_size = file.file.metadata().unwrap().len() as u64;
let cledential = Credentials::new(
&token_res.access_key_id,
&token_res.secret_access_key,
Some(token_res.session_token.clone()),
// 2024-07-18T07:14:42Z
Some(
chrono::DateTime::parse_from_rfc3339(&token_res.expiration)
.unwrap()
.into(),
),
"2021-06-01",
);
let config = aws_sdk_s3::Config::builder()
.behavior_version_latest()
.credentials_provider(cledential)
.region(Region::new(region.to_owned()))
// .endpoint_url("https://sendy-cloud.s3.ap-northeast-1.amazonaws.com")
.build();
let s3_client = aws_sdk_s3::Client::from_conf(config);
let key = prefix.to_owned() + target_file.path.as_str();
let multipart_upload_res = s3_client
.create_multipart_upload()
.bucket(bucket)
.key(key.clone())
.send()
.await
.unwrap();
let upload_id = multipart_upload_res.upload_id().unwrap().to_string();
let chunk_size = max(CHUNK_SIZE as u64, file_size / 10000);
let mut chunk_count = file_size / chunk_size;
let mut size_of_last_chunk = file_size % chunk_size;
if size_of_last_chunk == 0 {
size_of_last_chunk = chunk_size;
chunk_count -= 1;
}
let upload_parts = Arc::new(Mutex::new(Vec::<CompletedPart>::new()));
let semaphore = Arc::new(tokio::sync::Semaphore::new(20));
let mut handles = Vec::new();
for chunk_index in 0..chunk_count {
let bucket = bucket.to_owned();
let key = key.clone();
let upload_id = upload_id.clone();
let s3_client = s3_client.clone();
let pb = pb.clone();
let file = file.to_owned();
let upload_parts = upload_parts.clone();
let semaphore = semaphore.clone().acquire_owned().await.unwrap();
let handle = tokio::spawn(async move {
let _permit = semaphore;
let this_chunk = if chunk_count - 1 == chunk_index {
size_of_last_chunk
} else {
chunk_size
};
loop {
let offset = chunk_index * chunk_size;
let length = this_chunk;
let stream = ByteStream::read_from()
.path(file.file.clone())
.offset(chunk_index * chunk_size)
.length(Length::Exact(this_chunk))
.build()
.await;
let stream = match stream {
Ok(stream) => stream,
Err(e) => {
eprintln!("Error: {:?}", e);
continue;
}
};
// let stream = match stream {
// Ok(stream) => stream,
// Err(e) => {
// eprintln!("Error: {:?}", e);
// continue;
// }
// };
//Chunk index needs to start at 0, but part numbers start at 1.
let part_number = (chunk_index as i32) + 1;
let upload_part_res = s3_client
.upload_part()
.key(&key)
.bucket(bucket.clone())
.upload_id(upload_id.clone())
.body(stream)
.part_number(part_number)
.send()
.await;
let upload_part_res = match upload_part_res {
Ok(upload_part_res) => upload_part_res,
Err(e) => {
eprintln!("Error: {:?}", e);
continue;
}
};
upload_parts.lock().await.push(
CompletedPart::builder()
.e_tag(upload_part_res.e_tag.unwrap_or_default())
.part_number(part_number)
.build(),
);
if let Some(pb) = &pb {
pb.inc(this_chunk);
}
break;
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
upload_parts
.lock()
.await
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(upload_parts.lock().await.clone()))
.build();
let _complete_multipart_upload_res = s3_client
.complete_multipart_upload()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.multipart_upload(completed_multipart_upload)
.send()
.await
.unwrap();
if let Some(pb) = pb {
pb.finish_with_message("Uploaded");
} else {
println!("Uploaded");
}
Ok(())
@@ -218,6 +446,7 @@ pub async fn list_files(
thumbnail_size: 130,
to: pagination_size,
};
println!("{:#?}", req);
let mut res = client.list_files(req).await?;
files.append(&mut res.file);
@@ -267,6 +496,6 @@ pub async fn check_job(key: &str, client: &client::Client) -> anyhow::Result<()>
return Err(anyhow::anyhow!("Error: {:?}", res));
}
-std::thread::sleep(std::time::Duration::from_millis(200));
// std::thread::sleep(std::time::Duration::from_millis(100));
}
}