Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: simplify ledger synchronization protocol #86

Merged
merged 13 commits into from
Jun 9, 2022
Merged
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: CI
on:
pull_request:
push:
branches:
- 'main'
schedule:
- cron: '0 0 * * */2'
env:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [\#93](https://github.com/Manta-Network/manta-rs/pull/93) Add Changelog and Update Contributing Guidelines

### Changed
- [\#86](https://github.com/Manta-Network/manta-rs/pull/86) Allow Wallet to Synchronize with Signer before talking to Ledger

### Deprecated

Expand Down
42 changes: 19 additions & 23 deletions manta-accounting/src/wallet/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use manta_util::serde::{Deserialize, Serialize};
/// Ledger Connection
///
/// This is the base `trait` for defining a connection with a ledger. To communicate with the
/// ledger, you can establish such a connection first and then interact via the [`Read`] and
/// [`Write`] traits which send messages along the connection.
/// ledger, you can establish a connection first and then interact via the [`Read`] and [`Write`]
/// `trait`s which send messages along the connection.
pub trait Connection {
/// Error Type
///
/// This error type corresponds to the communication channel setup by the [`Connection`] rather
/// than any errors introduced by [`read`] or [`write`] methods. Instead those methods should
/// than any errors introduced by [`read`] or [`write`] methods. Instead, those methods should
/// return errors in their `Response` types.
///
/// [`read`]: Read::read
Expand All @@ -44,12 +44,21 @@ pub trait Connection {
/// The checkpoint type is responsible for keeping the ledger, signer, and wallet in sync with each
/// other making sure that they all have the same view of the ledger state. Checkpoints should
/// be orderable with a bottom element returned by [`Default::default`]. Types implementing this
/// `trait` must also implement [`Clone`] as it must be safe (but not necessarily efficient) to
/// copy a checkpoint value.
pub trait Checkpoint: Clone + Default + PartialOrd {}
/// `trait` must also implement [`Clone`], [`Send`], and [`Sync`] as it must be safe
/// (but not necessarily efficient) to copy a checkpoint value and share it across threads.
pub trait Checkpoint: Clone + Default + PartialOrd + Send + Sync {}

/// Ledger Data Pruning
pub trait Prune<T>
/// Ledger Data
///
/// In order to keep track of updates from [`read`] calls through the [`Read`] `trait`, all data
/// that comes from the ledger must be compatible with the checkpoints that request them. This
/// `trait` requires that data can be pruned to meet a known checkpoint and can also update its own
/// origin checkpoint to the state that it would be at after the state would return itself from
/// the [`read`] call. In the same way that the [`Checkpoint`]s represent monotonically increasing
/// state markers, the data returned from the ledger must fit into this increasing set.
///
/// [`read`]: Read::read
pub trait Data<T>
where
T: Checkpoint,
{
Expand All @@ -69,7 +78,7 @@ pub trait Read<D>: Connection {
fn read<'s>(
&'s mut self,
checkpoint: &'s Self::Checkpoint,
) -> LocalBoxFutureResult<'s, ReadResponse<Self::Checkpoint, D>, Self::Error>;
) -> LocalBoxFutureResult<'s, ReadResponse<D>, Self::Error>;
}

/// Ledger Connection Read Response
Expand All @@ -82,26 +91,13 @@ pub trait Read<D>: Connection {
serde(crate = "manta_util::serde", deny_unknown_fields)
)]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct ReadResponse<T, D>
where
T: Checkpoint,
{
pub struct ReadResponse<D> {
/// Read Continuation Flag
///
/// The `should_continue` flag is set to `true` if the client should request more data from the
/// ledger to finish the requested [`read`](Read::read).
pub should_continue: bool,

/// Next Ledger Checkpoint
///
/// This checkpoint represents the new checkpoint that the client should be synchronized to when
/// incorporating the [`data`] returned by the [`read`] request. To continue the request the
/// client should send this checkpoint in their next call to [`read`].
///
/// [`data`]: Self::data
/// [`read`]: Read::read
pub next_checkpoint: T,

/// Data Payload
///
/// This is the data payload that was returned by the ledger corresponding to the
Expand Down
144 changes: 82 additions & 62 deletions manta-accounting/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
//! This module defines the notion of a "wallet" which can store and manage accounts that control
//! private assets, those defined in [`crate::asset`] and [`crate::transfer`]. The [`Wallet`]
//! abstraction implements the main interface to an account and requires two asynchronous
//! connections, one to a zero-knowledge signing source and secret manager called the [`Signer`] and
//! another connection to the [`Ledger`] itself. The wallet itself only stores the information
//! related to the current balances of any particular account and queries the [`Signer`] and the
//! [`Ledger`] to get the newest balances from incoming transactions and to send out transactions of
//! its own.
//! connections, one to a transaction signer and secret manager called the [`Signer`] and another
//! connection to the [`Ledger`] itself. The wallet itself only stores the information related to
//! the current balances of any particular account and queries the [`Signer`] and the [`Ledger`] to
//! get the newest balances from incoming transactions and to send out transactions of its own.
//!
//! [`Signer`]: signer::Connection
//! [`Ledger`]: ledger::Connection
Expand All @@ -38,8 +37,8 @@ use crate::{
balance::{BTreeMapBalanceState, BalanceState},
ledger::ReadResponse,
signer::{
ReceivingKeyRequest, SignError, SignRequest, SignResponse, SyncData, SyncError,
SyncRequest, SyncResponse,
BalanceUpdate, ReceivingKeyRequest, SignError, SignRequest, SignResponse, SyncData,
SyncError, SyncRequest, SyncResponse,
},
},
};
Expand Down Expand Up @@ -107,17 +106,35 @@ where
/// # Setting Up the Wallet
///
/// Creating a [`Wallet`] using this method should be followed with a call to [`sync`] or
/// [`recover`] to retrieve the current checkpoint and balance for this [`Wallet`]. If the
/// [`restart`] to retrieve the current checkpoint and balance for this [`Wallet`]. If the
/// backing `signer` is known to be already initialized, a call to [`sync`] is enough,
/// otherwise, a call to [`recover`] is necessary to retrieve the full balance state.
/// otherwise, a call to [`restart`] is necessary to retrieve the full balance state.
///
/// [`sync`]: Self::sync
/// [`recover`]: Self::recover
/// [`restart`]: Self::restart
#[inline]
pub fn new(ledger: L, signer: S) -> Self {
Self::new_unchecked(ledger, Default::default(), signer, Default::default())
}

/// Starts a new wallet with `ledger` and `signer` connections.
#[inline]
pub async fn start(ledger: L, signer: S) -> Result<Self, Error<C, L, S>>
where
L: ledger::Read<SyncData<C>, Checkpoint = S::Checkpoint>,
{
let mut wallet = Self::new(ledger, signer);
wallet.restart().await?;
Ok(wallet)
}

/// Resets the state of the wallet to the default starting state.
#[inline]
fn reset_state(&mut self) {
self.checkpoint = Default::default();
self.assets = Default::default();
}

/// Returns the current balance associated with this `id`.
#[inline]
pub fn balance(&self, id: AssetId) -> AssetValue {
Expand Down Expand Up @@ -161,20 +178,8 @@ where
&self.checkpoint
}

/// Resets `self` to the default checkpoint and no balance. A call to this method should be
/// followed by a call to [`sync`](Self::sync) to retrieve the correct checkpoint and balance.
///
/// # Note
///
/// This is not a "full wallet recovery" which would involve resetting the signer as well as
/// this wallet state. See the [`recover`](Self::recover) method for more.
#[inline]
pub fn reset(&mut self) {
self.checkpoint = Default::default();
self.assets = Default::default();
}

/// Performs full wallet recovery.
/// Restarts `self` with an empty state and performs a synchronization against the signer and
/// ledger to catch up to the current checkpoint and balance state.
///
/// # Failure Conditions
///
Expand All @@ -183,15 +188,24 @@ where
/// [`InconsistencyError`] type for more information on the kinds of errors that can occur and
/// how to resolve them.
#[inline]
pub async fn recover(&mut self) -> Result<(), Error<C, L, S>>
pub async fn restart(&mut self) -> Result<(), Error<C, L, S>>
where
L: ledger::Read<SyncData<C>, Checkpoint = S::Checkpoint>,
{
self.reset();
self.reset_state();
self.load_initial_state().await?;
while self.sync_with(true).await?.is_continue() {}
Ok(())
}

/// Loads initial checkpoint and balance state from the signer. This method is used by
/// [`restart`](Self::restart) to avoid querying the ledger at genesis when a known later
/// checkpoint exists.
#[inline]
async fn load_initial_state(&mut self) -> Result<(), Error<C, L, S>> {
self.signer_sync(Default::default()).await
}

/// Pulls data from the ledger, synchronizing the wallet and balance state. This method loops
/// continuously calling [`sync_partial`](Self::sync_partial) until all the ledger data has
/// arrived at and has been synchronized with the wallet.
Expand Down Expand Up @@ -237,47 +251,61 @@ where
{
let ReadResponse {
should_continue,
next_checkpoint,
data,
} = self
.ledger
.read(&self.checkpoint)
.await
.map_err(Error::LedgerConnectionError)?;
if next_checkpoint < self.checkpoint {
return Err(Error::Inconsistency(InconsistencyError::LedgerCheckpoint));
}
self.signer_sync(SyncRequest {
with_recovery,
origin_checkpoint: self.checkpoint.clone(),
data,
})
.await?;
Ok(ControlFlow::should_continue(should_continue))
}

/// Performs a synchronization with the signer against the given `request`.
#[inline]
async fn signer_sync(
&mut self,
request: SyncRequest<C, S::Checkpoint>,
) -> Result<(), Error<C, L, S>> {
match self
.signer
.sync(SyncRequest {
with_recovery,
origin_checkpoint: self.checkpoint.clone(),
data,
})
.sync(request)
.await
.map_err(Error::SignerConnectionError)?
{
Ok(SyncResponse::Partial { deposit, withdraw }) => {
self.assets.deposit_all(deposit);
if !self.assets.withdraw_all(withdraw) {
return Err(Error::Inconsistency(InconsistencyError::WalletBalance));
Ok(SyncResponse {
checkpoint,
balance_update,
}) => {
match balance_update {
BalanceUpdate::Partial { deposit, withdraw } => {
self.assets.deposit_all(deposit);
if !self.assets.withdraw_all(withdraw) {
return Err(Error::Inconsistency(InconsistencyError::WalletBalance));
}
}
BalanceUpdate::Full { assets } => {
self.assets.clear();
self.assets.deposit_all(assets);
}
}
}
Ok(SyncResponse::Full { assets }) => {
self.assets.clear();
self.assets.deposit_all(assets);
self.checkpoint = checkpoint;
Ok(())
}
Err(SyncError::InconsistentSynchronization { checkpoint }) => {
if checkpoint < self.checkpoint {
self.checkpoint = checkpoint;
}
return Err(Error::Inconsistency(
Err(Error::Inconsistency(
InconsistencyError::SignerSynchronization,
));
))
}
}
self.checkpoint = next_checkpoint;
Ok(ControlFlow::should_continue(should_continue))
}

/// Checks if `transaction` can be executed on the balance state of `self`, returning the
Expand Down Expand Up @@ -373,23 +401,16 @@ where
)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum InconsistencyError {
/// Ledger Checkpoint Inconsistency
///
/// This error state arises when the ledger checkpoint is behind the checkpoint of the wallet.
/// To resolve this error, ensure that the ledger connection is correct and try again. This
/// error does not result in a bad wallet state.
LedgerCheckpoint,

/// Wallet Balance Inconsistency
///
/// ⚠️ This error causes the wallet system to enter an inconsistent state. ⚠️
///
/// This error state arises whenever the signer requests a withdraw from the wallet that would
/// overdraw the balance. To resolve this error, ensure that the signer connection is correct
/// and perform a wallet reset by resetting the checkpoint and balance state with a call to
/// [`reset`](Wallet::reset). If other errors continue or if there is reason to suspect that the
/// signer or ledger connections (or their true state) are corrupted, a full recovery is
/// required. See the [`recover`](Wallet::recover) method for more.
/// and perform a wallet restart by resetting the checkpoint and balance state with a call to
/// [`restart`](Wallet::restart). If other errors continue or if there is reason to suspect that
/// the signer or ledger connections (or their true state) are corrupted, a full recovery is
/// required.
WalletBalance,

/// Signer Synchronization Inconsistency
Expand All @@ -398,10 +419,9 @@ pub enum InconsistencyError {
///
/// This error state arises whenever the signer gets behind the wallet checkpoint. To resolve
/// this error, ensure that the signer connection is correct and perform a wallet reset by
/// resetting the checkpoint and balance state with a call to [`reset`](Wallet::reset). If other
/// errors continue or if there is reason to suspect that the signer or ledger connections (or
/// their true state) are corrupted, a full recovery is required. See the
/// [`recover`](Wallet::recover) method for more.
/// resetting the checkpoint and balance state with a call to [`restart`](Wallet::restart). If
/// other errors continue or if there is reason to suspect that the signer or ledger connections
/// (or their true state) are corrupted, a full recovery is required.
SignerSynchronization,
}

Expand Down
Loading