Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State Recovery Swapd Implementation #485

Merged
merged 33 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f20ea8b
Swapd: Serialize state for checkpointing
TheCharlatan Jun 9, 2022
6001e84
Swapd: Add state checkpointing
TheCharlatan Jun 9, 2022
c55c80e
Swapd: Handle checkpoint state restore
TheCharlatan Jun 9, 2022
76fc897
Checkpoint: Start syncers on restore
TheCharlatan Jun 9, 2022
d5b9ef2
Checkpoint: Restore watch height and watch tx
TheCharlatan Jun 9, 2022
bbeb0db
Checkpoint: Patch restore workflow
TheCharlatan Jun 10, 2022
63b1bdb
Checkpoint: Report failure if no checkpoint was found to restore
TheCharlatan Jun 10, 2022
e09379b
Swapd: Add progress message on successful restore
TheCharlatan Jun 10, 2022
f384089
Swapd: Improve restore checkpoint data
TheCharlatan Jun 10, 2022
a6bed90
Walletd: Send Tx's before BuyProcedureSig
TheCharlatan Jun 10, 2022
e89d38e
Sequence Diagram: Reflect reality of swap cancel and refund tx retrieval
TheCharlatan Jun 11, 2022
6c22c37
Walletd: Use the same fee amount as swapd
TheCharlatan Jun 11, 2022
e2a8991
fixup! Swapd: Improve restore checkpoint data
TheCharlatan Jun 11, 2022
e47c0bd
Checkpoint: Add xmr addrs to wallet checkpoint
TheCharlatan Jun 12, 2022
679668e
Swap test: Add restore swap refund test
TheCharlatan Jun 12, 2022
8b613aa
Swap test: Add restore swap buy test
TheCharlatan Jun 13, 2022
7863290
Database: Boost lmdb map size
TheCharlatan Jun 13, 2022
437f29c
Swapd: Add xmr address to state and checkpoint
TheCharlatan Jun 14, 2022
0672d28
Request: improve display for logs
TheCharlatan Jun 14, 2022
fb37022
Walletd: Correct bob pre buy checkpoint
TheCharlatan Jun 14, 2022
5fadcae
Checkpointing: Remove debugging sleeps and logs
TheCharlatan Jun 14, 2022
a71195a
Swapd: Use exported functions to handle multipart messages
TheCharlatan Jun 21, 2022
64c90b9
Swapd: Correct checkpoint data after rebase
TheCharlatan Jun 21, 2022
0fd7b72
Doc: Update state recovery sequencediagram
TheCharlatan Jun 26, 2022
6304670
Farcasterd: Check if swapd is not running before restoring checkpoint
TheCharlatan Jun 26, 2022
f2ec729
Farcasterd: Incr initiated swaps on checkpoint restore
TheCharlatan Jun 26, 2022
b711b76
Walletd: Log reason for failed tx refund retrieval
TheCharlatan Jun 27, 2022
ce66697
Rpc: Correct swapd checkpoint display
TheCharlatan Jun 27, 2022
18a9d92
Swapd: Revert changes to tx removal to invalidate prior states
TheCharlatan Jun 28, 2022
8660617
Swapd: Revert changes to tx retrieval to ensure tx's are not re-broad…
TheCharlatan Jun 29, 2022
70dfa6c
Swapd: On Checkpoint restore, do not re-watch tx's, only previously w…
TheCharlatan Jun 29, 2022
b00f632
Walletd: Log warn if wallet already exists on checkpoint restore
TheCharlatan Jun 29, 2022
c9c7d62
syncer_client: homogenize the way tasks are produced
Jun 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/sequencediagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 5 additions & 3 deletions doc/sequencediagram.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ t_wallet -> t_swap:Ctl Datum::SignedArbitratingLock
// DONE: do we know that same inputs are being used in case of replay?
// -> yes, but create different sig
t_wallet -> t_checkpoint : Ctl CheckpointWalletBobPreBuySig
t_wallet -> t_swap : Ctl Tx::Lock
t_swap -> t_syncer: Ctl Broadcast Arbitrating Lock
t_swap -> t_syncer : Watch Accordant Lock
t_wallet -> t_swap: Ctl Tx::Cancel
t_wallet -> t_swap: Ctl Tx::Refund
t_wallet -> t_swap : Ctl BuyProcedureSignature
t_swap -> t_checkpoint : Ctl CheckpointSwapBobPreBuySig
t_syncer <- t_swap : Broadcast Arbitrating Lock
t_swap -> t_syncer : Watch Accordant Lock
t_swap -> t_syncer : Watch Buy

