diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..689dca4 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,311 @@ +use anyhow::Ok; +use reqwest::StatusCode; + +use crate::types; + +pub struct Client { + pub token: String, + pub refresh_token: String, + pub last_refresh: std::time::Instant, + pub host_id: String, + token_valid_time: u32, +} + +impl Client { + pub async fn try_new(refresh_token_str: String) -> anyhow::Result { + let refresh_token_req = types::request::RefreshTokenRequest { + refresh_token: refresh_token_str, + }; + let token = refresh_token(refresh_token_req).await.unwrap(); + Ok(Self { + refresh_token: token.refresh_token, + token: token.id_token, + last_refresh: std::time::Instant::now(), + host_id: token.uid, + token_valid_time: 3600, + }) + } + pub async fn refresh_token(&mut self) -> anyhow::Result<()> { + let refresh_token_req = types::request::RefreshTokenRequest { + refresh_token: self.refresh_token.clone(), + }; + let token = refresh_token(refresh_token_req).await.unwrap(); + self.token = token.id_token; + self.last_refresh = std::time::Instant::now(); + Ok(()) + } + pub async fn list_files( + &mut self, + req: types::request::ListFilesRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .post("https://forest.sendy.jp/cloud/service/file/v1/files") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn check_upload( + &mut self, + req: types::request::CheckUploadRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .post("https://forest.sendy.jp/cloud/service/file/v1/check/upload") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn get_upload_token(&mut self) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .get(format!( + "https://forest.sendy.jp/cloud/service/file/v1/filelink/token?host_id={}&path={}", + self.host_id, "hello" + )) + .bearer_auth(&self.token); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn get_download_link( + &mut self, + req: types::request::GetFileLinkRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .post("https://forest.sendy.jp/cloud/service/file/v1/filelink/download") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn check_action( + &mut self, + req: types::request::CheckActionRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .post("https://forest.sendy.jp/cloud/service/file/v3/files/check") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn file_detail( + &mut self, + req: types::request::FileDetailRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .post("https://forest.sendy.jp/cloud/service/file/v1/file") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn delete_file( + &mut self, + req: types::request::DeleteFileRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .delete("https://forest.sendy.jp/cloud/service/file/v3/files") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn mkdir(&mut self, req: types::request::CreateFolderRequest) -> anyhow::Result<()> { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .post("https://forest.sendy.jp/cloud/service/file/v1/files/create") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + + if response.status() == StatusCode::NO_CONTENT { + Ok(()) + } else { + let text = response.text().await?; + Err(anyhow::anyhow!("Failed to create folder: {}", text)) + } + } + + pub async fn copy_file( + &mut self, + req: types::request::CopyFileRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .post("https://forest.sendy.jp/cloud/service/file/v3/files/copy") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn rename_file( + &mut self, + req: types::request::RenameFileRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .put("https://forest.sendy.jp/cloud/service/file/v3/files/rename") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } + + pub async fn move_file( + &mut self, + req: types::request::MoveFileRequest, + ) -> anyhow::Result { + if self.last_refresh.elapsed().as_secs() > self.token_valid_time.into() { + self.refresh_token().await?; + } + let client = reqwest::Client::new(); + let request = client + .put("https://forest.sendy.jp/cloud/service/file/v3/files/move") + .bearer_auth(&self.token) + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + match serde_json::from_str(&text) { + std::result::Result::Ok(json) => Ok(json), + Err(e) => Err(anyhow::Error::new(e).context(text.trim().to_string())), + } + } +} + +pub async fn refresh_token( + req: types::request::RefreshTokenRequest, +) -> anyhow::Result { + let client = reqwest::Client::new(); + let request = client + .post("https://www.rakuten-drive.com/api/account/refreshtoken") + .json(&req); + + let response = request.send().await?; + let text = response.text().await?; + + // + + let json: types::response::RefreshTokenResponse = serde_json::from_str(&text)?; + Ok(json) + + // response + // .json::() + // .await + // .map_err(Into::into) +} + +// https://www.rakuten-drive.com/api/account/refreshtoken POST RefreshTokenRequest RefreshTokenResponse +// https://forest.sendy.jp/cloud/service/file/v1/file POST FileDetailRequest FileDetailResponse +// https://forest.sendy.jp/cloud/service/file/v1/files POST ListFilesRequest ListFilesResponse +// https://forest.sendy.jp/cloud/service/file/v3/files DELETE DeleteFileRequest JobKeyResponse +// https://forest.sendy.jp/cloud/service/file/v1/files/create POST CreateFolderRequest +// https://forest.sendy.jp/cloud/service/file/v3/files/rename PUT RenameFileRequest RenameFileResponse +// https://forest.sendy.jp/cloud/service/file/v3/files/check POST CheckActionRequest CheckActionResponse +// https://forest.sendy.jp/cloud/service/file/v3/files/move PUT MoveFileRequest MoveFileResponse +// https://forest.sendy.jp/cloud/service/file/v1/check/upload POST CheckUploadRequest CheckUploadResponse +// https://forest.sendy.jp/cloud/service/file/v1/filelink/token?host_id=GclT7DrnLFho7vnIirUzjtMLhRk2&path=hello GET GetFileLinkTokenResponse +// https://forest.sendy.jp/cloud/service/file/v1/complete/upload POST CompleteUploadRequest +// https://forest.sendy.jp/cloud/service/file/v1/filelink/download POST GetFileLinkRequest GetFileLinkResponse +// https://forest.sendy.jp/cloud/service/file/v3/files/copy POST CopyFileRequest JobKeyResponse diff --git a/src/commands.rs b/src/commands.rs new file mode 100644 index 0000000..4a684d5 --- /dev/null +++ b/src/commands.rs @@ -0,0 +1,363 @@ +use std::{ + cmp::{max, min}, + io::{stdout, Write}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use aws_config::{environment::region, BehaviorVersion, Region, SdkConfig}; +use aws_sdk_s3::{ + config::Credentials, operation::upload_part, primitives::ByteStream, types::CompletedPart, +}; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; +use aws_smithy_types::byte_stream::Length; +use clap::{Parser, Subcommand}; +use human_bytes::human_bytes; +use indicatif::{ProgressBar, ProgressState, ProgressStyle}; +use tokio::{fs::File, io::BufReader, sync::Mutex}; + +use crate::{ + client, constants::APP_VERSION, list_files, types, util::{check_job, file_detail, multipart_upload} +}; + +#[derive(Debug, Clone)] +pub struct TargetFile { + pub file: PathBuf, + pub path: String, +} + +pub async fn list(prefix: &Option, client: &mut client::Client) -> anyhow::Result<()> { + let res = list_files(Some(&prefix.clone().unwrap_or("".to_string())), client) + .await + .unwrap(); + res.file.iter().for_each(|f| { + let permission_string = if f.is_folder { "d" } else { "-" }; + println!( + "{}\t{}\t{}\t{}", + permission_string, + human_bytes(f.size as u32), + f.last_modified, + f.path + ); + }); + Ok(()) +} + +pub async fn upload( + file: &PathBuf, + prefix: &Option, + recursive: &bool, + fake_size: &Option, + client: &mut client::Client, +) -> anyhow::Result<()> { + // is folder + if file.is_dir() && !*recursive { + println!("Use --recursive option for folder upload"); + return Err(anyhow::anyhow!("Use --recursive option for folder upload")); + } + + let mut files = Vec::::new(); + + if file.is_dir() && *recursive { + // upload folder + let mut dirs = Vec::::new(); + dirs.push(file.clone()); + while let Some(dir) = dirs.pop() { + let entries = std::fs::read_dir(dir).unwrap(); + for entry in entries { + let entry = entry.unwrap(); + let path = entry.path(); + if path.is_dir() { + dirs.push(path); + } else { + files.push(TargetFile { + file: path.clone(), + path: path + .strip_prefix(file) + .unwrap() + .to_str() + .expect("Invalid File Name") + .to_string(), + }); + } + } + } + // for file in files { + // println!("{:?}", file); + // } + } else { + // file check + if !file.exists() { + println!("File not found: {:?}", file); + return Err(anyhow::anyhow!("File not found: {:?}", file)); + } + files.push(TargetFile { + file: file.clone(), + path: file.file_name().unwrap().to_str().unwrap().to_string(), + }); + } + + if cfg!(windows) { + // replase \ to / + files.iter_mut().for_each(|f| { + f.path = f.path.replace('\\', "/"); + }); + } + + let req = types::request::CheckUploadRequest { + host_id: client.host_id.clone(), + path: prefix.clone().unwrap_or("".to_string()), + upload_id: "".to_string(), + file: files + .iter() + .map(|f| types::request::CheckUploadRequestFile { + path: f.path.clone(), + size: fake_size.unwrap_or(f.file.metadata().unwrap().len()) as i64, + }) + .collect(), + }; + + let check_upload_res = client.check_upload(req).await.unwrap(); + + // println!("{:#?}", check_upload_res); + + let token_res = client.get_upload_token().await.unwrap(); + + // println!("{:#?}", token_res); + + let cledential = Credentials::new( + token_res.access_key_id.clone(), + token_res.secret_access_key.clone(), + Some(token_res.session_token.clone()), + None, + "2021-06-01", + ); + let _config = aws_sdk_s3::Config::builder() + .behavior_version_latest() + .region(Region::new(check_upload_res.region.clone())) + .credentials_provider(cledential) + .force_path_style(true) + .build(); + + // if file_size > CHUNK_SIZE as u64 { + for (i, file) in files.iter().enumerate() { + println!("Multi Uploading: {:?}", file.file); + + multipart_upload( + &token_res, + &check_upload_res.bucket, + &check_upload_res.file[i], + &check_upload_res.prefix, + &check_upload_res.region, + &check_upload_res.upload_id, + file.clone(), + ) + .await + .unwrap(); + // } + // } else { + // for (i, file) in files.iter().enumerate() { + // println!("Uploading: {:?}", file.file); + // let stream = ByteStream::read_from() + // .path(file.file.clone()) + // .offset(0) + // .length(Length::Exact(file_size)) + // .build() + // .await + // .unwrap(); + // let key = + // check_upload_res.prefix.to_owned() + check_upload_res.file[i].path.as_str(); + // let _upload_res = s3_client + // .put_object() + // .bucket(check_upload_res.bucket.clone()) + // .key(key) + // .body(stream) + // .send() + // .await + // .unwrap(); + // } + // } + } + + match check_job(&check_upload_res.upload_id, client).await { + Ok(_) => Ok(()), + Err(e) => { + println!("Error: {:?}", e); + return Err(anyhow::anyhow!("Error: {:?}", e)); + } + } +} + +pub async fn download( + path: &String, + prefix: &Option, + client: &mut client::Client, +) -> anyhow::Result<()> { + let _file_name = path.split('/').last().unwrap(); + let file_path = + path.split('/').collect::>()[0..path.split('/').count() - 1].join("/"); + + let list = list_files(Some(&file_path), client).await.unwrap(); + + let file = list + .file + .iter() + .find(|f| f.path == *path) + .expect("File not found"); + + let req = types::request::GetFileLinkRequest { + app_version: APP_VERSION.to_string(), + file: vec![types::request::GetFileLinkRequestFile { + path: path.to_string(), + size: file.size, + }], + host_id: client.host_id.clone(), + path: file_path, + }; + + let res = client.get_download_link(req).await.unwrap(); + + // run aria2c + + // TODO: Implement self implementation of multi connection download + let stdout = std::process::Command::new("aria2c") + .arg("-x16") + .arg("-s16") + .arg("-d") + .arg(".") + .arg(res.url) + .stdout(std::process::Stdio::piped()) + .spawn() + .expect("failed to execute process") + .stdout + .expect("failed to get stdout"); + + let reader = std::io::BufReader::new(stdout); + + std::io::BufRead::lines(reader).for_each(|line| println!("{}", line.unwrap())); + + Ok(()) +} + +pub async fn delete( + path: &String, + recursive: &bool, + client: &mut client::Client, +) -> anyhow::Result<()> { + let file = file_detail(path, client).await.unwrap(); + if file.is_folder && !*recursive { + println!("Use --recursive option for folder delete"); + return Err(anyhow::anyhow!("Use --recursive option for folder delete")); + } + let req = types::request::DeleteFileRequest { + file: vec![types::request::FileModifyRequestFile { + last_modified: file.last_modified, + path: file.path, + version_id: file.version_id, + size: file.size, + }], + host_id: client.host_id.clone(), + prefix: "".to_string(), + trash: true, + }; + let res = client.delete_file(req).await.unwrap(); + + match check_job(&res.key, client).await { + Ok(_) => { + println!("Deleted."); + Ok(()) + } + Err(e) => { + println!("Error: {:?}", e); + Err(anyhow::anyhow!("Error: {:?}", e)) + } + } +} + +pub async fn mkdir( + name: &String, + path: &Option, + client: &mut client::Client, +) -> anyhow::Result<()> { + if name.contains('/') { + println!("Please use --path option for set parent directory"); + return Err(anyhow::anyhow!( + "Please use --path option for set parent directory" + )); + } + let req = types::request::CreateFolderRequest { + host_id: client.host_id.clone(), + name: name.clone(), + path: path.clone().unwrap_or("".to_string()), + }; + + match client.mkdir(req).await { + Ok(_) => { + println!("Created: {:?}", name); + Ok(()) + } + Err(e) => { + println!("Error: {:?}", e); + Err(anyhow::anyhow!("Error: {:?}", e)) + } + } +} + +pub async fn rename( + path: &String, + name: &String, + client: &mut client::Client, +) -> anyhow::Result<()> { + if name.contains('/') { + println!("Can't use / in file name"); + println!("Name should be file name only."); + return Err(anyhow::anyhow!("Can't use / in file name")); + } + + let file_path = + path.split('/').collect::>()[0..path.split('/').count() - 1].join("/") + "/"; + + let list = list_files(Some(&file_path), client).await.unwrap(); + + let file = list + .file + .iter() + .find(|f| f.path == *path) + .expect("File not found"); + + let req = types::request::RenameFileRequest { + file: types::request::FileModifyRequestFile { + last_modified: file.last_modified.clone(), + path: file.path.clone(), + version_id: file.version_id.clone(), + size: file.size, + }, + host_id: client.host_id.clone(), + name: name.clone(), + prefix: file_path, + }; + + let res = client.rename_file(req).await.unwrap(); + + match check_job(&res.key, client).await { + Ok(_) => { + println!("Renamed."); + Ok(()) + } + Err(e) => { + println!("Error: {:?}", e); + Err(anyhow::anyhow!("Error: {:?}", e)) + } + } +} + +pub async fn info(path: &String, client: &mut client::Client) -> anyhow::Result<()> { + let req = types::request::FileDetailRequest { + host_id: client.host_id.clone(), + path: path.to_string(), + thumbnail_size: 130, + }; + let res = client.file_detail(req).await.unwrap(); + println!("{:#?}", res); + Ok(()) +} diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..d0bc1ce --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,3 @@ +pub const REFRESH_TOKEN: &str = "AMf-vBwuDNdrMsPlvESgMqZWGCdVlBxMvrQVNvHOWb-FdDRV0Ozeq26POxH2tzy463DGlZTZpPYYhWCSi-KI0-8pSYEXtuCG_8DlVRyqwm4POeobYWkrw3dMgiEDNjFCfXIN4-k65CsizmCFbWQz6ASsi-XGAwMn_mXyFj9JirB7vyzTTr2ugbA"; +pub const CHUNK_SIZE: usize = 1024 * 1024 * 10; // 10MB +pub const APP_VERSION: &str = "v21.11.10"; diff --git a/src/endpoints.rs b/src/endpoints.rs deleted file mode 100644 index 55dc0d5..0000000 --- a/src/endpoints.rs +++ /dev/null @@ -1,167 +0,0 @@ -use anyhow::Ok; - -use crate::types; - -pub struct Client { - pub token: String, - pub host_id: String, -} -impl Client { - pub fn new(token: String, host_id: String) -> Self { - Self { token, host_id } - } - pub async fn list_files( - &self, - req: types::request::ListFilesRequest, - ) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .post("https://forest.sendy.jp/cloud/service/file/v1/files") - .bearer_auth(&self.token) - .json(&req); - - let response = request.send().await?; - response - .json::() - .await - .map_err(Into::into) - } - - pub async fn check_upload( - &self, - req: types::request::CheckUploadRequest, - ) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .post("https://forest.sendy.jp/cloud/service/file/v1/check/upload") - .bearer_auth(&self.token) - .json(&req); - - let response = request.send().await?; - response - .json::() - .await - .map_err(Into::into) - } - - pub async fn get_upload_token( - &self, - ) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .get(&format!( - "https://forest.sendy.jp/cloud/service/file/v1/filelink/token?host_id={}&path={}", - self.host_id, "hello" - )) - .bearer_auth(&self.token); - - let response = request.send().await?; - response - .json::() - .await - .map_err(Into::into) - } - - pub async fn get_download_link( - &self, - req: types::request::GetFileLinkRequest, - ) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .post("https://forest.sendy.jp/cloud/service/file/v1/filelink/download") - .bearer_auth(&self.token) - .json(&req); - - let response = request.send().await?; - response - .json::() - .await - .map_err(Into::into) - } - - pub async fn check_action( - &self, - req: types::request::CheckActionRequest, - ) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .post("https://forest.sendy.jp/cloud/service/file/v3/files/check") - .bearer_auth(&self.token) - .json(&req); - - let response = request.send().await?; - response - .json::() - .await - .map_err(Into::into) - } - - pub async fn file_detail( - &self, - req: types::request::FileDetailRequest, - ) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .post("https://forest.sendy.jp/cloud/service/file/v1/file") - .bearer_auth(&self.token) - .json(&req); - - let response = request.send().await?; - response - .json::() - .await - .map_err(Into::into) - } - - pub async fn delete_file( - &self, - req: types::request::DeleteFileRequest, - ) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .delete("https://forest.sendy.jp/cloud/service/file/v3/files") - .bearer_auth(&self.token) - .json(&req); - - let response = request.send().await?; - response - .json::() - .await - .map_err(Into::into) - } -} - -pub async fn refresh_token( - req: types::request::RefreshTokenRequest, -) -> anyhow::Result { - let client = reqwest::Client::new(); - let request = client - .post("https://www.rakuten-drive.com/api/account/refreshtoken") - .json(&req); - - let response = request.send().await?; - let text = response.text().await?; - - // println!("{}", text); - - let json: types::response::RefreshTokenResponse = serde_json::from_str(&text)?; - Ok(json) - - // response - // .json::() - // .await - // .map_err(Into::into) -} - -// https://www.rakuten-drive.com/api/account/refreshtoken POST RefreshTokenRequest RefreshTokenResponse -// https://forest.sendy.jp/cloud/service/file/v1/file POST FileDetailRequest FileDetailResponse -// https://forest.sendy.jp/cloud/service/file/v1/files POST ListFilesRequest ListFilesResponse -// https://forest.sendy.jp/cloud/service/file/v3/files DELETE DeleteFileRequest JobKeyResponse -// https://forest.sendy.jp/cloud/service/file/v1/files/create POST CreateFolderRequest -// https://forest.sendy.jp/cloud/service/file/v3/files/rename PUT RenameFileRequest RenameFileResponse -// https://forest.sendy.jp/cloud/service/file/v3/files/check POST CheckActionRequest CheckActionResponse -// https://forest.sendy.jp/cloud/service/file/v3/files/move PUT MoveFileRequest MoveFileResponse -// https://forest.sendy.jp/cloud/service/file/v1/check/upload POST CheckUploadRequest CheckUploadResponse -// https://forest.sendy.jp/cloud/service/file/v1/filelink/token?host_id=GclT7DrnLFho7vnIirUzjtMLhRk2&path=hello GET GetFileLinkTokenResponse -// https://forest.sendy.jp/cloud/service/file/v1/complete/upload POST CompleteUploadRequest -// https://forest.sendy.jp/cloud/service/file/v1/filelink/download POST GetFileLinkRequest GetFileLinkResponse diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a6dabff --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,40 @@ +use commands::list; +use util::list_files; + +mod client; +mod commands; +mod constants; +mod types; +mod util; + +pub struct RakutenDriveClient { + client: client::Client, +} + +impl RakutenDriveClient { + pub async fn try_new(refresh_token_str: String) -> anyhow::Result { + let client = client::Client::try_new(refresh_token_str).await?; + Ok(Self { client }) + } + pub async fn list( + &mut self, + prefix: &Option, + ) -> anyhow::Result { + list_files( + Some(&prefix.clone().unwrap_or("".to_string())), + &mut self.client, + ) + .await + } + pub async fn info( + &mut self, + path: &str, + ) -> anyhow::Result { + let req = types::request::FileDetailRequest { + host_id: self.client.host_id.clone(), + path: path.to_string(), + thumbnail_size: 130, + }; + self.client.file_detail(req).await + } +} diff --git a/src/main.rs b/src/main.rs index f120d2d..7d90958 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,19 +12,20 @@ use aws_sdk_s3::{ use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; use aws_smithy_types::byte_stream::Length; use clap::{Parser, Subcommand}; +use commands::{delete, download, info, list, mkdir, rename, upload}; +use constants::REFRESH_TOKEN; use human_bytes::human_bytes; use indicatif::{ProgressBar, ProgressState, ProgressStyle}; use tokio::{fs::File, io::BufReader, sync::Mutex}; use types::response::ListFilesResponseFile; +use util::*; -mod endpoints; +mod client; +mod commands; mod types; +mod util; +mod constants; -const BEARER_TOKEN: &str = "eyJhbGciOiJSUzI1NiIsImtpZCI6ImMxNTQwYWM3MWJiOTJhYTA2OTNjODI3MTkwYWNhYmU1YjA1NWNiZWMiLCJ0eXAiOiJKV1QifQ.eyJuYW1lIjoi5bm457-8IOW_l-adkSIsInBsYW4iOiJza2YiLCJpc3MiOiJodHRwczovL3NlY3VyZXRva2VuLmdvb2dsZS5jb20vc2VuZHktc2VydmljZSIsImF1ZCI6InNlbmR5LXNlcnZpY2UiLCJhdXRoX3RpbWUiOjE3MjEyMjYwMTUsInVzZXJfaWQiOiJHY2xUN0RybkxGaG83dm5JaXJVemp0TUxoUmsyIiwic3ViIjoiR2NsVDdEcm5MRmhvN3ZuSWlyVXpqdE1MaFJrMiIsImlhdCI6MTcyMTI2MzA4NCwiZXhwIjoxNzIxMjY2Njg0LCJlbWFpbCI6ImtvdXN1a2UxMTIzNjEyNEBnbWFpbC5jb20iLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsImZpcmViYXNlIjp7ImlkZW50aXRpZXMiOnsiZW1haWwiOlsia291c3VrZTExMjM2MTI0QGdtYWlsLmNvbSJdfSwic2lnbl9pbl9wcm92aWRlciI6ImN1c3RvbSJ9fQ.F7gbJ41DOBk6lmOEYzJTYOKPOn0xVleQm2ZQGKGc5rVHudIjhahfkc5av5LsooLA8SI_BZf70Tic6MUz5yOtmSVKDk67pYgJLPpDvWnVgfhcQz-MV4qOmZkvQRLsmsRlG5kcP0BhZSRfIt3DdMQ1FCcqrw6G0Kirvj7C5OJvPnwtqNjDgI9J1HFH71t_5Q7Mx2OHRYSjUM1jZR6bAngG-aNpJC9BpcF-1dgITIrvNGXkcWO1W0tQwwovQSMVq9on_bOq2arnvq8hj0BK7cu4yntBBNY2Mx_qhng7kNWoTFK4pd9p3GQc_kUacJ0PZIxE_63JiQuwiGJVuiSYjbu8iw"; -const REFRESH_TOKEN: &str = "AMf-vBwuDNdrMsPlvESgMqZWGCdVlBxMvrQVNvHOWb-FdDRV0Ozeq26POxH2tzy463DGlZTZpPYYhWCSi-KI0-8pSYEXtuCG_8DlVRyqwm4POeobYWkrw3dMgiEDNjFCfXIN4-k65CsizmCFbWQz6ASsi-XGAwMn_mXyFj9JirB7vyzTTr2ugbA"; -const HOST_ID: &str = "GclT7DrnLFho7vnIirUzjtMLhRk2"; -const CHUNK_SIZE: usize = 1024 * 1024 * 10; // 10MB -const APP_VERSION: &str = "v21.11.10"; #[derive(Parser, Debug)] #[command(version, about, long_about=None)] @@ -35,534 +36,100 @@ struct Args { #[derive(Subcommand, Debug)] enum Commands { + #[clap(about = "List files")] List { + /// Parent folder path #[clap(short, long)] prefix: Option, }, + #[clap(about = "Upload file")] Upload { file: PathBuf, + + /// Parent folder path #[clap(short, long)] prefix: Option, + + /// Upload folder recursively #[clap(short, long)] recursive: bool, + + /// Send fake file size to server (byte) #[clap(short, long)] fake_size: Option, }, + #[clap(about = "Download file")] Download { path: String, + + /// Parent folder path #[clap(long)] prefix: Option, }, + #[clap(about = "Move file")] Move {}, + #[clap(about = "Delete file")] Delete { path: String, + + /// Delete folder recursively #[clap(long)] recursive: bool, }, - MkDir {}, -} + #[clap(about = "Make directory")] + Mkdir { + name: String, -#[derive(Debug, Clone)] -struct TargetFile { - file: PathBuf, - path: String, + /// Path to create directory + #[clap(long)] + path: Option, + }, + #[clap(about = "Copy file")] + Copy { + /// Source file path + src: String, + + /// Destination file directory + dest: String, + }, + #[clap(about = "Rename file")] + Rename { + /// Target file path + path: String, + + /// New file name + name: String, + }, + #[clap(about = "Print file detail")] + Info { path: String }, } #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { let args = Args::parse(); - let refresh_token_req = types::request::RefreshTokenRequest { - refresh_token: REFRESH_TOKEN.to_string(), - }; - let token = endpoints::refresh_token(refresh_token_req).await.unwrap(); - // println!("{:?}", token); - let token = token.id_token; + let mut client = client::Client::try_new(REFRESH_TOKEN.to_string()) + .await + .unwrap(); match &args.command { - Commands::List { prefix } => { - let res = list_files(Some(&prefix.clone().unwrap_or("".to_string())), &token) - .await - .unwrap(); - res.file.iter().for_each(|f| { - let permission_string = if f.is_folder { "d" } else { "-" }; - println!( - "{} {}\t{}\t{}", - permission_string, - human_bytes(f.size as u32), - f.last_modified, - f.path - ); - }); - } + Commands::List { prefix } => list(prefix, &mut client).await, Commands::Upload { file, prefix, recursive, fake_size, - } => { - println!("{}", recursive); - - // is folder - if file.is_dir() && !*recursive { - println!("Use --recursive option for folder upload"); - return; - } - - let mut files = Vec::::new(); - - if file.is_dir() && *recursive { - // upload folder - let mut dirs = Vec::::new(); - dirs.push(file.clone()); - while let Some(dir) = dirs.pop() { - let entries = std::fs::read_dir(dir).unwrap(); - for entry in entries { - let entry = entry.unwrap(); - let path = entry.path(); - if path.is_dir() { - dirs.push(path); - } else { - files.push(TargetFile { - file: path.clone(), - path: path - .strip_prefix(file) - .unwrap() - .to_str() - .unwrap() - .to_string(), - }); - } - } - } - // for file in files { - // println!("{:?}", file); - // } - } else { - // file check - if !file.exists() { - println!("File not found: {:?}", file); - return; - } - files.push(TargetFile { - file: file.clone(), - path: file.file_name().unwrap().to_str().unwrap().to_string(), - }); - } - - if cfg!(windows) { - // replase \ to / - files.iter_mut().for_each(|f| { - f.path = f.path.replace('\\', "/"); - }); - } - - let client = endpoints::Client::new(token.to_string(), HOST_ID.to_string()); - - let req = types::request::CheckUploadRequest { - host_id: client.host_id.clone(), - path: prefix.clone().unwrap_or("".to_string()), - upload_id: "".to_string(), - file: files - .iter() - .map(|f| types::request::CheckUploadRequestFile { - path: f.path.clone(), - size: fake_size.unwrap_or(f.file.metadata().unwrap().len()) as i64, - }) - .collect(), - }; - - let check_upload_res = client.check_upload(req).await.unwrap(); - - // println!("{:#?}", check_upload_res); - - let token_res = client.get_upload_token().await.unwrap(); - - // println!("{:#?}", token_res); - - let cledential = Credentials::new( - token_res.access_key_id.clone(), - token_res.secret_access_key.clone(), - Some(token_res.session_token.clone()), - None, - "2021-06-01", - ); - let config = aws_sdk_s3::Config::builder() - .behavior_version_latest() - .region(Region::new(check_upload_res.region.clone())) - .credentials_provider(cledential) - .force_path_style(true) - .build(); - - let s3_client = aws_sdk_s3::Client::from_conf(config); - let file_size = file.metadata().unwrap().len(); - - // if file_size > CHUNK_SIZE as u64 { - for (i, file) in files.iter().enumerate() { - println!("Multi Uploading: {:?}", file.file); - - multipart_upload( - &token_res, - &check_upload_res.bucket, - &check_upload_res.file[i], - &check_upload_res.prefix, - &check_upload_res.region, - &check_upload_res.upload_id, - file.clone(), - ) - .await - .unwrap(); - // } - // } else { - // for (i, file) in files.iter().enumerate() { - // println!("Uploading: {:?}", file.file); - // let stream = ByteStream::read_from() - // .path(file.file.clone()) - // .offset(0) - // .length(Length::Exact(file_size)) - // .build() - // .await - // .unwrap(); - // let key = - // check_upload_res.prefix.to_owned() + check_upload_res.file[i].path.as_str(); - // let _upload_res = s3_client - // .put_object() - // .bucket(check_upload_res.bucket.clone()) - // .key(key) - // .body(stream) - // .send() - // .await - // .unwrap(); - // } - // } - } - - loop { - let req = types::request::CheckActionRequest { - key: check_upload_res.upload_id.clone(), - }; - let res = client.check_action(req).await.unwrap(); - - if res.state == "complete" { - break; - } - - std::thread::sleep(std::time::Duration::from_millis(200)); - } - - println!("Uploaded"); - } - Commands::Download { path, prefix } => { - let client = endpoints::Client::new(token.to_string(), HOST_ID.to_string()); - - let file_name = path.split('/').last().unwrap(); - let file_path = - path.split('/').collect::>()[0..path.split('/').count() - 1].join("/"); - - let list = list_files(Some(&file_path), &token).await.unwrap(); - - let file = list - .file - .iter() - .find(|f| f.path == *path) - .expect("File not found"); - - let req = types::request::GetFileLinkRequest { - app_version: APP_VERSION.to_string(), - file: vec![types::request::GetFileLinkRequestFile { - path: path.to_string(), - size: file.size, - }], - host_id: client.host_id.clone(), - path: file_path, - }; - - let res = client.get_download_link(req).await.unwrap(); - - // run aria2c - - let stdout = std::process::Command::new("aria2c") - .arg("-x16") - .arg("-s16") - .arg("-d") - .arg(".") - .arg(res.url) - .stdout(std::process::Stdio::piped()) - .spawn() - .expect("failed to execute process") - .stdout - .expect("failed to get stdout"); - - let reader = std::io::BufReader::new(stdout); - - std::io::BufRead::lines(reader).for_each(|line| println!("{}", line.unwrap())); - - println!("Download"); - } + } => upload(file, prefix, recursive, fake_size, &mut client).await, + Commands::Download { path, prefix } => download(path, prefix, &mut client).await, Commands::Move {} => { - println!("Move"); + todo!("Move"); } - Commands::Delete { path, recursive } => { - let client = endpoints::Client::new(token.to_string(), HOST_ID.to_string()); - let file = file_detail(path, &token).await.unwrap(); - if file.is_folder && !*recursive { - println!("Use --recursive option for folder delete"); - return; - } - let req = types::request::DeleteFileRequest { - file: vec![types::request::FileModifyRequestFile { - last_modified: file.last_modified, - path: file.path, - version_id: file.version_id, - size: file.size, - }], - host_id: client.host_id.clone(), - prefix: "".to_string(), - trash: true, - }; - let res = client.delete_file(req).await.unwrap(); - - loop { - let req = types::request::CheckActionRequest { - key: res.key.clone(), - }; - let res = client.check_action(req).await.unwrap(); - - if res.state == "complete" { - break; - } - - std::thread::sleep(std::time::Duration::from_millis(200)); - } - - println!("Deleted"); - } - Commands::MkDir {} => { - println!("MkDir"); + Commands::Delete { path, recursive } => delete(path, recursive, &mut client).await, + Commands::Mkdir { name, path } => mkdir(name, path, &mut client).await, + Commands::Copy { src: _, dest: _ } => { + todo!("Copy"); } + Commands::Rename { path, name } => rename(path, name, &mut client).await, + Commands::Info { path } => info(path, &mut client).await, } } - -async fn multipart_upload( - token_res: &types::response::GetFileLinkTokenResponse, - bucket: &str, - target_file: &types::response::CheckUploadResponseFile, - prefix: &str, - region: &str, - upload_id: &str, - file: TargetFile, -) -> anyhow::Result<()> { - if !file.file.exists() { - println!("File not found: {:?}", file.file); - return Err(anyhow::anyhow!("File not found: {:?}", file.file)); - } - - let file_size = file.file.metadata().unwrap().len(); - - let cledential = Credentials::new( - &token_res.access_key_id, - &token_res.secret_access_key, - Some(token_res.session_token.clone()), - // 2024-07-18T07:14:42Z - Some( - chrono::DateTime::parse_from_rfc3339(&token_res.expiration) - .unwrap() - .into(), - ), - "2021-06-01", - ); - - let config = aws_sdk_s3::Config::builder() - .behavior_version_latest() - .credentials_provider(cledential) - .region(Region::new(region.to_owned())) - // .endpoint_url("https://sendy-cloud.s3.ap-northeast-1.amazonaws.com") - .build(); - - let s3_client = aws_sdk_s3::Client::from_conf(config); - - let key = prefix.to_owned() + target_file.path.as_str(); - - let multipart_upload_res = s3_client - .create_multipart_upload() - .bucket(bucket) - .key(key.clone()) - .send() - .await - .unwrap(); - - let upload_id = multipart_upload_res.upload_id().unwrap().to_string(); - - let chunk_size = max(CHUNK_SIZE as u64, file_size / 10000); - - let mut chunk_count = file_size / chunk_size; - let mut size_of_last_chunk = file_size % chunk_size; - if size_of_last_chunk == 0 { - size_of_last_chunk = chunk_size; - chunk_count -= 1; - } - - let upload_parts = Arc::new(Mutex::new(Vec::::new())); - - let pb = ProgressBar::new(file_size); - pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})") - .unwrap() - .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) - .progress_chars("#>-")); - - let semaphore = Arc::new(tokio::sync::Semaphore::new(20)); - let mut handles = Vec::new(); - - for chunk_index in 0..chunk_count { - let bucket = bucket.to_owned(); - let key = key.clone(); - let upload_id = upload_id.clone(); - let s3_client = s3_client.clone(); - let pb = pb.clone(); - let file = file.clone(); - let upload_parts = upload_parts.clone(); - - let semaphore = semaphore.clone().acquire_owned().await.unwrap(); - - let handle = tokio::spawn(async move { - let _permit = semaphore; - - let this_chunk = if chunk_count - 1 == chunk_index { - size_of_last_chunk - } else { - chunk_size - }; - loop { - let stream = ByteStream::read_from() - .path(file.file.clone()) - .offset(chunk_index * chunk_size) - .length(Length::Exact(this_chunk)) - .build() - .await; - let stream = match stream { - Ok(stream) => stream, - Err(e) => { - eprintln!("Error: {:?}", e); - continue; - } - }; - //Chunk index needs to start at 0, but part numbers start at 1. - let part_number = (chunk_index as i32) + 1; - let upload_part_res = s3_client - .upload_part() - .key(&key) - .bucket(bucket.clone()) - .upload_id(upload_id.clone()) - .body(stream) - .part_number(part_number) - .send() - .await; - let upload_part_res = match upload_part_res { - Ok(upload_part_res) => upload_part_res, - Err(e) => { - eprintln!("Error: {:?}", e); - continue; - } - }; - upload_parts.lock().await.push( - CompletedPart::builder() - .e_tag(upload_part_res.e_tag.unwrap_or_default()) - .part_number(part_number) - .build(), - ); - pb.inc(this_chunk); - break; - } - }); - handles.push(handle); - } - - for handle in handles { - handle.await.unwrap(); - } - - upload_parts - .lock() - .await - .sort_by(|a, b| a.part_number.cmp(&b.part_number)); - - let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder() - .set_parts(Some(upload_parts.lock().await.clone())) - .build(); - - let _complete_multipart_upload_res = s3_client - .complete_multipart_upload() - .bucket(bucket) - .key(key) - .upload_id(upload_id) - .multipart_upload(completed_multipart_upload) - .send() - .await - .unwrap(); - - pb.finish_with_message("Uploaded"); - - Ok(()) -} - -async fn file_detail( - path: &str, - token: &str, -) -> anyhow::Result { - let client = endpoints::Client::new(token.to_string(), HOST_ID.to_string()); - let req = types::request::FileDetailRequest { - host_id: client.host_id.clone(), - path: path.to_string(), - thumbnail_size: 130, - }; - let res = client.file_detail(req).await?; - Ok(res.file) -} - -async fn list_files( - prefix: Option<&str>, - token: &str, -) -> anyhow::Result { - let client = endpoints::Client::new(token.to_string(), HOST_ID.to_string()); - let pagination_size = 40; - let mut files = Vec::::new(); - let req = types::request::ListFilesRequest { - from: 0, - host_id: client.host_id.clone(), - path: prefix.clone().unwrap_or("").to_string(), - sort_type: "path".to_string(), - reverse: false, - thumbnail_size: 130, - to: pagination_size, - }; - let mut res = client.list_files(req).await?; - - files.append(&mut res.file); - - if !res.last_page { - let mut cursor = res.file.len() as i64; - loop { - let req = types::request::ListFilesRequest { - from: cursor, - host_id: client.host_id.clone(), - path: prefix.clone().unwrap_or("").to_string(), - sort_type: "path".to_string(), - reverse: false, - thumbnail_size: 130, - to: pagination_size + cursor, - }; - - let mut next_res = client.list_files(req).await?; - files.append(&mut next_res.file); - - if next_res.last_page { - break; - } else { - cursor += next_res.file.len() as i64; - } - } - } - res.file = files; - - Ok(res) -} diff --git a/src/types/request.rs b/src/types/request.rs index 1898595..b4bd213 100644 --- a/src/types/request.rs +++ b/src/types/request.rs @@ -13,11 +13,21 @@ pub struct ListFilesRequest { pub host_id: String, pub path: String, pub reverse: bool, - pub sort_type: String, + pub sort_type: ListFilesRequestSortType, pub thumbnail_size: i64, pub to: i64, } +#[derive(Debug, Serialize, Deserialize)] +pub enum ListFilesRequestSortType { + #[serde(rename = "name")] + Path, + #[serde(rename = "modified")] + Modified, + #[serde(rename = "size")] + Size, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct CreateFolderRequest { @@ -29,10 +39,10 @@ pub struct CreateFolderRequest { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct RenameFileRequest { - pub file: Vec, + pub file: FileModifyRequestFile, pub host_id: String, pub name: String, - pub path: String, + pub prefix: String, } #[derive(Debug, Serialize, Deserialize)] @@ -123,4 +133,23 @@ pub struct FileDetailRequest { pub host_id: String, pub path: String, pub thumbnail_size: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CopyFileRequest { + pub file: Vec, + pub host_id: String, + pub prefix: String, + pub target_id: String, + pub to_path: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CopyFileRequestFile { + pub last_modified: String, // 1970-01-20T22:07:12.804Z + pub path: String, + pub size: i64, + pub version_id: String, } \ No newline at end of file diff --git a/src/types/response.rs b/src/types/response.rs index 72486a2..04bfe3e 100644 --- a/src/types/response.rs +++ b/src/types/response.rs @@ -25,7 +25,7 @@ pub struct RefreshTokenResponseMetadata { #[derive(Debug, Serialize, Deserialize)] pub struct RefreshTokenResponseCustomClaims { - pub plan: String, // skf = 50GB free + pub plan: String, // skf = 50GB free, sk3 = Unlimited } #[derive(Debug, Serialize, Deserialize)] @@ -72,6 +72,7 @@ pub struct CheckActionResponse { pub action: String, pub state: String, pub usage_size: Option, + pub message: Option, } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..9fb46db --- /dev/null +++ b/src/util.rs @@ -0,0 +1,268 @@ +use std::{ + cmp::{max, min}, + io::{stdout, Write}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use aws_config::{environment::region, BehaviorVersion, Region, SdkConfig}; +use aws_sdk_s3::{ + config::Credentials, operation::upload_part, primitives::ByteStream, types::CompletedPart, +}; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; +use aws_smithy_types::byte_stream::Length; +use clap::{Parser, Subcommand}; +use human_bytes::human_bytes; +use indicatif::{ProgressBar, ProgressState, ProgressStyle}; +use tokio::{fs::File, io::BufReader, sync::Mutex}; + +use crate::{constants::CHUNK_SIZE, types}; +use crate::{ + client::{self}, + commands::TargetFile, + types::response::ListFilesResponseFile, +}; + +pub async fn multipart_upload( + token_res: &types::response::GetFileLinkTokenResponse, + bucket: &str, + target_file: &types::response::CheckUploadResponseFile, + prefix: &str, + region: &str, + upload_id: &str, + file: TargetFile, +) -> anyhow::Result<()> { + let _ = upload_id; + if !file.file.exists() { + println!("File not found: {:?}", file.file); + return Err(anyhow::anyhow!("File not found: {:?}", file.file)); + } + + let file_size = file.file.metadata().unwrap().len(); + + let cledential = Credentials::new( + &token_res.access_key_id, + &token_res.secret_access_key, + Some(token_res.session_token.clone()), + // 2024-07-18T07:14:42Z + Some( + chrono::DateTime::parse_from_rfc3339(&token_res.expiration) + .unwrap() + .into(), + ), + "2021-06-01", + ); + + let config = aws_sdk_s3::Config::builder() + .behavior_version_latest() + .credentials_provider(cledential) + .region(Region::new(region.to_owned())) + // .endpoint_url("https://sendy-cloud.s3.ap-northeast-1.amazonaws.com") + .build(); + + let s3_client = aws_sdk_s3::Client::from_conf(config); + + let key = prefix.to_owned() + target_file.path.as_str(); + + let multipart_upload_res = s3_client + .create_multipart_upload() + .bucket(bucket) + .key(key.clone()) + .send() + .await + .unwrap(); + + let upload_id = multipart_upload_res.upload_id().unwrap().to_string(); + + let chunk_size = max(CHUNK_SIZE as u64, file_size / 10000); + + let mut chunk_count = file_size / chunk_size; + let mut size_of_last_chunk = file_size % chunk_size; + if size_of_last_chunk == 0 { + size_of_last_chunk = chunk_size; + chunk_count -= 1; + } + + let upload_parts = Arc::new(Mutex::new(Vec::::new())); + + let pb = ProgressBar::new(file_size); + pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})") + .unwrap() + .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) + .progress_chars("#>-")); + + let semaphore = Arc::new(tokio::sync::Semaphore::new(20)); + let mut handles = Vec::new(); + + for chunk_index in 0..chunk_count { + let bucket = bucket.to_owned(); + let key = key.clone(); + let upload_id = upload_id.clone(); + let s3_client = s3_client.clone(); + let pb = pb.clone(); + let file = file.clone(); + let upload_parts = upload_parts.clone(); + + let semaphore = semaphore.clone().acquire_owned().await.unwrap(); + + let handle = tokio::spawn(async move { + let _permit = semaphore; + + let this_chunk = if chunk_count - 1 == chunk_index { + size_of_last_chunk + } else { + chunk_size + }; + loop { + let stream = ByteStream::read_from() + .path(file.file.clone()) + .offset(chunk_index * chunk_size) + .length(Length::Exact(this_chunk)) + .build() + .await; + let stream = match stream { + Ok(stream) => stream, + Err(e) => { + eprintln!("Error: {:?}", e); + continue; + } + }; + //Chunk index needs to start at 0, but part numbers start at 1. + let part_number = (chunk_index as i32) + 1; + let upload_part_res = s3_client + .upload_part() + .key(&key) + .bucket(bucket.clone()) + .upload_id(upload_id.clone()) + .body(stream) + .part_number(part_number) + .send() + .await; + let upload_part_res = match upload_part_res { + Ok(upload_part_res) => upload_part_res, + Err(e) => { + eprintln!("Error: {:?}", e); + continue; + } + }; + upload_parts.lock().await.push( + CompletedPart::builder() + .e_tag(upload_part_res.e_tag.unwrap_or_default()) + .part_number(part_number) + .build(), + ); + pb.inc(this_chunk); + break; + } + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + upload_parts + .lock() + .await + .sort_by(|a, b| a.part_number.cmp(&b.part_number)); + + let completed_multipart_upload = aws_sdk_s3::types::CompletedMultipartUpload::builder() + .set_parts(Some(upload_parts.lock().await.clone())) + .build(); + + let _complete_multipart_upload_res = s3_client + .complete_multipart_upload() + .bucket(bucket) + .key(key) + .upload_id(upload_id) + .multipart_upload(completed_multipart_upload) + .send() + .await + .unwrap(); + + pb.finish_with_message("Uploaded"); + + Ok(()) +} + +pub async fn file_detail( + path: &str, + client: &mut client::Client, +) -> anyhow::Result { + let req = types::request::FileDetailRequest { + host_id: client.host_id.clone(), + path: path.to_string(), + thumbnail_size: 130, + }; + let res = client.file_detail(req).await?; + Ok(res.file) +} + +pub async fn list_files( + prefix: Option<&str>, + client: &mut client::Client, +) -> anyhow::Result { + let pagination_size = 40; + let mut files = Vec::::new(); + let req = types::request::ListFilesRequest { + from: 0, + host_id: client.host_id.clone(), + path: prefix.unwrap_or("").to_string(), + sort_type: types::request::ListFilesRequestSortType::Path, + reverse: false, + thumbnail_size: 130, + to: pagination_size, + }; + let mut res = client.list_files(req).await?; + + files.append(&mut res.file); + + if !res.last_page { + let mut cursor = res.file.len() as i64; + loop { + let req = types::request::ListFilesRequest { + from: cursor, + host_id: client.host_id.clone(), + path: prefix.unwrap_or("").to_string(), + sort_type: types::request::ListFilesRequestSortType::Path, + reverse: false, + thumbnail_size: 130, + to: pagination_size + cursor, + }; + + let mut next_res = client.list_files(req).await?; + files.append(&mut next_res.file); + + if next_res.last_page { + break; + } else { + cursor += next_res.file.len() as i64; + } + } + } + res.file = files; + + // files.iter().find(|f| f.path == "/").unwrap(); + + Ok(res) +} + +pub async fn check_job(key: &str, client: &mut client::Client) -> anyhow::Result<()> { + loop { + let req = types::request::CheckActionRequest { + key: key.to_string(), + }; + let res = client.check_action(req).await.unwrap(); + + if res.state == "complete" { + return Ok(()); + } + if res.state == "error" { + println!("Error: {:?}", res); + return Err(anyhow::anyhow!("Error: {:?}", res)); + } + + std::thread::sleep(std::time::Duration::from_millis(200)); + } +}