Skip to content

Commit

Permalink
Merge pull request #485 from TheCharlatan/state_recovery3
Browse files Browse the repository at this point in the history
State Recovery Swapd Implementation

merging this now, issues from the discussion will be treated in separate PRs
  • Loading branch information
zkao authored Jun 30, 2022
2 parents 8cb4e93 + c9c7d62 commit dcc9b06
Show file tree
Hide file tree
Showing 15 changed files with 1,315 additions and 227 deletions.
2 changes: 1 addition & 1 deletion doc/sequencediagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 5 additions & 3 deletions doc/sequencediagram.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ t_wallet -> t_swap:Ctl Datum::SignedArbitratingLock
// DONE: do we know that same inputs are being used in case of replay?
// -> yes, but create different sig
t_wallet -> t_checkpoint : Ctl CheckpointWalletBobPreBuySig
t_wallet -> t_swap : Ctl Tx::Lock
t_swap -> t_syncer: Ctl Broadcast Arbitrating Lock
t_swap -> t_syncer : Watch Accordant Lock
t_wallet -> t_swap: Ctl Tx::Cancel
t_wallet -> t_swap: Ctl Tx::Refund
t_wallet -> t_swap : Ctl BuyProcedureSignature
t_swap -> t_checkpoint : Ctl CheckpointSwapBobPreBuySig
t_syncer <- t_swap : Broadcast Arbitrating Lock
t_swap -> t_syncer : Watch Accordant Lock
t_swap -> t_syncer : Watch Buy

parallel
Expand Down Expand Up @@ -156,7 +159,6 @@ t_wallet <- t_swap : Ctl Buy signature
t_wallet -> t_wallet : recover accordant keys

==Cancel Init t > t0: Bob is left, Alice right, either have a fully signed and valid cancel tx, and can publish==
// TODO: insert Ctl Tx::Cancel from wallet to swap (or do it after CoreArbitratingSetup, as in the code atm)
parallel
t_swap <- t_syncer : Ctl Cancel valid
m_swap <- m_syncer : Ctl Cancel valid
Expand Down
2 changes: 1 addition & 1 deletion doc/staterecovery_sequencediagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 6 additions & 1 deletion doc/staterecovery_sequencediagram.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
title State Recovery Procedure

// TODO: remove walletd once it's stateless
participant swapd
participant walletd
participant databased
participant farcasterd
participant cli