parallel
Expand Down Expand Up @@ -156,7 +159,6 @@ t_wallet <- t_swap : Ctl Buy signature
t_wallet -> t_wallet : recover accordant keys

==Cancel Init t > t0: Bob is left, Alice right, either have a fully signed and valid cancel tx, and can publish==
// TODO: insert Ctl Tx::Cancel from wallet to swap (or do it after CoreArbitratingSetup, as in the code atm)
parallel
t_swap <- t_syncer : Ctl Cancel valid
m_swap <- m_syncer : Ctl Cancel valid
Expand Down
2 changes: 1 addition & 1 deletion doc/staterecovery_sequencediagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 6 additions & 1 deletion doc/staterecovery_sequencediagram.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
title State Recovery Procedure

// TODO: remove walletd once it's stateless
participant swapd
participant walletd
participant databased
participant farcasterd
participant cli

== State Recovery Procedure
entryspacing 0.8
farcasterd -> walletd : launch
farcasterd -> databased : launch
cli -> databased : RetrieveAllCheckpointInfo
databased -> cli : CheckpointList
databased -> farcasterd : CheckpointList
zkao marked this conversation as resolved.
Show resolved Hide resolved
farcasterd -> cli : CheckpointList
cli -> farcasterd : RestoreCheckpoint
farcasterd -> swapd : launch
farcasterd -> databased : RestoreCheckpoint
databased -> walletd : Checkpoint
databased -> swapd: Checkpoint
119 changes: 89 additions & 30 deletions src/databased/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::farcaster_core::consensus::Encodable;
use crate::walletd::runtime::{CheckpointWallet, Wallet};
use farcaster_core::negotiation::PublicOffer;
use farcaster_core::swap::btcxmr::BtcXmr;
use farcaster_core::swap::SwapId;
Expand All @@ -24,8 +25,8 @@ use crate::{
rpc::{
request::{
self, BitcoinAddress, Checkpoint, CheckpointChunk, CheckpointEntry,
CheckpointMultipartChunk, Commit, Keys, List, MoneroAddress, Msg, Params, Reveal,
Token, Tx,
CheckpointMultipartChunk, CheckpointState, Commit, Keys, LaunchSwap, List,
MoneroAddress, Msg, NodeId, Params, Reveal, Token, Tx,
},
Request, ServiceBus,
},
Expand All @@ -35,7 +36,6 @@ use crate::{CtlServer, Error, Service, ServiceConfig, ServiceId};
use colored::Colorize;
use internet2::TypedEnum;
use microservices::esb::{self, Handler};
use request::{LaunchSwap, NodeId};

pub fn run(config: ServiceConfig, data_dir: PathBuf) -> Result<(), Error> {
let runtime = Runtime {
Expand Down Expand Up @@ -130,8 +130,15 @@ impl Runtime {
self.handle_rpc_ctl(endpoints, source, checkpoint_request)?;
}
}

Request::Checkpoint(request::Checkpoint { swap_id, state }) => {
Request::Checkpoint(Checkpoint { swap_id, state }) => {
match state {
CheckpointState::CheckpointWallet(_) => {
debug!("setting wallet checkpoint");
}
CheckpointState::CheckpointSwapd(_) => {
debug!("setting swap checkpoint");
}
};
let key = CheckpointKey {
swap_id,
service_id: source,
Expand All @@ -148,17 +155,57 @@ impl Runtime {
service_id: ServiceId::Wallet,
}) {
Ok(raw_state) => {
let state = request::CheckpointState::strict_decode(std::io::Cursor::new(
raw_state,
))
.expect("decoding the checkpoint should not fail");
checkpoint_send(
endpoints,
swap_id,
ServiceId::Database,
ServiceId::Wallet,
state,
)?;
match CheckpointState::strict_decode(std::io::Cursor::new(raw_state)) {
Ok(CheckpointState::CheckpointWallet(wallet)) => {
checkpoint_send(
endpoints,
swap_id,
ServiceId::Database,
ServiceId::Wallet,
CheckpointState::CheckpointWallet(wallet),
)?;
}
Ok(CheckpointState::CheckpointSwapd(_)) => {
error!(
"Decoded swapd checkpoint where walletd checkpoint was stored"
);
}
Err(err) => {
error!("Decoding the checkpoint failed: {}", err);
}
}
}
Err(err) => {
error!(
"Failed to retrieve checkpointed state for swap {}: {}",
swap_id, err
);
}
}
match self.database.get_checkpoint_state(&CheckpointKey {
swap_id,
service_id: ServiceId::Swap(swap_id),
}) {
Ok(raw_state) => {
match CheckpointState::strict_decode(std::io::Cursor::new(raw_state)) {
Ok(CheckpointState::CheckpointSwapd(state)) => {
checkpoint_send(
endpoints,
swap_id,
ServiceId::Database,
ServiceId::Swap(swap_id),
CheckpointState::CheckpointSwapd(state),
)?;
}
Ok(CheckpointState::CheckpointWallet(_)) => {
error!(
"Decoded walletd checkpoint were swapd checkpoint was stored"
);
}
Err(err) => {
error!("Decoding the checkpoint failed: {}", err);
}
}
}
Err(err) => {
error!(
Expand All @@ -175,21 +222,31 @@ impl Runtime {
.iter()
.filter_map(|(checkpoint_key, state)| {
let state =
request::CheckpointState::strict_decode(std::io::Cursor::new(state))
.ok()?;
CheckpointState::strict_decode(std::io::Cursor::new(state)).ok()?;
match checkpoint_key.service_id {
ServiceId::Wallet => match state {
request::CheckpointState::CheckpointWalletAlice(checkpoint) => {
Some(CheckpointEntry {
CheckpointState::CheckpointWallet(CheckpointWallet {
wallet,
..
}) => match wallet {
Wallet::Bob(wallet) => Some(CheckpointEntry {
swap_id: checkpoint_key.swap_id,
public_offer: checkpoint.pub_offer,
})
}
request::CheckpointState::CheckpointWalletBob(checkpoint) => {
Some(CheckpointEntry {
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
Wallet::Alice(wallet) => Some(CheckpointEntry {
swap_id: checkpoint_key.swap_id,
public_offer: checkpoint.pub_offer,
})
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
},
s => {
error!(
"Checkpoint {} not supported for service {}",
s,
ServiceId::Wallet
);
None
}
},
_ => None,
Expand All @@ -198,8 +255,8 @@ impl Runtime {
.collect();
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Database,
source,
ServiceId::Farcasterd,
Request::CheckpointList(checkpointed_pub_offers),
)?;
}
Expand Down Expand Up @@ -292,7 +349,7 @@ pub fn checkpoint_send(
swap_id: SwapId,
source: ServiceId,
destination: ServiceId,
state: request::CheckpointState,
state: CheckpointState,
) -> Result<(), Error> {
let mut serialized_state = vec![];
let size = state
Expand Down Expand Up @@ -374,7 +431,9 @@ struct Database(lmdb::Environment);

impl Database {
fn new(path: PathBuf) -> Result<Database, lmdb::Error> {
let env = lmdb::Environment::new().open(&path)?;
let env = lmdb::Environment::new()
.set_map_size(10485760 * 1024 * 64)
.open(&path)?;
env.create_db(None, lmdb::DatabaseFlags::empty())?;
Ok(Database(env))
}
Expand Down
108 changes: 103 additions & 5 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// along with this software.
// If not, see <https://opensource.org/licenses/MIT>.

use crate::farcasterd::runtime::request::CheckpointEntry;
use crate::farcasterd::runtime::request::MadeOffer;
use crate::farcasterd::runtime::request::TookOffer;
use crate::farcasterd::runtime::request::{ProgressEvent, SwapProgress};
Expand Down Expand Up @@ -147,6 +148,8 @@ pub fn run(
stats: none!(),
funding_xmr: none!(),
funding_btc: none!(),
checkpointed_pub_offers: vec![].into(),
restoring_swap_id: none!(),
config,
};

Expand Down Expand Up @@ -178,6 +181,8 @@ pub struct Runtime {
progress_subscriptions: HashMap<ServiceId, HashSet<ServiceId>>,
funding_btc: HashMap<SwapId, (bitcoin::Address, bitcoin::Amount, bool)>,
funding_xmr: HashMap<SwapId, (monero::Address, monero::Amount, bool)>,
checkpointed_pub_offers: List<CheckpointEntry>,
restoring_swap_id: HashSet<SwapId>,
stats: Stats,
config: Config,
}
Expand Down Expand Up @@ -549,6 +554,16 @@ impl Runtime {
}
}
ServiceId::Swap(swap_id) => {
if self.restoring_swap_id.remove(swap_id) {
info!("Restoring swap {}", swap_id.bright_blue_italic());
self.stats.incr_initiated();
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
ServiceId::Database,
Request::RestoreCheckpoint(*swap_id),
)?;
}
if self.running_swaps.insert(*swap_id) {
info!(
"Swap {} is registered; total {} swaps are known",
Expand Down Expand Up @@ -895,7 +910,18 @@ impl Runtime {
)?;
}

Request::CheckpointList(checkpointed_pub_offers) => {
self.checkpointed_pub_offers = checkpointed_pub_offers.clone();
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::CheckpointList(checkpointed_pub_offers),
)?;
}

Request::RestoreCheckpoint(swap_id) => {
// check if wallet is running
if endpoints
.send_to(
ServiceBus::Msg,
Expand All @@ -916,17 +942,89 @@ impl Runtime {
)?;
return Ok(());
}
endpoints.send_to(
ServiceBus::Ctl,

// check if swapd is not running
if endpoints
.send_to(
ServiceBus::Msg,
ServiceId::Farcasterd,
ServiceId::Swap(swap_id.clone()),
Request::Hello,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced reusing Request::Hello is better than creating a new Request::NewVariant, that inherits no further semantics

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

farcasterd being the broker will send (or only receive?) this Hello request on the background as well

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can add a ConnectionTest?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

)
.is_ok()
{
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::Failure(Failure {
code: 1,
info: "Cannot restore a checkpoint into a running swap.".to_string(),
}),
)?;
return Ok(());
}

let CheckpointEntry {
public_offer,
trade_role,
..
Comment on lines +968 to +971
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the way we launch swapd is appropriate on the context of state recovery

passing public_offer and trade_role made sense previously, but should probably go

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's nice to have these, so you can retrieve them when you do GetInfo on the swap.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my point is not about not having them. It's about not passing them as command line args, passing them through the Ctl bus instead

} = match self
.checkpointed_pub_offers
.iter()
.find(|entry| entry.swap_id == swap_id)
{
Some(ce) => ce,
None => {
endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::Failure(Failure {
code: 1,
info: "No checkpoint found with given swap id, aborting restore."
.to_string(),
}),
)?;
return Ok(());
}
};
self.restoring_swap_id.insert(swap_id);
syncers_up(
ServiceId::Farcasterd,
ServiceId::Database,
Request::RestoreCheckpoint(swap_id),
&mut self.spawning_services,
&self.syncer_services,
&mut self.syncer_clients,
Coin::Bitcoin,
public_offer.offer.network,
swap_id,
&self.config,
)?;
syncers_up(
ServiceId::Farcasterd,
&mut self.spawning_services,
&self.syncer_services,
&mut self.syncer_clients,
Coin::Monero,
public_offer.offer.network,
swap_id,
&self.config,
)?;

let _child = launch(
TheCharlatan marked this conversation as resolved.
Show resolved Hide resolved
"swapd",
&[
swap_id.to_hex(),
public_offer.to_string(),
trade_role.to_string(),
],
Comment on lines +1014 to +1020
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unconvinced weather public_offer and trade_role should be command line args here

)?;

endpoints.send_to(
ServiceBus::Ctl,
ServiceId::Farcasterd,
source,
Request::String("Restored checkpoint.".to_string()),
Request::String("Restoring checkpoint.".to_string()),
)?;
}

Expand Down
3 changes: 2 additions & 1 deletion src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ pub use request::Request;

use microservices::esb::BusId;
use microservices::rpc_connection::Api;
use strict_encoding::{StrictDecode, StrictEncode};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Display)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Display, StrictEncode, StrictDecode)]
pub enum ServiceBus {
#[display("MSG")]
Msg,
Expand Down
Loading