diff --git a/frontend/rust-lib/flowy-core/Cargo.toml b/frontend/rust-lib/flowy-core/Cargo.toml index 07dc153a4b03..09fd59d0715a 100644 --- a/frontend/rust-lib/flowy-core/Cargo.toml +++ b/frontend/rust-lib/flowy-core/Cargo.toml @@ -29,8 +29,8 @@ collab-entity = { workspace = true } collab-plugins = { workspace = true } collab-folder = { workspace = true } -#collab = { workspace = true } -collab = { workspace = true, features = ["verbose_log"] } +collab = { workspace = true } +#collab = { workspace = true, features = ["verbose_log"] } diesel.workspace = true uuid.workspace = true diff --git a/frontend/rust-lib/flowy-storage/src/manager.rs b/frontend/rust-lib/flowy-storage/src/manager.rs index 9fdbab1a2c71..34586fff9597 100644 --- a/frontend/rust-lib/flowy-storage/src/manager.rs +++ b/frontend/rust-lib/flowy-storage/src/manager.rs @@ -2,9 +2,9 @@ use crate::entities::FileStatePB; use crate::file_cache::FileTempStorage; use crate::notification::{make_notification, StorageNotification}; use crate::sqlite_sql::{ - batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part, - is_upload_completed, select_upload_file, select_upload_parts, update_upload_file_completed, - update_upload_file_upload_id, UploadFilePartTable, UploadFileTable, + batch_select_upload_file, delete_all_upload_parts, delete_upload_file, insert_upload_file, + insert_upload_part, is_upload_completed, select_upload_file, select_upload_parts, + update_upload_file_completed, update_upload_file_upload_id, UploadFilePartTable, UploadFileTable, }; use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue}; use allo_isolate::Isolate; @@ -64,7 +64,7 @@ impl StorageManager { "{}/cache_files", user_service.get_application_root_dir() )); - let (global_notifier, _) = broadcast::channel(1000); + let (global_notifier, _) = broadcast::channel(2000); let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path)); let (notifier, notifier_rx) = watch::channel(Signal::Proceed); let task_queue = Arc::new(UploadTaskQueue::new(notifier)); @@ -205,7 +205,7 @@ async fn prepare_upload_task( retry_count: 0, }) .collect::>(); - info!("prepare upload task: {}", tasks.len()); + info!("[File] prepare upload task: {}", tasks.len()); uploader.queue_tasks(tasks).await; Ok(()) } @@ -359,7 +359,7 @@ impl StorageService for StorageServiceImpl { }, Err(err) => { if matches!(err.code, ErrorCode::DuplicateSqliteRecord) { - info!("upload record already exists, skip creating new upload task"); + info!("[File] upload record already exists, skip creating new upload task"); Ok::<_, FlowyError>((CreatedUpload { url, file_id }, None)) } else { Err(err) @@ -373,7 +373,7 @@ impl StorageService for StorageServiceImpl { FlowyError::internal().with_context("failed to downcast record to UploadFileTable") })?; - if let Err(err) = start_upload( + start_upload( &self.cloud_service, &self.user_service, &self.temp_storage, @@ -382,10 +382,8 @@ impl StorageService for StorageServiceImpl { self.progress_notifiers.clone(), self.global_notifier.clone(), ) - .await - { - error!("[File] start upload failed: {}", err); - } + .await?; + Ok(()) } @@ -496,6 +494,7 @@ async fn start_upload( .collect::>(); let upload_offset = completed_parts.len() as i32; + let total_parts = chunked_bytes.iter().count(); chunked_bytes.set_current_offset(upload_offset); info!( @@ -550,22 +549,29 @@ async fn start_upload( } // 3. start uploading parts - trace!( - "[File] {} start uploading parts: {}", + info!( + "[File] {} start uploading parts:{}, offset:{}", upload_file.file_id, - chunked_bytes.iter().count() + chunked_bytes.iter().count(), + upload_offset, ); - let total_parts = chunked_bytes.iter().count(); let iter = chunked_bytes.iter().enumerate(); - for (index, chunk_bytes) in iter { let part_number = upload_offset + index as i32 + 1; - trace!( + info!( "[File] {} uploading {}th part, size:{}KB", upload_file.file_id, part_number, chunk_bytes.len() / 1000, ); + + let file_url = cloud_service + .get_object_url_v1( + &upload_file.workspace_id, + &upload_file.parent_dir, + &upload_file.file_id, + ) + .await?; // start uploading parts match upload_part( cloud_service, @@ -582,19 +588,12 @@ async fn start_upload( Ok(resp) => { let progress = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0); trace!( - "[File] {} upload progress: {}", + "[File] {} upload progress:{}, etag: {}", upload_file.file_id, - progress + progress, + resp.e_tag ); - let file_url = cloud_service - .get_object_url_v1( - &upload_file.workspace_id, - &upload_file.parent_dir, - &upload_file.file_id, - ) - .await?; - if let Err(err) = global_notifier.send(FileProgress::new_progress(file_url, progress)) { error!("[File] send global notifier failed: {}", err); } @@ -618,7 +617,9 @@ async fn start_upload( .send(); } - error!("[File] {} upload part failed: {}", upload_file.file_id, err); + if let Err(err) = global_notifier.send(FileProgress::new_error(file_url, err.msg.clone())) { + error!("[File] send global notifier failed: {}", err); + } return Err(err); }, } @@ -641,9 +642,10 @@ async fn start_upload( .payload(err.clone()) .send(); } + + return Err(err); } - trace!("[File] {} upload completed", upload_file.file_id); Ok(()) } @@ -678,21 +680,18 @@ async fn resume_upload( ) .await?; }, - Err(err) => { - // - match err.kind() { - ErrorKind::NotFound => { - error!("[File] file not found: {}", upload_file.local_file_path); - if let Ok(uid) = user_service.user_id() { - if let Ok(conn) = user_service.sqlite_connection(uid) { - delete_upload_file(conn, &upload_file.upload_id)?; - } + Err(err) => match err.kind() { + ErrorKind::NotFound => { + error!("[File] file not found: {}", upload_file.local_file_path); + if let Ok(uid) = user_service.user_id() { + if let Ok(conn) = user_service.sqlite_connection(uid) { + delete_upload_file(conn, &upload_file.upload_id)?; } - }, - _ => { - error!("[File] read file failed: {}", err); - }, - } + } + }, + _ => { + error!("[File] read file failed: {}", err); + }, }, } Ok(()) @@ -749,6 +748,14 @@ async fn complete_upload( upload_file.file_id, parts.len() ); + let file_url = cloud_service + .get_object_url_v1( + &upload_file.workspace_id, + &upload_file.parent_dir, + &upload_file.file_id, + ) + .await?; + match cloud_service .complete_upload( &upload_file.workspace_id, @@ -769,19 +776,12 @@ async fn complete_upload( .await; } - let file_url = cloud_service - .get_object_url_v1( - &upload_file.workspace_id, - &upload_file.parent_dir, - &upload_file.file_id, - ) - .await?; - let progress = FileProgress::new_progress(file_url, 1.0); info!( "[File]: notify upload progress:{}, {}", upload_file.file_id, progress ); + if let Err(err) = global_notifier.send(progress) { error!("[File] send global notifier failed: {}", err); } @@ -796,7 +796,16 @@ async fn complete_upload( } }, Err(err) => { - error!("[File] complete upload failed: {}", err); + let progress = FileProgress::new_error(file_url, err.msg.clone()); + if let Err(send_err) = global_notifier.send(progress) { + error!("[File] send global notifier failed: {}", send_err); + } + + let conn = user_service.sqlite_connection(user_service.user_id()?)?; + if let Err(err) = delete_all_upload_parts(conn, &upload_file.upload_id) { + error!("[File] delete all upload parts failed: {}", err); + } + return Err(err); }, } Ok(()) diff --git a/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs b/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs index 52487e6de2e0..b92cd2e0eb9f 100644 --- a/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs +++ b/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs @@ -137,6 +137,14 @@ pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResul Ok(()) } +pub fn delete_all_upload_parts(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> { + diesel::delete( + upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)), + ) + .execute(&mut *conn)?; + Ok(()) +} + pub fn insert_upload_part( mut conn: DBConnection, upload_part: &UploadFilePartTable, diff --git a/frontend/rust-lib/flowy-storage/src/uploader.rs b/frontend/rust-lib/flowy-storage/src/uploader.rs index ab590e2d2e72..e4d609bbe496 100644 --- a/frontend/rust-lib/flowy-storage/src/uploader.rs +++ b/frontend/rust-lib/flowy-storage/src/uploader.rs @@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicBool, AtomicU8}; use std::sync::{Arc, Weak}; use std::time::Duration; use tokio::sync::{watch, RwLock}; -use tracing::{error, info, trace}; +use tracing::{error, info, instrument, trace, warn}; #[derive(Clone)] pub enum Signal { @@ -34,7 +34,7 @@ impl UploadTaskQueue { pub async fn queue_task(&self, task: UploadTask) { trace!("[File] Queued task: {}", task); self.tasks.write().await.push(task); - let _ = self.notifier.send(Signal::Proceed); + let _ = self.notifier.send_replace(Signal::Proceed); } } @@ -104,6 +104,7 @@ impl FileUploader { let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(3)); } + #[instrument(name = "[File]: process next", level = "debug", skip(self))] pub async fn process_next(&self) -> Option<()> { // Do not proceed if the uploader is paused. if self.pause_sync.load(std::sync::atomic::Ordering::Relaxed) { @@ -125,6 +126,7 @@ impl FileUploader { { // If the current uploads count is greater than or equal to the max uploads, do not proceed. let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(10)); + trace!("[File] max uploads reached, process_next after 10 seconds"); return None; } @@ -133,6 +135,7 @@ impl FileUploader { .load(std::sync::atomic::Ordering::SeqCst) { // If the storage limitation is enabled, do not proceed. + error!("[File] storage limit exceeded, uploader is disabled"); return None; } @@ -140,6 +143,7 @@ impl FileUploader { if task.retry_count() > 5 { // If the task has been retried more than 5 times, we should not retry it anymore. let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2)); + warn!("[File] Task has been retried more than 5 times: {}", task); return None; } @@ -166,12 +170,12 @@ impl FileUploader { .await { if err.is_file_limit_exceeded() { - error!("Failed to upload file: {}", err); + error!("[File] Failed to upload file: {}", err); self.disable_storage_write(); } info!( - "Failed to upload file: {}, retry_count:{}", + "[File] Failed to upload file: {}, retry_count:{}", err, retry_count ); @@ -197,12 +201,12 @@ impl FileUploader { .await { if err.is_file_limit_exceeded() { - error!("Failed to upload file: {}", err); + error!("[File] failed to upload file: {}", err); self.disable_storage_write(); } info!( - "Failed to resume upload file: {}, retry_count:{}", + "[File] failed to resume upload file: {}, retry_count:{}", err, retry_count ); retry_count += 1; @@ -216,10 +220,15 @@ impl FileUploader { } }, } + self .current_uploads .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2)); + trace!("[File] process_next after 2 seconds"); + self + .queue + .notifier + .send_replace(Signal::ProceedAfterSecs(2)); None } }