== State Recovery Procedure
entryspacing 0.8
farcasterd -> walletd : launch
farcasterd -> databased : launch
cli -> databased : RetrieveAllCheckpointInfo
databased -> cli : CheckpointList
databased -> farcasterd : CheckpointList
farcasterd -> cli : CheckpointList
cli -> farcasterd : RestoreCheckpoint
farcasterd -> swapd : launch
farcasterd -> databased : RestoreCheckpoint
databased -> walletd : Checkpoint
databased -> swapd: Checkpoint
119 changes: 89 additions & 30 deletions src/databased/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::farcaster_core::consensus::Encodable;
use crate::walletd::runtime::{CheckpointWallet, Wallet};
use farcaster_core::negotiation::PublicOffer;
use farcaster_core::swap::btcxmr::BtcXmr;
use farcaster_core::swap::SwapId;
Expand All @@ -24,8 +25,8 @@ use crate::{
rpc::{
request::{
self, BitcoinAddress, Checkpoint, CheckpointChunk, CheckpointEntry,
CheckpointMultipartChunk, Commit, Keys, List, MoneroAddress, Msg, Params, Reveal,
Token, Tx,
CheckpointMultipartChunk, CheckpointState, Commit, Keys, LaunchSwap, List,
MoneroAddress, Msg, NodeId, Params, Reveal, Token, Tx,
},
Request, ServiceBus,
},
Expand All @@ -35,7 +36,6 @@ use crate::{CtlServer, Error, Service, ServiceConfig, ServiceId};
use colored::Colorize;
use internet2::TypedEnum;
use microservices::esb::{self, Handler};
use request::{LaunchSwap, NodeId};

pub fn run(config: ServiceConfig, data_dir: PathBuf) -> Result<(), Error> {
let runtime = Runtime {
Expand Down Expand Up @@ -130,8 +130,15 @@ impl Runtime {
self.handle_rpc_ctl(endpoints, source, checkpoint_request)?;
}
}

Request::Checkpoint(request::Checkpoint { swap_id, state }) => {
Request::Checkpoint(Checkpoint { swap_id, state }) => {
match state {
CheckpointState::CheckpointWallet(_) => {
debug!("setting wallet checkpoint");
}
CheckpointState::CheckpointSwapd(_) => {
debug!("setting swap checkpoint");
}
};
let key = CheckpointKey {
swap_id,
service_id: source,
Expand All @@ -148,17 +155,57 @@ impl Runtime {
service_id: ServiceId::Wallet,
}) {
Ok(raw_state) => {
let state = request::CheckpointState::strict_decode(std::io::Cursor::new(
raw_state,
))
.expect("decoding the checkpoint should not fail");
checkpoint_send(
endpoints,
swap_id,
ServiceId::Database,
ServiceId::Wallet,
state,
)?;
match CheckpointState::strict_decode(std::io::Cursor::new(raw_state)) {
Ok(CheckpointState::CheckpointWallet(wallet)) => {
checkpoint_send(
endpoints,
swap_id,
ServiceId::Database,
ServiceId::Wallet,
CheckpointState::CheckpointWallet(wallet),
)?;
}
Ok(CheckpointState::CheckpointSwapd(_)) => {
error!(
"Decoded swapd checkpoint where walletd checkpoint was stored"
);
}
Err(err) => {
error!("Decoding the checkpoint failed: {}", err);
}
}
}
Err(err) => {
error!(
"Failed to retrieve checkpointed state for swap {}: {}",
swap_id, err
);
}
}
match self.database.get_checkpoint_state(&CheckpointKey {
swap_id,
service_id: ServiceId::Swap(swap_id),
}) {
Ok(raw_state) => {
match CheckpointState::strict_decode(std::io::Cursor::new(raw_state)) {
Ok(CheckpointState::CheckpointSwapd(state)) => {
checkpoint_send(
endpoints,
swap_id,
ServiceId::Database,
ServiceId::Swap(swap_id),
CheckpointState::CheckpointSwapd(state),
)?;
}
Ok(CheckpointState::CheckpointWallet(_)) => {
error!(
"Decoded walletd checkpoint were swapd checkpoint was stored"
);
}
Err(err) => {
error!("Decoding the checkpoint failed: {}", err);
}
}
}
Err(err) => {
error!(
Expand All @@ -175,21 +222,31 @@ impl Runtime {
.iter()
.filter_map(|(checkpoint_key, state)| {
let state =
request::CheckpointState::strict_decode(std::io::Cursor::new(state))
.ok()?;
CheckpointState::strict_decode(std::io::Cursor::new(state)).ok()?;
match checkpoint_key.service_id {
ServiceId::Wallet => match state {
request::CheckpointState::CheckpointWalletAlice(checkpoint) => {
Some(CheckpointEntry {
CheckpointState::CheckpointWallet(CheckpointWallet {
wallet,
..
}) => match wallet {
Wallet::Bob(wallet) => Some(CheckpointEntry {
swap_id: checkpoint_key.swap_id,
public_offer: checkpoint.pub_offer,
})
}
request::CheckpointState::CheckpointWalletBob(checkpoint) => {
Some(CheckpointEntry {
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
Wallet::Alice(wallet) => Some(CheckpointEntry {
swap_id: checkpoint_key.swap_id,
public_offer: checkpoint.pub_offer,
})
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
},
s => {
error!(
"Checkpoint {} not supported for service {}",
s,
ServiceId::Wallet
);
None
}
},
_ => None,
Expand All @@ -198,8 +255,8 @@ impl Runtime {
.collect();
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Database,
source,
ServiceId::Farcasterd,
Request::CheckpointList(checkpointed_pub_offers),
)?;
}
Expand Down Expand Up @@ -292,7 +349,7 @@ pub fn checkpoint_send(
swap_id: SwapId,
source: ServiceId,
destination: ServiceId,
state: request::CheckpointState,
state: CheckpointState,
) -> Result<(), Error> {
let mut serialized_state = vec![];
let size = state
Expand Down Expand Up @@ -374,7 +431,9 @@ struct Database(lmdb::Environment);

impl Database {
fn new(path: PathBuf) -> Result<Database, lmdb::Error> {
let env = lmdb::Environment::new().open(&path)?;
let env = lmdb::Environment::new()
.set_map_size(10485760 * 1024 * 64)
.open(&path)?;
env.create_db(None, lmdb::DatabaseFlags::empty())?;
Ok(Database(env))
}
Expand Down
108 changes: 103 additions & 5 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// along with this software.
// If not, see <https://opensource.org/licenses/MIT>.

use crate::farcasterd::runtime::request::CheckpointEntry;
use crate::farcasterd::runtime::request::MadeOffer;
use crate::farcasterd::runtime::request::TookOffer;
use crate::farcasterd::runtime::request::{ProgressEvent, SwapProgress};
Expand Down Expand Up @@ -147,6 +148,8 @@ pub fn run(
stats: none!(),
funding_xmr: none!(),
funding_btc: none!(),
checkpointed_pub_offers: vec![].into(),
restoring_swap_id: none!(),
config,
};

Expand Down Expand Up @@ -178,6 +181,8 @@ pub struct Runtime {
progress_subscriptions: HashMap<ServiceId, HashSet<ServiceId>>,
funding_btc: HashMap<SwapId, (bitcoin::Address, bitcoin::Amount, bool)>,
funding_xmr: HashMap<SwapId, (monero::Address, monero::Amount, bool)>,
checkpointed_pub_offers: List<CheckpointEntry>,
restoring_swap_id: HashSet<SwapId>,
stats: Stats,
config: Config,
}
Expand Down Expand Up @@ -549,6 +554,16 @@ impl Runtime {
}
}
ServiceId::Swap(swap_id) => {
if self.restoring_swap_id.remove(swap_id) {
info!("Restoring swap {}", swap_id.bright_blue_italic());
self.stats.incr_initiated();
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
ServiceId::Database,
Request::RestoreCheckpoint(*swap_id),
)?;
}
if self.running_swaps.insert(*swap_id) {
info!(
"Swap {} is registered; total {} swaps are known",
Expand Down Expand Up @@ -895,7 +910,18 @@ impl Runtime {
)?;
}

Request::CheckpointList(checkpointed_pub_offers) => {
self.checkpointed_pub_offers = checkpointed_pub_offers.clone();
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::CheckpointList(checkpointed_pub_offers),
)?;
}

Request::RestoreCheckpoint(swap_id) => {
// check if wallet is running
if endpoints
.send_to(
ServiceBus::Msg,
Expand All @@ -916,17 +942,89 @@ impl Runtime {
)?;
return Ok(());
}
endpoints.send_to(
ServiceBus::Ctl,

// check if swapd is not running
if endpoints
.send_to(
ServiceBus::Msg,
ServiceId::Farcasterd,
ServiceId::Swap(swap_id.clone()),
Request::Hello,
)
.is_ok()
{
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::Failure(Failure {
code: 1,
info: "Cannot restore a checkpoint into a running swap.".to_string(),
}),
)?;
return Ok(());
}

let CheckpointEntry {
public_offer,
trade_role,
..
} = match self
.checkpointed_pub_offers
.iter()
.find(|entry| entry.swap_id == swap_id)
{
Some(ce) => ce,
None => {
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::Failure(Failure {
code: 1,
info: "No checkpoint found with given swap id, aborting restore."
.to_string(),
}),
)?;
return Ok(());
}
};
self.restoring_swap_id.insert(swap_id);
syncers_up(
ServiceId::Farcasterd,
ServiceId::Database,
Request::RestoreCheckpoint(swap_id),
&mut self.spawning_services,
&self.syncer_services,
&mut self.syncer_clients,
Coin::Bitcoin,
public_offer.offer.network,
swap_id,
&self.config,
)?;
syncers_up(
ServiceId::Farcasterd,
&mut self.spawning_services,
&self.syncer_services,
&mut self.syncer_clients,
Coin::Monero,
public_offer.offer.network,
swap_id,
&self.config,
)?;

let _child = launch(
"swapd",
&[
swap_id.to_hex(),
public_offer.to_string(),
trade_role.to_string(),
],
)?;

endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::String("Restored checkpoint.".to_string()),
Request::String("Restoring checkpoint.".to_string()),
)?;
}

Expand Down
3 changes: 2 additions & 1 deletion src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ pub use request::Request;

use microservices::esb::BusId;
use microservices::rpc_connection::Api;
use strict_encoding::{StrictDecode, StrictEncode};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Display)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Display, StrictEncode, StrictDecode)]
pub enum ServiceBus {
#[display("MSG")]
Msg,
Expand Down
Loading

0 comments on commit dcc9b06

Please sign in to comment.