From 05c792e367539c2c7227a535512a614449445d1d Mon Sep 17 00:00:00 2001 From: haze518 Date: Sat, 26 Oct 2024 18:25:24 +0600 Subject: [PATCH] add trace errors in server binary --- server/src/binary/command.rs | 9 ++++++-- .../create_consumer_group_handler.rs | 22 +++++++++++++++++-- .../delete_consumer_group_handler.rs | 19 ++++++++++++++-- .../get_consumer_groups_handler.rs | 11 ++++++++-- .../join_consumer_group_handler.rs | 9 +++++++- .../leave_consumer_group_handler.rs | 9 +++++++- .../store_consumer_offset_handler.rs | 10 ++++++--- .../messages/flush_unsaved_buffer_handler.rs | 15 +++++++++---- .../messages/poll_messages_handler.rs | 9 ++++++-- .../messages/send_messages_handler.rs | 15 +++++++++---- .../partitions/create_partitions_handler.rs | 20 +++++++++++++++-- .../partitions/delete_partitions_handler.rs | 20 +++++++++++++++-- .../create_personal_access_token_handler.rs | 17 ++++++++++++-- .../delete_personal_access_token_handler.rs | 13 +++++++++-- .../get_personal_access_tokens_handler.rs | 6 ++++- ...ogin_with_personal_access_token_handler.rs | 9 +++++++- .../handlers/streams/create_stream_handler.rs | 19 ++++++++++++++-- .../handlers/streams/delete_stream_handler.rs | 14 ++++++++++-- .../handlers/streams/get_streams_handler.rs | 5 ++++- .../handlers/streams/purge_stream_handler.rs | 18 +++++++++++++-- .../handlers/streams/update_stream_handler.rs | 14 ++++++++++-- .../handlers/system/get_clients_handler.rs | 6 ++++- .../binary/handlers/system/get_me_handler.rs | 6 ++++- .../handlers/system/get_stats_handler.rs | 6 ++++- .../binary/handlers/system/ping_handler.rs | 5 ++++- .../handlers/topics/create_topic_handler.rs | 15 +++++++++++-- .../handlers/topics/delete_topic_handler.rs | 14 ++++++++++-- .../handlers/topics/get_topics_handler.rs | 10 ++++++++- .../handlers/topics/purge_topic_handler.rs | 17 ++++++++++++-- .../handlers/topics/update_topic_handler.rs | 17 ++++++++++++-- .../handlers/users/change_password_handler.rs | 17 ++++++++++++-- .../handlers/users/create_user_handler.rs | 17 ++++++++++++-- .../handlers/users/delete_user_handler.rs | 20 +++++++++++++++-- .../handlers/users/get_users_handler.rs | 6 ++++- .../handlers/users/login_user_handler.rs | 9 +++++++- .../handlers/users/logout_user_handler.rs | 6 ++++- .../users/update_permissions_handler.rs | 6 ++++- .../handlers/users/update_user_handler.rs | 19 ++++++++++++++-- 38 files changed, 412 insertions(+), 67 deletions(-) diff --git a/server/src/binary/command.rs b/server/src/binary/command.rs index 72e21c22a..5e678599e 100644 --- a/server/src/binary/command.rs +++ b/server/src/binary/command.rs @@ -21,6 +21,7 @@ use crate::binary::sender::Sender; use crate::command::ServerCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; +use error_set::ResultContext; use iggy::error::IggyError; use tracing::{debug, error}; @@ -38,12 +39,16 @@ pub async fn handle( Err(error) => { error!("Command was not handled successfully, session: {session}, error: {error}."); if let IggyError::ClientNotFound(_) = error { - sender.send_error_response(error).await?; + sender.send_error_response(error).await.with_error(|_| { + format!("BINARY - failed to send error response, session: {session}") + })?; debug!("TCP error response was sent to: {session}."); error!("Session: {session} will be deleted."); Err(IggyError::ClientNotFound(session.client_id)) } else { - sender.send_error_response(error).await?; + sender.send_error_response(error).await.with_error(|_| { + format!("BINARY - failed to send error response, session: {session}") + })?; debug!("TCP error response was sent to: {session}."); Ok(()) } diff --git a/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 4c7b1ed46..2ae1d218d 100644 --- a/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -4,6 +4,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup; use iggy::error::IggyError; use tracing::{debug, instrument}; @@ -27,18 +28,35 @@ pub async fn handle( command.group_id, &command.name, ) - .await?; + .await + .with_error(|_| { + format!( + "CONSUMER_GROUP_HANDLER - failed to create consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {:?}", + command.stream_id, command.topic_id, command.group_id, session + ) + })?; let consumer_group = consumer_group.read().await; response = mapper::map_consumer_group(&consumer_group).await; } + let system = system.read().await; + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + let group_id = command.group_id.clone(); + system .state .apply( session.get_user_id(), EntryCommand::CreateConsumerGroup(command), ) - .await?; + .await + .with_error(|_| { + format!( + "CONSUMER_GROUP_HANDLER - failed to apply create consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}", + stream_id, topic_id, group_id, session + ) + })?; sender.send_ok_response(&response).await?; Ok(()) } diff --git a/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index c33aa8f16..b9b3fa3ad 100644 --- a/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup; use iggy::error::IggyError; use tracing::{debug, instrument}; @@ -24,16 +25,30 @@ pub async fn handle( &command.topic_id, &command.group_id, ) - .await?; + .await.with_error(|_| format!( + "CONSUMER_GROUP_HANDLER - failed to delete consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}", + command.stream_id, command.topic_id, command.group_id, session + ))?; } + let system = system.read().await; + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + let group_id = command.group_id.clone(); + system .state .apply( session.get_user_id(), EntryCommand::DeleteConsumerGroup(command), ) - .await?; + .await + .with_error(|_| { + format!( + "CONSUMER_GROUP_HANDLER - failed to apply delete consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}", + stream_id, topic_id, group_id, session + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs b/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs index 73b34722a..0b6393342 100644 --- a/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs +++ b/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs @@ -3,6 +3,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::consumer_groups::get_consumer_groups::GetConsumerGroups; use iggy::error::IggyError; use tracing::debug; @@ -15,8 +16,14 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let consumer_groups = - system.get_consumer_groups(session, &command.stream_id, &command.topic_id)?; + let consumer_groups = system + .get_consumer_groups(session, &command.stream_id, &command.topic_id) + .with_error(|_| { + format!( + "CONSUMER_GROUP_HANDLER - failed on getting consumer groups for stream_id: {}, topic_id: {}, session: {}", + command.stream_id, command.topic_id, session + ) + })?; let consumer_groups = mapper::map_consumer_groups(&consumer_groups).await; sender.send_ok_response(&consumer_groups).await?; Ok(()) diff --git a/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs index df5f89478..7df3fb110 100644 --- a/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs @@ -2,6 +2,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup; use iggy::error::IggyError; use tracing::{debug, instrument}; @@ -22,7 +23,13 @@ pub async fn handle( &command.topic_id, &command.group_id, ) - .await?; + .await + .with_error(|_| { + format!( + "CONSUMER_GROUP_HANDLER - failed to join consumer group for stream_id: {}, topic_id: {}, group_id: {}, session: {}", + command.stream_id, command.topic_id, command.group_id, session + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index 416cd4452..fc5e37d5a 100644 --- a/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@ -2,6 +2,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::consumer_groups::leave_consumer_group::LeaveConsumerGroup; use iggy::error::IggyError; use tracing::{debug, instrument}; @@ -22,7 +23,13 @@ pub async fn handle( &command.topic_id, &command.group_id, ) - .await?; + .await + .with_error(|_| { + format!( + "CONSUMER_GROUP_HANDLER - failed to leave consumer group for stream_id: {}, topic_id: {}, group_id: {}, session: {}", + command.stream_id, command.topic_id, command.group_id, session + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index 5c7ab4033..fb280317d 100644 --- a/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@ -2,6 +2,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset; use iggy::error::IggyError; use tracing::debug; @@ -20,10 +21,13 @@ pub async fn handle( command.consumer, &command.stream_id, &command.topic_id, - command.partition_id, - command.offset, + command.partition_id.clone(), + command.offset.clone(), ) - .await?; + .await + .with_error(|_| format!("CONSUMER_OFFSET_HANDLER - failed to store consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, session: {}", + command.stream_id, command.topic_id, command.partition_id, command.offset, session + ))?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs index 85f6c4abd..f63081fd4 100644 --- a/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs +++ b/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs @@ -2,6 +2,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::messages::flush_unsaved_buffer::FlushUnsavedBuffer; use tracing::{debug, instrument}; @@ -15,13 +16,19 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let stream_id = command.stream_id; - let topic_id = command.topic_id; - let partition_id = command.partition_id; + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + let partition_id = command.partition_id.clone(); let fsync = command.fsync; system .flush_unsaved_buffer(session, stream_id, topic_id, partition_id, fsync) - .await?; + .await + .with_error(|_| { + format!( + "MESSAGE_HANDLER - failed to flush unsaved buffer for stream_id: {}, topic_id: {}, partition_id: {}, session: {}", + command.stream_id, command.topic_id, command.partition_id, session + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/messages/poll_messages_handler.rs b/server/src/binary/handlers/messages/poll_messages_handler.rs index 24174ffdf..a760fd8db 100644 --- a/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -4,6 +4,7 @@ use crate::streaming::session::Session; use crate::streaming::systems::messages::PollingArgs; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::messages::poll_messages::PollMessages; use tracing::debug; @@ -22,10 +23,14 @@ pub async fn handle( &command.consumer, &command.stream_id, &command.topic_id, - command.partition_id, + command.partition_id.clone(), PollingArgs::new(command.strategy, command.count, command.auto_commit), ) - .await?; + .await + .with_error(|_| format!( + "MESSAGE_HANDLER - failed to poll messages for consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: {}", + command.consumer, command.stream_id, command.topic_id, command.partition_id, session + ))?; let messages = mapper::map_polled_messages(&messages); sender.send_ok_response(&messages).await?; Ok(()) diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs b/server/src/binary/handlers/messages/send_messages_handler.rs index 5eb60852a..24d1220a9 100644 --- a/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/server/src/binary/handlers/messages/send_messages_handler.rs @@ -2,6 +2,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::messages::send_messages::SendMessages; use tracing::debug; @@ -14,13 +15,19 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let stream_id = command.stream_id; - let topic_id = command.topic_id; - let partitioning = command.partitioning; + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + let partitioning = command.partitioning.clone(); let messages = command.messages; system .append_messages(session, stream_id, topic_id, partitioning, messages) - .await?; + .await + .with_error(|_| { + format!( + "MESSAGE_HANDLER - failed to append messages for stream_id: {}, topic_id: {}, partitioning: {}, session: {}", + command.stream_id, command.topic_id, command.partitioning, session + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/partitions/create_partitions_handler.rs b/server/src/binary/handlers/partitions/create_partitions_handler.rs index 0de42e406..535cb7fa3 100644 --- a/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::partitions::create_partitions::CreatePartitions; use tracing::{debug, instrument}; @@ -24,17 +25,32 @@ pub async fn handle( &command.topic_id, command.partitions_count, ) - .await?; + .await + .with_error(|_| { + format!( + "PARTITIONS_HANDLER - failed to create partitions for stream_id: {}, topic_id: {}, session: {}", + command.stream_id, command.topic_id, session + ) + })?; } let system = system.read().await; + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + system .state .apply( session.get_user_id(), EntryCommand::CreatePartitions(command), ) - .await?; + .await + .with_error(|_| { + format!( + "PARTITIONS_HANDLER - failed to apply create partitions for stream_id: {}, topic_id: {}, session: {}", + stream_id, topic_id, session + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/server/src/binary/handlers/partitions/delete_partitions_handler.rs index b24e98b51..7db240bab 100644 --- a/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::partitions::delete_partitions::DeletePartitions; use tracing::{debug, instrument}; @@ -15,6 +16,9 @@ pub async fn handle( system: &SharedSystem, ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + { let mut system = system.write().await; system @@ -24,7 +28,13 @@ pub async fn handle( &command.topic_id, command.partitions_count, ) - .await?; + .await + .with_error(|_| { + format!( + "PARTITIONS_HANDLER - failed to delete partitions for stream_id: {}, topic_id: {}, session: {}", + stream_id, topic_id, session + ) + })?; } let system = system.read().await; @@ -34,7 +44,13 @@ pub async fn handle( session.get_user_id(), EntryCommand::DeletePartitions(command), ) - .await?; + .await + .with_error(|_| { + format!( + "PARTITIONS_HANDLER - failed to apply delete partitions for stream_id: {}, topic_id: {}, session: {}", + stream_id, topic_id, session + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index ee9e37d06..19b42b173 100644 --- a/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -6,6 +6,7 @@ use crate::streaming::personal_access_tokens::personal_access_token::PersonalAcc use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::personal_access_tokens::create_personal_access_token::CreatePersonalAccessToken; use tracing::{debug, instrument}; @@ -24,7 +25,13 @@ pub async fn handle( let mut system = system.write().await; let token = system .create_personal_access_token(session, &command.name, command.expiry) - .await?; + .await + .with_error(|_| { + format!( + "PERSONAL_ACCESS_TOKEN_HANDLER - failed to create personal access token with name: {}, session: {session}", + command.name + ) + })?; bytes = mapper::map_raw_pat(&token); token_hash = PersonalAccessToken::hash_token(&token); } @@ -42,7 +49,13 @@ pub async fn handle( hash: token_hash, }), ) - .await?; + .await + .with_error(|_| { + format!( + "PERSONAL_ACCESS_TOKEN_HANDLER - failed to create personal access token with name: {}, session: {session}", + command.name + ) + })?; sender.send_ok_response(&bytes).await?; Ok(()) } diff --git a/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index 47e273ec4..d0b869038 100644 --- a/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::personal_access_tokens::delete_personal_access_token::DeletePersonalAccessToken; use tracing::{debug, instrument}; @@ -15,11 +16,16 @@ pub async fn handle( system: &SharedSystem, ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); + let token_name = command.name.clone(); + { let mut system = system.write().await; system .delete_personal_access_token(session, &command.name) - .await?; + .await + .with_error(|_| {format!( + "PERSONAL_ACCESS_TOKEN_HANDLER - failed to delete personal access token with name: {token_name}, session: {session}" + )})?; } let system = system.read().await; @@ -29,7 +35,10 @@ pub async fn handle( session.get_user_id(), EntryCommand::DeletePersonalAccessToken(command), ) - .await?; + .await + .with_error(|_| {format!( + "PERSONAL_ACCESS_TOKEN_HANDLER - failed to apply delete personal access token with name: {token_name}, session: {session}" + )})?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs b/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs index 200e0f4ae..8da161907 100644 --- a/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs +++ b/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs @@ -2,6 +2,7 @@ use crate::binary::mapper; use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokens; use tracing::debug; @@ -14,7 +15,10 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let personal_access_tokens = system.get_personal_access_tokens(session).await?; + let personal_access_tokens = system + .get_personal_access_tokens(session) + .await + .with_error(|_| format!("PERSONAL_ACCESS_TOKEN_HANDLER - failed to get personal access tokens with session: {session}"))?; let personal_access_tokens = mapper::map_personal_access_tokens(&personal_access_tokens); sender.send_ok_response(&personal_access_tokens).await?; Ok(()) diff --git a/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs index f368fe2d2..a66eb9371 100644 --- a/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs +++ b/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs @@ -3,6 +3,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::personal_access_tokens::login_with_personal_access_token::LoginWithPersonalAccessToken; use tracing::{debug, instrument}; @@ -18,7 +19,13 @@ pub async fn handle( let system = system.read().await; let user = system .login_with_personal_access_token(&command.token, Some(session)) - .await?; + .await + .with_error(|_| { + format!( + "PERSONAL_ACCESS_TOKEN_HANDLER - failed to login with personal access token: {}, session: {session}", + command.token + ) + })?; let identity_info = mapper::map_identity_info(user.id); sender.send_ok_response(&identity_info).await?; Ok(()) diff --git a/server/src/binary/handlers/streams/create_stream_handler.rs b/server/src/binary/handlers/streams/create_stream_handler.rs index 57de3dbb6..3a0436fb2 100644 --- a/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/server/src/binary/handlers/streams/create_stream_handler.rs @@ -4,6 +4,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::streams::create_stream::CreateStream; use tracing::{debug, instrument}; @@ -17,11 +18,19 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let response; + let stream_id = command.stream_id.clone(); + { let mut system = system.write().await; let stream = system .create_stream(session, command.stream_id, &command.name) - .await?; + .await + .with_error(|_| { + format!( + "STREAM_HANDLER - failed to create stream with id: {:?}, session: {session}", + stream_id + ) + })?; response = mapper::map_stream(stream); } @@ -29,7 +38,13 @@ pub async fn handle( system .state .apply(session.get_user_id(), EntryCommand::CreateStream(command)) - .await?; + .await + .with_error(|_| { + format!( + "STREAM_HANDLER - failed to apply create stream for id: {:?}, session: {session}", + stream_id + ) + })?; sender.send_ok_response(&response).await?; Ok(()) } diff --git a/server/src/binary/handlers/streams/delete_stream_handler.rs b/server/src/binary/handlers/streams/delete_stream_handler.rs index d636f94b1..33d81cb39 100644 --- a/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::streams::delete_stream::DeleteStream; use tracing::{debug, instrument}; @@ -15,16 +16,25 @@ pub async fn handle( system: &SharedSystem, ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); + let stream_id = command.stream_id.clone(); { let mut system = system.write().await; - system.delete_stream(session, &command.stream_id).await?; + system + .delete_stream(session, &command.stream_id) + .await + .with_error(|_| { + format!("STREAM_HANDLER - failed to delete stream with id: {stream_id}, session: {session}") + })?; } let system = system.read().await; system .state .apply(session.get_user_id(), EntryCommand::DeleteStream(command)) - .await?; + .await + .with_error(|_| { + format!("STREAM_HANDLER - failed to apply delete stream with id: {stream_id}, session: {session}") + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/streams/get_streams_handler.rs b/server/src/binary/handlers/streams/get_streams_handler.rs index 6009b7d00..2d0e6a107 100644 --- a/server/src/binary/handlers/streams/get_streams_handler.rs +++ b/server/src/binary/handlers/streams/get_streams_handler.rs @@ -3,6 +3,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::streams::get_streams::GetStreams; use tracing::debug; @@ -15,7 +16,9 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let streams = system.find_streams(session)?; + let streams = system.find_streams(session).with_error(|_| { + format!("STREAM_HANDLER - failed to find streams for session: {session}") + })?; let response = mapper::map_streams(&streams); sender.send_ok_response(&response).await?; Ok(()) diff --git a/server/src/binary/handlers/streams/purge_stream_handler.rs b/server/src/binary/handlers/streams/purge_stream_handler.rs index eb3fb635c..69a75b2d5 100644 --- a/server/src/binary/handlers/streams/purge_stream_handler.rs +++ b/server/src/binary/handlers/streams/purge_stream_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::streams::purge_stream::PurgeStream; use tracing::{debug, instrument}; @@ -16,11 +17,24 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - system.purge_stream(session, &command.stream_id).await?; + let stream_id = command.stream_id.clone(); + + system + .purge_stream(session, &command.stream_id) + .await + .with_error(|_| { + format!( + "STREAM_HANDLER - failed to purge stream with id: {stream_id}, session: {session}" + ) + })?; + system .state .apply(session.get_user_id(), EntryCommand::PurgeStream(command)) - .await?; + .await + .with_error(|_| { + format!("STREAM_HANDLER - failed to apply purge stream with id: {stream_id}, session: {session}") + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/streams/update_stream_handler.rs b/server/src/binary/handlers/streams/update_stream_handler.rs index a60d2ff2d..3faec2681 100644 --- a/server/src/binary/handlers/streams/update_stream_handler.rs +++ b/server/src/binary/handlers/streams/update_stream_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::streams::update_stream::UpdateStream; use tracing::{debug, instrument}; @@ -15,18 +16,27 @@ pub async fn handle( system: &SharedSystem, ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); + let stream_id = command.stream_id.clone(); + { let mut system = system.write().await; system .update_stream(session, &command.stream_id, &command.name) - .await?; + .await + .with_error(|_| { + format!("STREAM_HANDLER - failed to update stream with id: {stream_id}, session: {session}") + })?; } let system = system.read().await; + system .state .apply(session.get_user_id(), EntryCommand::UpdateStream(command)) - .await?; + .await + .with_error(|_| { + format!("STREAM_HANDLER - failed to apply update stream with id: {stream_id}, session: {session}") + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/system/get_clients_handler.rs b/server/src/binary/handlers/system/get_clients_handler.rs index e7b83aa55..36e02b6e7 100644 --- a/server/src/binary/handlers/system/get_clients_handler.rs +++ b/server/src/binary/handlers/system/get_clients_handler.rs @@ -2,6 +2,7 @@ use crate::binary::mapper; use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::system::get_clients::GetClients; use tracing::debug; @@ -14,7 +15,10 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let clients = system.get_clients(session).await?; + let clients = system + .get_clients(session) + .await + .with_error(|_| format!("SYSTEM_HANDLER - failed to get clients, session: {session}"))?; let clients = mapper::map_clients(&clients).await; sender.send_ok_response(&clients).await?; Ok(()) diff --git a/server/src/binary/handlers/system/get_me_handler.rs b/server/src/binary/handlers/system/get_me_handler.rs index 834c2ac79..b38c2f9cf 100644 --- a/server/src/binary/handlers/system/get_me_handler.rs +++ b/server/src/binary/handlers/system/get_me_handler.rs @@ -2,6 +2,7 @@ use crate::binary::mapper; use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; use iggy::system::get_me::GetMe; @@ -17,7 +18,10 @@ pub async fn handle( let bytes; { let system = system.read().await; - let client = system.get_client(session, session.client_id).await?; + let client = system + .get_client(session, session.client_id) + .await + .with_error(|_| format!("SYSTEM_HANDLER - failed to get client, session: {session}"))?; { let client = client.read().await; bytes = mapper::map_client(&client); diff --git a/server/src/binary/handlers/system/get_stats_handler.rs b/server/src/binary/handlers/system/get_stats_handler.rs index f7a2fe9bc..8934be9ea 100644 --- a/server/src/binary/handlers/system/get_stats_handler.rs +++ b/server/src/binary/handlers/system/get_stats_handler.rs @@ -2,6 +2,7 @@ use crate::binary::mapper; use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::system::get_stats::GetStats; use tracing::debug; @@ -14,7 +15,10 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let stats = system.get_stats(session).await?; + let stats = system + .get_stats(session) + .await + .with_error(|_| format!("SYSTEM_HANDLER - failed to get stats, session: {session}"))?; let bytes = mapper::map_stats(&stats); sender.send_ok_response(&bytes).await?; Ok(()) diff --git a/server/src/binary/handlers/system/ping_handler.rs b/server/src/binary/handlers/system/ping_handler.rs index 4b983e50a..7de8fc783 100644 --- a/server/src/binary/handlers/system/ping_handler.rs +++ b/server/src/binary/handlers/system/ping_handler.rs @@ -2,6 +2,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; use iggy::system::ping::Ping; @@ -17,7 +18,9 @@ pub async fn handle( debug!("session: {session}, command: {command}"); let system = system.read().await; let client_manager = system.client_manager.read().await; - let client = client_manager.get_client(session.client_id)?; + let client = client_manager + .get_client(session.client_id) + .with_error(|_| format!("SYSTEM_HANDLER - failed to get clients, session: {session}"))?; let mut client = client.write().await; let now = IggyTimestamp::now(); client.last_heartbeat = now; diff --git a/server/src/binary/handlers/topics/create_topic_handler.rs b/server/src/binary/handlers/topics/create_topic_handler.rs index afb15c567..1c53bf5b2 100644 --- a/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/server/src/binary/handlers/topics/create_topic_handler.rs @@ -4,6 +4,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::topics::create_topic::CreateTopic; use tracing::{debug, instrument}; @@ -16,6 +17,9 @@ pub async fn handle( system: &SharedSystem, ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + let response; { let mut system = system.write().await; @@ -31,7 +35,10 @@ pub async fn handle( command.max_topic_size, command.replication_factor, ) - .await?; + .await + .with_error(|_| format!("TOPIC_HANDLER - failed to create topic for stream_id: {stream_id}, topic_id: {:?}", + topic_id + ))?; command.message_expiry = topic.message_expiry; command.max_topic_size = topic.max_topic_size; response = mapper::map_topic(topic).await; @@ -41,7 +48,11 @@ pub async fn handle( system .state .apply(session.get_user_id(), EntryCommand::CreateTopic(command)) - .await?; + .await + .with_error(|_| format!( + "TOPIC_HANDLER - failed to apply create topic for stream_id: {stream_id}, topic_id: {:?}", + topic_id + ))?; sender.send_ok_response(&response).await?; Ok(()) } diff --git a/server/src/binary/handlers/topics/delete_topic_handler.rs b/server/src/binary/handlers/topics/delete_topic_handler.rs index 3945f3604..d37539ae6 100644 --- a/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::topics::delete_topic::DeleteTopic; use tracing::{debug, instrument}; @@ -15,18 +16,27 @@ pub async fn handle( system: &SharedSystem, ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); + let stream_id = command.stream_id.clone(); + let topic_id = command.topic_id.clone(); + { let mut system = system.write().await; system .delete_topic(session, &command.stream_id, &command.topic_id) - .await?; + .await + .with_error(|_| format!( + "TOPIC_HANDLER - failed to delete topic for stream_id: {stream_id}, topic_id: {topic_id}", + ))?; } let system = system.read().await; system .state .apply(session.get_user_id(), EntryCommand::DeleteTopic(command)) - .await?; + .await + .with_error(|_| format!( + "TOPIC_HANDLER - failed to apply delete topic for stream_id: {stream_id}, topic_id: {topic_id}", + ))?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/topics/get_topics_handler.rs b/server/src/binary/handlers/topics/get_topics_handler.rs index 891c440e1..fcef67628 100644 --- a/server/src/binary/handlers/topics/get_topics_handler.rs +++ b/server/src/binary/handlers/topics/get_topics_handler.rs @@ -3,6 +3,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::topics::get_topics::GetTopics; use tracing::debug; @@ -15,7 +16,14 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let topics = system.find_topics(session, &command.stream_id)?; + let topics = system + .find_topics(session, &command.stream_id) + .with_error(|_| { + format!( + "TOPIC_HANDLER - failed to find topics, stream_id: {}, session: {session}", + command.stream_id + ) + })?; let response = mapper::map_topics(&topics); sender.send_ok_response(&response).await?; Ok(()) diff --git a/server/src/binary/handlers/topics/purge_topic_handler.rs b/server/src/binary/handlers/topics/purge_topic_handler.rs index 6c2aff4df..73c9e4118 100644 --- a/server/src/binary/handlers/topics/purge_topic_handler.rs +++ b/server/src/binary/handlers/topics/purge_topic_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::topics::purge_topic::PurgeTopic; use tracing::{debug, instrument}; @@ -18,11 +19,23 @@ pub async fn handle( let system = system.read().await; system .purge_topic(session, &command.stream_id, &command.topic_id) - .await?; + .await + .with_error(|_| { + format!( + "TOPIC_HANDLER - failed to purge topic with id: {}, stream_id: {}", + command.topic_id, command.stream_id + ) + })?; + + let topic_id = command.topic_id.clone(); + let stream_id = command.stream_id.clone(); system .state .apply(session.get_user_id(), EntryCommand::PurgeTopic(command)) - .await?; + .await + .with_error(|_| format!( + "TOPIC_HANDLER - failed to apply purge topic with id: {topic_id}, stream_id: {stream_id}", + ))?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/topics/update_topic_handler.rs b/server/src/binary/handlers/topics/update_topic_handler.rs index 33f5a62b2..937a30caa 100644 --- a/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/server/src/binary/handlers/topics/update_topic_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::topics::update_topic::UpdateTopic; use tracing::{debug, instrument}; @@ -17,6 +18,7 @@ pub async fn handle( debug!("session: {session}, command: {command}"); { let mut system = system.write().await; + let topic = system .update_topic( session, @@ -28,16 +30,27 @@ pub async fn handle( command.max_topic_size, command.replication_factor, ) - .await?; + .await + .with_error(|_| format!( + "TOPIC_HANDLER - failed to update topic with id: {}, stream_id: {}, session: {session}", + command.topic_id, command.stream_id + ))?; command.message_expiry = topic.message_expiry; command.max_topic_size = topic.max_topic_size; } + let topic_id = command.topic_id.clone(); + let stream_id = command.stream_id.clone(); let system = system.read().await; + system .state .apply(session.get_user_id(), EntryCommand::UpdateTopic(command)) - .await?; + .await + .with_error(|_| format!( + "TOPIC_HANDLER - failed to apply update topic with id: {}, stream_id: {}, session: {session}", + topic_id, stream_id + ))?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/users/change_password_handler.rs b/server/src/binary/handlers/users/change_password_handler.rs index 700d1ef83..5686ef384 100644 --- a/server/src/binary/handlers/users/change_password_handler.rs +++ b/server/src/binary/handlers/users/change_password_handler.rs @@ -4,6 +4,7 @@ use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use crate::streaming::utils::crypto; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::change_password::ChangePassword; use tracing::{debug, instrument}; @@ -25,7 +26,13 @@ pub async fn handle( &command.current_password, &command.new_password, ) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to change password for user_id: {}, session: {session}", + command.user_id + ) + })?; } // For the security of the system, we hash the password before storing it in metadata. @@ -40,7 +47,13 @@ pub async fn handle( new_password: crypto::hash_password(&command.new_password), }), ) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to apply change password for user_id: {}, session: {session}", + command.user_id + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/users/create_user_handler.rs b/server/src/binary/handlers/users/create_user_handler.rs index b831d45d9..703d00f5a 100644 --- a/server/src/binary/handlers/users/create_user_handler.rs +++ b/server/src/binary/handlers/users/create_user_handler.rs @@ -5,6 +5,7 @@ use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use crate::streaming::utils::crypto; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::create_user::CreateUser; use tracing::{debug, instrument}; @@ -28,7 +29,13 @@ pub async fn handle( command.status, command.permissions.clone(), ) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to create user with name: {}, session: {session}", + command.username + ) + })?; response = mapper::map_user(user); } @@ -45,7 +52,13 @@ pub async fn handle( permissions: command.permissions.clone(), }), ) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to apply create user with name: {}, session: {session}", + command.username + ) + })?; sender.send_ok_response(&response).await?; Ok(()) } diff --git a/server/src/binary/handlers/users/delete_user_handler.rs b/server/src/binary/handlers/users/delete_user_handler.rs index 678a276f1..0019ab7c4 100644 --- a/server/src/binary/handlers/users/delete_user_handler.rs +++ b/server/src/binary/handlers/users/delete_user_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::delete_user::DeleteUser; use tracing::{debug, instrument}; @@ -17,14 +18,29 @@ pub async fn handle( debug!("session: {session}, command: {command}"); { let mut system = system.write().await; - system.delete_user(session, &command.user_id).await?; + system + .delete_user(session, &command.user_id) + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to delete user with id: {}, session: {session}", + command.user_id + ) + })?; } let system = system.read().await; + let user_id = command.user_id.clone(); system .state .apply(session.get_user_id(), EntryCommand::DeleteUser(command)) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to apply delete user with id: {}, session: {session}", + user_id + ) + })?; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/users/get_users_handler.rs b/server/src/binary/handlers/users/get_users_handler.rs index 3e5180cd0..f6842c1e5 100644 --- a/server/src/binary/handlers/users/get_users_handler.rs +++ b/server/src/binary/handlers/users/get_users_handler.rs @@ -2,6 +2,7 @@ use crate::binary::mapper; use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::get_users::GetUsers; use tracing::debug; @@ -14,7 +15,10 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let users = system.get_users(session).await?; + let users = system + .get_users(session) + .await + .with_error(|_| format!("USER_HANDLER - failed to get users, session: {session}"))?; let users = mapper::map_users(&users); sender.send_ok_response(&users).await?; Ok(()) diff --git a/server/src/binary/handlers/users/login_user_handler.rs b/server/src/binary/handlers/users/login_user_handler.rs index 5c898831e..4f47a6cc4 100644 --- a/server/src/binary/handlers/users/login_user_handler.rs +++ b/server/src/binary/handlers/users/login_user_handler.rs @@ -3,6 +3,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::login_user::LoginUser; use tracing::{debug, instrument}; @@ -18,7 +19,13 @@ pub async fn handle( let system = system.read().await; let user = system .login_user(&command.username, &command.password, Some(session)) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to login user with name: {}, session: {session}", + command.username + ) + })?; let identity_info = mapper::map_identity_info(user.id); sender.send_ok_response(&identity_info).await?; Ok(()) diff --git a/server/src/binary/handlers/users/logout_user_handler.rs b/server/src/binary/handlers/users/logout_user_handler.rs index 8867eb0f5..026ca58dd 100644 --- a/server/src/binary/handlers/users/logout_user_handler.rs +++ b/server/src/binary/handlers/users/logout_user_handler.rs @@ -2,6 +2,7 @@ use crate::binary::sender::Sender; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::logout_user::LogoutUser; use tracing::{debug, instrument}; @@ -15,7 +16,10 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - system.logout_user(session).await?; + system + .logout_user(session) + .await + .with_error(|_| format!("USER_HANDLER - failed to logout user, session: {session}"))?; session.clear_user_id(); sender.send_empty_ok_response().await?; Ok(()) diff --git a/server/src/binary/handlers/users/update_permissions_handler.rs b/server/src/binary/handlers/users/update_permissions_handler.rs index 85dd29aa5..aefed13d2 100644 --- a/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/server/src/binary/handlers/users/update_permissions_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::update_permissions::UpdatePermissions; use tracing::{debug, instrument}; @@ -19,7 +20,10 @@ pub async fn handle( let mut system = system.write().await; system .update_permissions(session, &command.user_id, command.permissions.clone()) - .await?; + .await + .with_error(|_| format!("USER_HANDLER - failed to update permissions for user_id: {}, session: {session}", + command.user_id + ))?; } let system = system.read().await; diff --git a/server/src/binary/handlers/users/update_user_handler.rs b/server/src/binary/handlers/users/update_user_handler.rs index 712c9bc91..b6f12a12c 100644 --- a/server/src/binary/handlers/users/update_user_handler.rs +++ b/server/src/binary/handlers/users/update_user_handler.rs @@ -3,6 +3,7 @@ use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ResultContext; use iggy::error::IggyError; use iggy::users::update_user::UpdateUser; use tracing::{debug, instrument}; @@ -24,14 +25,28 @@ pub async fn handle( command.username.clone(), command.status, ) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to update user with user_id: {}, session: {session}", + command.user_id + ) + })?; } let system = system.read().await; + let user_id = command.user_id.clone(); + system .state .apply(session.get_user_id(), EntryCommand::UpdateUser(command)) - .await?; + .await + .with_error(|_| { + format!( + "USER_HANDLER - failed to apply update user with user_id: {}, session: {session}", + user_id + ) + })?; sender.send_empty_ok_response().await?; Ok(()) }