Skip to content

Commit

Permalink
fix notes and graceful shutdown on init failure
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Dec 10, 2024
1 parent e3d9a9f commit 5f47368
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 64 deletions.
39 changes: 1 addition & 38 deletions mm2src/kdf_walletconnect/src/connection_handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::storage::WalletConnectStorageOps;
use crate::WalletConnectCtxImpl;

use common::executor::Timer;
Expand All @@ -7,9 +6,8 @@ use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::StreamExt;
use relay_client::error::ClientError;
use relay_client::websocket::{CloseFrame, ConnectionHandler, PublishedMessage};
use std::sync::Arc;

const MAX_BACKOFF: u64 = 60;
pub(crate) const MAX_BACKOFF: u64 = 60;

pub struct Handler {
name: &'static str,
Expand Down Expand Up @@ -70,41 +68,6 @@ impl ConnectionHandler for Handler {
}
}

/// Establishes initial connection to WalletConnect relay server with linear retry mechanism.
/// Uses increasing delay between retry attempts starting from [RETRY_SECS].
/// After successful connection, attempts to restore previous session state from storage.
pub(crate) async fn spawn_connection_initialization(
wc: Arc<WalletConnectCtxImpl>,
connection_live_rx: UnboundedReceiver<Option<String>>,
) {
info!("Initializing WalletConnect connection");
let mut retry_count = 0;
let mut retry_secs = 10;

while let Err(err) = wc.connect_client().await {
retry_count += 1;
error!(
"Error during initial connection attempt {}: {:?}. Retrying in {retry_secs} seconds...",
retry_count, err
);
Timer::sleep(retry_secs as f64).await;
retry_secs = std::cmp::min(retry_secs * 2, MAX_BACKOFF);
}

// Initialize storage
if let Err(err) = wc.session_manager.storage().init().await {
error!("Unable to initialize WalletConnect persistent storage: {err:?}. Only inmemory storage will be utilized for this Session.");
};

// load session from storage
if let Err(err) = wc.load_session_from_storage().await {
panic!("Unable to load session from storage: {err:?}");
};

// Spawn session disconnection watcher.
handle_disconnections(&wc, connection_live_rx).await;
}

/// Handles unexpected disconnections from WalletConnect relay server.
/// Implements exponential backoff retry mechanism for reconnection attempts.
/// After successful reconnection, resubscribes to previous topics to restore full functionality.
Expand Down
45 changes: 42 additions & 3 deletions mm2src/kdf_walletconnect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ mod metadata;
pub mod session;
mod storage;

use crate::connection_handler::{handle_disconnections, MAX_BACKOFF};
use crate::session::rpc::propose::send_proposal_request;

use chain::{WcChainId, WcRequestMethods, SUPPORTED_PROTOCOL};
use common::custom_futures::timeout::FutureTimerExt;
use common::executor::abortable_queue::AbortableQueue;
use common::executor::{AbortableSystem, Timer};
use common::log::{debug, info};
use common::log::{debug, info, LogOnError};
use common::{executor::SpawnFuture, log::error};
use connection_handler::{spawn_connection_initialization, Handler};
use connection_handler::Handler;
use error::WalletConnectError;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::lock::Mutex;
Expand Down Expand Up @@ -120,10 +121,11 @@ impl WalletConnectCtx {
});

