From 84a04505290adadf4fe56c0011296bc2730f39f9 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Fri, 2 Sep 2022 09:26:37 +0200 Subject: [PATCH] Database: Add database index for Checkpoint entries --- src/databased/runtime.rs | 166 ++++++++++++++++++++++----------------- 1 file changed, 94 insertions(+), 72 deletions(-) diff --git a/src/databased/runtime.rs b/src/databased/runtime.rs index c8be2d3b4..f4992c4f1 100644 --- a/src/databased/runtime.rs +++ b/src/databased/runtime.rs @@ -1,7 +1,7 @@ use crate::databased::runtime::request::{ Address, OfferStatus, OfferStatusPair, OfferStatusSelector, }; -use crate::walletd::runtime::{CheckpointWallet, Wallet}; +use crate::walletd::runtime::{AliceState, BobState, Wallet}; use farcaster_core::blockchain::Blockchain; use farcaster_core::swap::btcxmr::PublicOffer; use farcaster_core::swap::SwapId; @@ -61,7 +61,8 @@ impl esb::Handler for Runtime { } } - fn handle_err(&mut self, _: &mut Endpoints, _: esb::Error) -> Result<(), Error> { + fn handle_err(&mut self, _: &mut Endpoints, error: esb::Error) -> Result<(), Error> { + error!("Error during Request processing: {:?}", error); // We do nothing and do not propagate error; it's already being reported // with `error!` macro by the controller. If we propagate error here // this will make whole daemon panic @@ -103,8 +104,32 @@ impl Runtime { } Request::Checkpoint(Checkpoint { swap_id, state }) => { - match state { - CheckpointState::CheckpointWallet(_) => { + match &state { + CheckpointState::CheckpointWallet(wallet_checkpoint) => { + let info = match &wallet_checkpoint.wallet { + Wallet::Alice(AliceState { + local_trade_role, + pub_offer, + .. + }) => CheckpointEntry { + swap_id, + public_offer: pub_offer.clone(), + trade_role: local_trade_role.clone(), + }, + Wallet::Bob(BobState { + local_trade_role, + pub_offer, + .. + }) => CheckpointEntry { + swap_id, + public_offer: pub_offer.clone(), + trade_role: local_trade_role.clone(), + }, + }; + debug!("setting checkpoint info entry"); + let mut info_encoded = vec![]; + let _info_size = info.strict_encode(&mut info_encoded); + self.database.set_checkpoint_info(&swap_id, &info_encoded)?; debug!("setting wallet checkpoint"); } CheckpointState::CheckpointSwapd(_) => { @@ -189,42 +214,17 @@ impl Runtime { } Request::RetrieveAllCheckpointInfo => { - let pairs = self.database.get_checkpoint_key_value_pairs()?; + let pairs = self.database.get_all_checkpoint_info()?; + debug!("pairs: {:?}", pairs); let checkpointed_pub_offers: List = pairs .iter() - .filter_map(|(checkpoint_key, state)| { - let state = - CheckpointState::strict_decode(std::io::Cursor::new(state)).ok()?; - match checkpoint_key.service_id { - ServiceId::Wallet => match state { - CheckpointState::CheckpointWallet(CheckpointWallet { - wallet, - .. - }) => match wallet { - Wallet::Bob(wallet) => Some(CheckpointEntry { - swap_id: checkpoint_key.swap_id, - public_offer: wallet.pub_offer, - trade_role: wallet.local_trade_role, - }), - Wallet::Alice(wallet) => Some(CheckpointEntry { - swap_id: checkpoint_key.swap_id, - public_offer: wallet.pub_offer, - trade_role: wallet.local_trade_role, - }), - }, - s => { - error!( - "Checkpoint {} not supported for service {}", - s, - ServiceId::Wallet - ); - None - } - }, - _ => None, - } + .filter_map(|(_, info)| { + CheckpointEntry::strict_decode(std::io::Cursor::new(info)).ok() }) .collect(); + + debug!("checkpointed pub offers: {:?}", checkpointed_pub_offers); + endpoints.send_to( ServiceBus::Ctl, self.identity(), @@ -234,33 +234,17 @@ impl Runtime { } Request::GetCheckpointEntry(swap_id) => { - let entry = match self.database.get_checkpoint_state(&CheckpointKey { - swap_id, - service_id: ServiceId::Wallet, - }) { + let entry = match self.database.get_checkpoint_info(&swap_id) { Ok(raw_state) => { - match CheckpointState::strict_decode(std::io::Cursor::new(raw_state)) { - Ok(CheckpointState::CheckpointWallet(CheckpointWallet { - wallet, - .. - })) => match wallet { - Wallet::Bob(wallet) => Some(CheckpointEntry { - swap_id, - public_offer: wallet.pub_offer, - trade_role: wallet.local_trade_role, - }), - Wallet::Alice(wallet) => Some(CheckpointEntry { - swap_id, - public_offer: wallet.pub_offer, - trade_role: wallet.local_trade_role, - }), - }, + match CheckpointEntry::strict_decode(std::io::Cursor::new(raw_state)) { + Ok(info) => Some(info), _ => None, } } _ => None, }; if let Some(entry) = entry { + debug!("checkpoint entry: {:?}", entry); endpoints.send_to( ServiceBus::Ctl, self.identity(), @@ -285,13 +269,22 @@ impl Runtime { swap_id, service_id: ServiceId::Wallet, }) { - debug!("{} | Did not delete checkpoint entry: {}", swap_id, err); + debug!( + "{} | Did not delete checkpoint wallet entry: {}", + swap_id, err + ); } if let Err(err) = self.database.delete_checkpoint_state(CheckpointKey { swap_id, service_id: ServiceId::Swap(swap_id), }) { - debug!("{} | Did not delete checkpoint entry: {}", swap_id, err); + debug!( + "{} | Did not delete checkpoint swap entry: {}", + swap_id, err + ); + } + if let Err(err) = self.database.delete_checkpoint_info(swap_id) { + debug!("{} | Did not delete checkpoint info: {}", swap_id, err); } } @@ -452,6 +445,7 @@ impl From> for CheckpointKey { struct Database(lmdb::Environment); const LMDB_CHECKPOINTS: &str = "checkpoints"; +const LMDB_CHECKPOINT_INFOS: &str = "checkpoint_infos"; const LMDB_BITCOIN_ADDRESSES: &str = "bitcoin_addresses"; const LMDB_MONERO_ADDRESSES: &str = "monero_addresses"; const LMDB_OFFER_HISTORY: &str = "offer_history"; @@ -460,9 +454,10 @@ impl Database { fn new(path: PathBuf) -> Result { let env = lmdb::Environment::new() .set_map_size(10485760 * 1024 * 64) - .set_max_dbs(4) + .set_max_dbs(10) .open(&path)?; env.create_db(Some(LMDB_CHECKPOINTS), lmdb::DatabaseFlags::empty())?; + env.create_db(Some(LMDB_CHECKPOINT_INFOS), lmdb::DatabaseFlags::empty())?; env.create_db(Some(LMDB_BITCOIN_ADDRESSES), lmdb::DatabaseFlags::empty())?; env.create_db(Some(LMDB_OFFER_HISTORY), lmdb::DatabaseFlags::empty())?; env.create_db(Some(LMDB_MONERO_ADDRESSES), lmdb::DatabaseFlags::empty())?; @@ -638,6 +633,41 @@ impl Database { Ok(()) } + fn set_checkpoint_info(&mut self, key: &SwapId, val: &[u8]) -> Result<(), lmdb::Error> { + let db = self.0.open_db(Some(LMDB_CHECKPOINT_INFOS))?; + let mut tx = self.0.begin_rw_txn()?; + if tx.get(db, &key.as_bytes()).is_ok() { + tx.del(db, &key.as_bytes(), None)?; + } + tx.put(db, &key.as_bytes(), &val, lmdb::WriteFlags::empty())?; + tx.commit()?; + Ok(()) + } + + fn get_checkpoint_info(&mut self, key: &SwapId) -> Result, lmdb::Error> { + let db = self.0.open_db(Some(LMDB_CHECKPOINT_INFOS))?; + let tx = self.0.begin_ro_txn()?; + let val: Vec = tx.get(db, &key.as_bytes())?.into(); + tx.abort(); + Ok(val) + } + + fn get_all_checkpoint_info(&mut self) -> Result)>, lmdb::Error> { + let db = self.0.open_db(Some(LMDB_CHECKPOINT_INFOS))?; + let tx = self.0.begin_ro_txn()?; + let mut cursor = tx.open_ro_cursor(db)?; + let res = cursor + .iter() + .map(|(key, value)| { + debug!("key: {:?}", key); + (SwapId::from_slice(key), value.to_vec()) + }) + .collect(); + drop(cursor); + tx.abort(); + Ok(res) + } + fn get_checkpoint_state(&mut self, key: &CheckpointKey) -> Result, lmdb::Error> { let db = self.0.open_db(Some(LMDB_CHECKPOINTS))?; let tx = self.0.begin_ro_txn()?; @@ -654,19 +684,12 @@ impl Database { Ok(()) } - fn get_checkpoint_key_value_pairs( - &mut self, - ) -> Result)>, lmdb::Error> { + fn delete_checkpoint_info(&mut self, key: SwapId) -> Result<(), lmdb::Error> { let db = self.0.open_db(Some(LMDB_CHECKPOINTS))?; - let tx = self.0.begin_ro_txn()?; - let mut cursor = tx.open_ro_cursor(db)?; - let res = cursor - .iter() - .map(|(key, value)| (CheckpointKey::from(key.to_vec()), value.to_vec())) - .collect(); - drop(cursor); - tx.abort(); - Ok(res) + let mut tx = self.0.begin_rw_txn()?; + tx.del(db, &key.as_bytes(), None)?; + tx.commit()?; + Ok(()) } } @@ -698,7 +721,6 @@ fn test_lmdb_state() { database.delete_checkpoint_state(key2.clone()).unwrap(); let res = database.get_checkpoint_state(&key2); assert!(res.is_err()); - database.get_checkpoint_key_value_pairs().unwrap(); let sk = SecretKey::new(&mut bitcoin::secp256k1::rand::thread_rng()); let private_key =