Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update logs #6735

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions frontend/rust-lib/flowy-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ collab-entity = { workspace = true }
collab-plugins = { workspace = true }
collab-folder = { workspace = true }

#collab = { workspace = true }
collab = { workspace = true, features = ["verbose_log"] }
collab = { workspace = true }
#collab = { workspace = true, features = ["verbose_log"] }

diesel.workspace = true
uuid.workspace = true
Expand Down
113 changes: 61 additions & 52 deletions frontend/rust-lib/flowy-storage/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::entities::FileStatePB;
use crate::file_cache::FileTempStorage;
use crate::notification::{make_notification, StorageNotification};
use crate::sqlite_sql::{
batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part,
is_upload_completed, select_upload_file, select_upload_parts, update_upload_file_completed,
update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
batch_select_upload_file, delete_all_upload_parts, delete_upload_file, insert_upload_file,
insert_upload_part, is_upload_completed, select_upload_file, select_upload_parts,
update_upload_file_completed, update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
};
use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
use allo_isolate::Isolate;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl StorageManager {
"{}/cache_files",
user_service.get_application_root_dir()
));
let (global_notifier, _) = broadcast::channel(1000);
let (global_notifier, _) = broadcast::channel(2000);
let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path));
let (notifier, notifier_rx) = watch::channel(Signal::Proceed);
let task_queue = Arc::new(UploadTaskQueue::new(notifier));
Expand Down Expand Up @@ -205,7 +205,7 @@ async fn prepare_upload_task(
retry_count: 0,
})
.collect::<Vec<_>>();
info!("prepare upload task: {}", tasks.len());
info!("[File] prepare upload task: {}", tasks.len());
uploader.queue_tasks(tasks).await;
Ok(())
}
Expand Down Expand Up @@ -359,7 +359,7 @@ impl StorageService for StorageServiceImpl {
},
Err(err) => {
if matches!(err.code, ErrorCode::DuplicateSqliteRecord) {
info!("upload record already exists, skip creating new upload task");
info!("[File] upload record already exists, skip creating new upload task");
Ok::<_, FlowyError>((CreatedUpload { url, file_id }, None))
} else {
Err(err)
Expand All @@ -373,7 +373,7 @@ impl StorageService for StorageServiceImpl {
FlowyError::internal().with_context("failed to downcast record to UploadFileTable")
})?;

if let Err(err) = start_upload(
start_upload(
&self.cloud_service,
&self.user_service,
&self.temp_storage,
Expand All @@ -382,10 +382,8 @@ impl StorageService for StorageServiceImpl {
self.progress_notifiers.clone(),
self.global_notifier.clone(),
)
.await
{
error!("[File] start upload failed: {}", err);
}
.await?;

Ok(())
}

Expand Down Expand Up @@ -496,6 +494,7 @@ async fn start_upload(
.collect::<Vec<_>>();

let upload_offset = completed_parts.len() as i32;
let total_parts = chunked_bytes.iter().count();
chunked_bytes.set_current_offset(upload_offset);

info!(
Expand Down Expand Up @@ -550,22 +549,29 @@ async fn start_upload(
}

// 3. start uploading parts
trace!(
"[File] {} start uploading parts: {}",
info!(
"[File] {} start uploading parts:{}, offset:{}",
upload_file.file_id,
chunked_bytes.iter().count()
chunked_bytes.iter().count(),
upload_offset,
);
let total_parts = chunked_bytes.iter().count();
let iter = chunked_bytes.iter().enumerate();

for (index, chunk_bytes) in iter {
let part_number = upload_offset + index as i32 + 1;
trace!(
info!(
"[File] {} uploading {}th part, size:{}KB",
upload_file.file_id,
part_number,
chunk_bytes.len() / 1000,
);

let file_url = cloud_service
.get_object_url_v1(
&upload_file.workspace_id,
&upload_file.parent_dir,
&upload_file.file_id,
)
.await?;
// start uploading parts
match upload_part(
cloud_service,
Expand All @@ -582,19 +588,12 @@ async fn start_upload(
Ok(resp) => {
let progress = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0);
trace!(
"[File] {} upload progress: {}",
"[File] {} upload progress:{}, etag: {}",
upload_file.file_id,
progress
progress,
resp.e_tag
);

let file_url = cloud_service
.get_object_url_v1(
&upload_file.workspace_id,
&upload_file.parent_dir,
&upload_file.file_id,
)
.await?;

if let Err(err) = global_notifier.send(FileProgress::new_progress(file_url, progress)) {
error!("[File] send global notifier failed: {}", err);
}
Expand All @@ -618,7 +617,9 @@ async fn start_upload(
.send();
}

error!("[File] {} upload part failed: {}", upload_file.file_id, err);
if let Err(err) = global_notifier.send(FileProgress::new_error(file_url, err.msg.clone())) {
error!("[File] send global notifier failed: {}", err);
}
return Err(err);
},
}
Expand All @@ -641,9 +642,10 @@ async fn start_upload(
.payload(err.clone())
.send();
}

return Err(err);
}

trace!("[File] {} upload completed", upload_file.file_id);
Ok(())
}

Expand Down Expand Up @@ -678,21 +680,18 @@ async fn resume_upload(
)
.await?;
},
Err(err) => {
//
match err.kind() {
ErrorKind::NotFound => {
error!("[File] file not found: {}", upload_file.local_file_path);
if let Ok(uid) = user_service.user_id() {
if let Ok(conn) = user_service.sqlite_connection(uid) {
delete_upload_file(conn, &upload_file.upload_id)?;
}
Err(err) => match err.kind() {
ErrorKind::NotFound => {
error!("[File] file not found: {}", upload_file.local_file_path);
if let Ok(uid) = user_service.user_id() {
if let Ok(conn) = user_service.sqlite_connection(uid) {
delete_upload_file(conn, &upload_file.upload_id)?;
}
},
_ => {
error!("[File] read file failed: {}", err);
},
}
}
},
_ => {
error!("[File] read file failed: {}", err);
},
},
}
Ok(())
Expand Down Expand Up @@ -749,6 +748,14 @@ async fn complete_upload(
upload_file.file_id,
parts.len()
);
let file_url = cloud_service
.get_object_url_v1(
&upload_file.workspace_id,
&upload_file.parent_dir,
&upload_file.file_id,
)
.await?;

match cloud_service
.complete_upload(
&upload_file.workspace_id,
Expand All @@ -769,19 +776,12 @@ async fn complete_upload(
.await;
}

let file_url = cloud_service
.get_object_url_v1(
&upload_file.workspace_id,
&upload_file.parent_dir,
&upload_file.file_id,
)
.await?;

let progress = FileProgress::new_progress(file_url, 1.0);
info!(
"[File]: notify upload progress:{}, {}",
upload_file.file_id, progress
);

if let Err(err) = global_notifier.send(progress) {
error!("[File] send global notifier failed: {}", err);
}
Expand All @@ -796,7 +796,16 @@ async fn complete_upload(
}
},
Err(err) => {
error!("[File] complete upload failed: {}", err);
let progress = FileProgress::new_error(file_url, err.msg.clone());
if let Err(send_err) = global_notifier.send(progress) {
error!("[File] send global notifier failed: {}", send_err);
}

let conn = user_service.sqlite_connection(user_service.user_id()?)?;
if let Err(err) = delete_all_upload_parts(conn, &upload_file.upload_id) {
error!("[File] delete all upload parts failed: {}", err);
}
return Err(err);
},
}
Ok(())
Expand Down
8 changes: 8 additions & 0 deletions frontend/rust-lib/flowy-storage/src/sqlite_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResul
Ok(())
}

pub fn delete_all_upload_parts(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
diesel::delete(
upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
)
.execute(&mut *conn)?;
Ok(())
}

pub fn insert_upload_part(
mut conn: DBConnection,
upload_part: &UploadFilePartTable,
Expand Down
23 changes: 16 additions & 7 deletions frontend/rust-lib/flowy-storage/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::{watch, RwLock};
use tracing::{error, info, trace};
use tracing::{error, info, instrument, trace, warn};

#[derive(Clone)]
pub enum Signal {
Expand All @@ -34,7 +34,7 @@ impl UploadTaskQueue {
pub async fn queue_task(&self, task: UploadTask) {
trace!("[File] Queued task: {}", task);
self.tasks.write().await.push(task);
let _ = self.notifier.send(Signal::Proceed);
let _ = self.notifier.send_replace(Signal::Proceed);
}
}

Expand Down Expand Up @@ -104,6 +104,7 @@ impl FileUploader {
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(3));
}

#[instrument(name = "[File]: process next", level = "debug", skip(self))]
pub async fn process_next(&self) -> Option<()> {
// Do not proceed if the uploader is paused.
if self.pause_sync.load(std::sync::atomic::Ordering::Relaxed) {
Expand All @@ -125,6 +126,7 @@ impl FileUploader {
{
// If the current uploads count is greater than or equal to the max uploads, do not proceed.
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(10));
trace!("[File] max uploads reached, process_next after 10 seconds");
return None;
}

Expand All @@ -133,13 +135,15 @@ impl FileUploader {
.load(std::sync::atomic::Ordering::SeqCst)
{
// If the storage limitation is enabled, do not proceed.
error!("[File] storage limit exceeded, uploader is disabled");
return None;
}

let task = self.queue.tasks.write().await.pop()?;
if task.retry_count() > 5 {
// If the task has been retried more than 5 times, we should not retry it anymore.
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2));
warn!("[File] Task has been retried more than 5 times: {}", task);
return None;
}

Expand All @@ -166,12 +170,12 @@ impl FileUploader {
.await
{
if err.is_file_limit_exceeded() {
error!("Failed to upload file: {}", err);
error!("[File] Failed to upload file: {}", err);
self.disable_storage_write();
}

info!(
"Failed to upload file: {}, retry_count:{}",
"[File] Failed to upload file: {}, retry_count:{}",
err, retry_count
);

Expand All @@ -197,12 +201,12 @@ impl FileUploader {
.await
{
if err.is_file_limit_exceeded() {
error!("Failed to upload file: {}", err);
error!("[File] failed to upload file: {}", err);
self.disable_storage_write();
}

info!(
"Failed to resume upload file: {}, retry_count:{}",
"[File] failed to resume upload file: {}, retry_count:{}",
err, retry_count
);
retry_count += 1;
Expand All @@ -216,10 +220,15 @@ impl FileUploader {
}
},
}

self
.current_uploads
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2));
trace!("[File] process_next after 2 seconds");
self
.queue
.notifier
.send_replace(Signal::ProceedAfterSecs(2));
None
}
}
Expand Down
Loading