// Connect to relayer client and spawn a watcher loop for disconnection.
let inner_clone = inner.clone();
inner
.abortable_system
.weak_spawner()
.spawn(spawn_connection_initialization(inner.clone(), conn_live_receiver));
.spawn(inner_clone.spawn_connection_initialization(conn_live_receiver));
// spawn message handler event loop
let inner_clone = inner.clone();
inner_clone.abortable_system.weak_spawner().spawn(async move {
Expand All @@ -146,6 +148,43 @@ impl WalletConnectCtx {
}

impl WalletConnectCtxImpl {
/// Establishes initial connection to WalletConnect relay server with linear retry mechanism.
/// Uses increasing delay between retry attempts starting from 1sec.
/// After successful connection, attempts to restore previous session state from storage.
pub(crate) async fn spawn_connection_initialization(
self: Arc<Self>,
connection_live_rx: UnboundedReceiver<Option<String>>,
) {
info!("Initializing WalletConnect connection");
let mut retry_count = 0;
let mut retry_secs = 1;

while let Err(err) = self.connect_client().await {
retry_count += 1;
error!(
"Error during initial connection attempt {}: {:?}. Retrying in {retry_secs} seconds...",
retry_count, err
);
Timer::sleep(retry_secs as f64).await;
retry_secs = std::cmp::min(retry_secs * 2, MAX_BACKOFF);
}

// Initialize storage
if let Err(err) = self.session_manager.storage().init().await {
error!("Failed to initialize WalletConnect storage, shutting down: {err:?}");
self.abortable_system.abort_all().error_log();
};

// load session from storage
if let Err(err) = self.load_session_from_storage().await {
error!("Failed to load sessions from storage, shutting down: {err:?}");
self.abortable_system.abort_all().error_log();
};

// Spawn session disconnection watcher.
handle_disconnections(&self, connection_live_rx).await;
}

pub async fn connect_client(&self) -> MmResult<(), WalletConnectError> {
let auth = {
let key = SigningKey::generate(&mut rand::thread_rng());
Expand Down
9 changes: 2 additions & 7 deletions mm2src/kdf_walletconnect/src/pairing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ pub(crate) async fn reply_pairing_extend_response(
message_id: &MessageId,
extend: PairingExtendRequest,
) -> MmResult<(), WalletConnectError> {
{
ctx.pairing.activate(topic)?;
}
ctx.pairing.activate(topic)?;
let param = ResponseParamsSuccess::PairingExtend(true);
ctx.publish_response_ok(topic, param, message_id).await?;

Expand All @@ -42,10 +40,7 @@ pub(crate) async fn reply_pairing_delete_response(
message_id: &MessageId,
_delete: PairingDeleteRequest,
) -> MmResult<(), WalletConnectError> {
{
ctx.pairing.disconnect_rpc(topic, &ctx.client).await?;
}

ctx.pairing.disconnect_rpc(topic, &ctx.client).await?;
let param = ResponseParamsSuccess::PairingDelete(true);
ctx.publish_response_ok(topic, param, message_id).await?;

Expand Down
19 changes: 9 additions & 10 deletions mm2src/kdf_walletconnect/src/session/rpc/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,16 @@ pub(crate) async fn process_session_propose_response(
session.controller.public_key = response.responder_public_key.clone();
session
};
{
// save session to storage
ctx.session_manager
.storage()
.save_session(&session)
.await
.mm_err(|err| WalletConnectError::StorageError(err.to_string()))?;

// Add session to session lists
ctx.session_manager.add_session(session.clone());
};
// save session to storage
ctx.session_manager
.storage()
.save_session(&session)
.await
.mm_err(|err| WalletConnectError::StorageError(err.to_string()))?;

// Add session to session lists
ctx.session_manager.add_session(session.clone());

// Activate pairing_topic
ctx.pairing.activate(pairing_topic)?;
Expand Down
4 changes: 2 additions & 2 deletions mm2src/kdf_walletconnect/src/session/rpc/settle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ pub(crate) async fn reply_session_settle_request(
session.expiry = settle.expiry;

if let Some(value) = settle.session_properties {
let session_properties = serde_json::from_str::<SessionProperties>(&value.to_string())?;
session.session_properties = Some(session_properties);
let session_properties = serde_json::from_str::<Option<SessionProperties>>(&value.to_string())?;
session.session_properties = session_properties;
};
};

Expand Down
3 changes: 0 additions & 3 deletions mm2src/kdf_walletconnect/src/session/rpc/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use mm2_err_handle::prelude::*;
use relay_rpc::domain::{MessageId, Topic};
use relay_rpc::rpc::params::{session_update::SessionUpdateRequest, ResponseParamsSuccess};

// TODO: Handle properly when multi chain is supported.
// Hanlding for only cosmos support.
pub(crate) async fn reply_session_update_request(
ctx: &WalletConnectCtxImpl,
topic: &Topic,
Expand All @@ -23,7 +21,6 @@ pub(crate) async fn reply_session_update_request(
.namespaces
.caip2_validate()
.map_to_mm(|err| WalletConnectError::InternalError(err.to_string()))?;
//TODO: session.namespaces.supported(update.namespaces.0)
session.namespaces = update.namespaces.0;
let session = session;
info!("Updated extended, info: {:?}", session.topic);
Expand Down
2 changes: 1 addition & 1 deletion mm2src/kdf_walletconnect/src/storage/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn create_sessions_table() -> SqlResult<String> {
validate_table_name(SESSION_TABLE_NAME)?;
Ok(format!(
"CREATE TABLE IF NOT EXISTS {SESSION_TABLE_NAME} (
topic VARCHAR(255) PRIMARY KEY,
topic char(32) PRIMARY KEY,
data TEXT NOT NULL,
expiry BIGINT NOT NULL
);"
Expand Down

0 comments on commit 5f47368

Please sign in to comment.