Skip to content

Commit

Permalink
Database: Add database index for Checkpoint entries
Browse files Browse the repository at this point in the history
  • Loading branch information
TheCharlatan committed Oct 9, 2022
1 parent 4ed3af4 commit 076afff
Showing 1 changed file with 94 additions and 72 deletions.
166 changes: 94 additions & 72 deletions src/databased/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::walletd::runtime::{CheckpointWallet, Wallet};
use crate::walletd::runtime::{AliceState, BobState, Wallet};
use farcaster_core::blockchain::Blockchain;
use farcaster_core::swap::btcxmr::PublicOffer;
use farcaster_core::swap::SwapId;
Expand Down Expand Up @@ -62,7 +62,8 @@ impl esb::Handler<ServiceBus> for Runtime {
}
}

fn handle_err(&mut self, _: &mut Endpoints, _: esb::Error<ServiceId>) -> Result<(), Error> {
fn handle_err(&mut self, _: &mut Endpoints, error: esb::Error<ServiceId>) -> Result<(), Error> {
error!("Error during Request processing: {:?}", error);
// We do nothing and do not propagate error; it's already being reported
// with `error!` macro by the controller. If we propagate error here
// this will make whole daemon panic
Expand Down Expand Up @@ -104,8 +105,32 @@ impl Runtime {
}

BusMsg::Ctl(Ctl::Checkpoint(Checkpoint { swap_id, state })) => {
match state {
CheckpointState::CheckpointWallet(_) => {
match &state {
CheckpointState::CheckpointWallet(wallet_checkpoint) => {
let info = match &wallet_checkpoint.wallet {
Wallet::Alice(AliceState {
local_trade_role,
pub_offer,
..
}) => CheckpointEntry {
swap_id,
public_offer: pub_offer.clone(),
trade_role: local_trade_role.clone(),
},
Wallet::Bob(BobState {
local_trade_role,
pub_offer,
..
}) => CheckpointEntry {
swap_id,
public_offer: pub_offer.clone(),
trade_role: local_trade_role.clone(),
},
};
debug!("setting checkpoint info entry");
let mut info_encoded = vec![];
let _info_size = info.strict_encode(&mut info_encoded);
self.database.set_checkpoint_info(&swap_id, &info_encoded)?;
debug!("setting wallet checkpoint");
}
CheckpointState::CheckpointSwapd(_) => {
Expand Down Expand Up @@ -194,13 +219,22 @@ impl Runtime {
swap_id,
service_id: ServiceId::Wallet,
}) {
debug!("{} | Did not delete checkpoint entry: {}", swap_id, err);
debug!(
"{} | Did not delete checkpoint wallet entry: {}",
swap_id, err
);
}
if let Err(err) = self.database.delete_checkpoint_state(CheckpointKey {
swap_id,
service_id: ServiceId::Swap(swap_id),
}) {
debug!("{} | Did not delete checkpoint entry: {}", swap_id, err);
debug!(
"{} | Did not delete checkpoint swap entry: {}",
swap_id, err
);
}
if let Err(err) = self.database.delete_checkpoint_info(swap_id) {
debug!("{} | Did not delete checkpoint info: {}", swap_id, err);
}
}

Expand Down Expand Up @@ -249,42 +283,17 @@ impl Runtime {
}

Rpc::RetrieveAllCheckpointInfo => {
let pairs = self.database.get_checkpoint_key_value_pairs()?;
let pairs = self.database.get_all_checkpoint_info()?;
debug!("pairs: {:?}", pairs);
let checkpointed_pub_offers: List<CheckpointEntry> = pairs
.iter()
.filter_map(|(checkpoint_key, state)| {
let state =
CheckpointState::strict_decode(std::io::Cursor::new(state)).ok()?;
match checkpoint_key.service_id {
ServiceId::Wallet => match state {
CheckpointState::CheckpointWallet(CheckpointWallet {
wallet,
..
}) => match wallet {
Wallet::Bob(wallet) => Some(CheckpointEntry {
swap_id: checkpoint_key.swap_id,
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
Wallet::Alice(wallet) => Some(CheckpointEntry {
swap_id: checkpoint_key.swap_id,
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
},
s => {
error!(
"Checkpoint {} not supported for service {}",
s,
ServiceId::Wallet
);
None
}
},
_ => None,
}
.filter_map(|(_, info)| {
CheckpointEntry::strict_decode(std::io::Cursor::new(info)).ok()
})
.collect();

debug!("checkpointed pub offers: {:?}", checkpointed_pub_offers);

endpoints.send_to(
ServiceBus::Rpc,
self.identity(),
Expand All @@ -294,33 +303,17 @@ impl Runtime {
}

Rpc::GetCheckpointEntry(swap_id) => {
let entry = match self.database.get_checkpoint_state(&CheckpointKey {
swap_id,
service_id: ServiceId::Wallet,
}) {
let entry = match self.database.get_checkpoint_info(&swap_id) {
Ok(raw_state) => {
match CheckpointState::strict_decode(std::io::Cursor::new(raw_state)) {
Ok(CheckpointState::CheckpointWallet(CheckpointWallet {
wallet,
..
})) => match wallet {
Wallet::Bob(wallet) => Some(CheckpointEntry {
swap_id,
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
Wallet::Alice(wallet) => Some(CheckpointEntry {
swap_id,
public_offer: wallet.pub_offer,
trade_role: wallet.local_trade_role,
}),
},
match CheckpointEntry::strict_decode(std::io::Cursor::new(raw_state)) {
Ok(info) => Some(info),
_ => None,
}
}
_ => None,
};
if let Some(entry) = entry {
debug!("checkpoint entry: {:?}", entry);
endpoints.send_to(
ServiceBus::Rpc,
self.identity(),
Expand Down Expand Up @@ -468,6 +461,7 @@ impl From<Vec<u8>> for CheckpointKey {
struct Database(lmdb::Environment);

const LMDB_CHECKPOINTS: &str = "checkpoints";
const LMDB_CHECKPOINT_INFOS: &str = "checkpoint_infos";
const LMDB_BITCOIN_ADDRESSES: &str = "bitcoin_addresses";
const LMDB_MONERO_ADDRESSES: &str = "monero_addresses";
const LMDB_OFFER_HISTORY: &str = "offer_history";
Expand All @@ -476,9 +470,10 @@ impl Database {
fn new(path: PathBuf) -> Result<Database, lmdb::Error> {
let env = lmdb::Environment::new()
.set_map_size(10485760 * 1024 * 64)
.set_max_dbs(4)
.set_max_dbs(10)
.open(&path)?;
env.create_db(Some(LMDB_CHECKPOINTS), lmdb::DatabaseFlags::empty())?;
env.create_db(Some(LMDB_CHECKPOINT_INFOS), lmdb::DatabaseFlags::empty())?;
env.create_db(Some(LMDB_BITCOIN_ADDRESSES), lmdb::DatabaseFlags::empty())?;
env.create_db(Some(LMDB_OFFER_HISTORY), lmdb::DatabaseFlags::empty())?;
env.create_db(Some(LMDB_MONERO_ADDRESSES), lmdb::DatabaseFlags::empty())?;
Expand Down Expand Up @@ -654,6 +649,41 @@ impl Database {
Ok(())
}

fn set_checkpoint_info(&mut self, key: &SwapId, val: &[u8]) -> Result<(), lmdb::Error> {
let db = self.0.open_db(Some(LMDB_CHECKPOINT_INFOS))?;
let mut tx = self.0.begin_rw_txn()?;
if tx.get(db, &key.as_bytes()).is_ok() {
tx.del(db, &key.as_bytes(), None)?;
}
tx.put(db, &key.as_bytes(), &val, lmdb::WriteFlags::empty())?;
tx.commit()?;
Ok(())
}

fn get_checkpoint_info(&mut self, key: &SwapId) -> Result<Vec<u8>, lmdb::Error> {
let db = self.0.open_db(Some(LMDB_CHECKPOINT_INFOS))?;
let tx = self.0.begin_ro_txn()?;
let val: Vec<u8> = tx.get(db, &key.as_bytes())?.into();
tx.abort();
Ok(val)
}

fn get_all_checkpoint_info(&mut self) -> Result<Vec<(SwapId, Vec<u8>)>, lmdb::Error> {
let db = self.0.open_db(Some(LMDB_CHECKPOINT_INFOS))?;
let tx = self.0.begin_ro_txn()?;
let mut cursor = tx.open_ro_cursor(db)?;
let res = cursor
.iter()
.map(|(key, value)| {
debug!("key: {:?}", key);
(SwapId::from_slice(key), value.to_vec())
})
.collect();
drop(cursor);
tx.abort();
Ok(res)
}

fn get_checkpoint_state(&mut self, key: &CheckpointKey) -> Result<Vec<u8>, lmdb::Error> {
let db = self.0.open_db(Some(LMDB_CHECKPOINTS))?;
let tx = self.0.begin_ro_txn()?;
Expand All @@ -670,19 +700,12 @@ impl Database {
Ok(())
}

fn get_checkpoint_key_value_pairs(
&mut self,
) -> Result<Vec<(CheckpointKey, Vec<u8>)>, lmdb::Error> {
fn delete_checkpoint_info(&mut self, key: SwapId) -> Result<(), lmdb::Error> {
let db = self.0.open_db(Some(LMDB_CHECKPOINTS))?;
let tx = self.0.begin_ro_txn()?;
let mut cursor = tx.open_ro_cursor(db)?;
let res = cursor
.iter()
.map(|(key, value)| (CheckpointKey::from(key.to_vec()), value.to_vec()))
.collect();
drop(cursor);
tx.abort();
Ok(res)
let mut tx = self.0.begin_rw_txn()?;
tx.del(db, &key.as_bytes(), None)?;
tx.commit()?;
Ok(())
}
}

Expand Down Expand Up @@ -715,7 +738,6 @@ fn test_lmdb_state() {
database.delete_checkpoint_state(key2.clone()).unwrap();
let res = database.get_checkpoint_state(&key2);
assert!(res.is_err());
database.get_checkpoint_key_value_pairs().unwrap();

let sk = SecretKey::new(&mut bitcoin::secp256k1::rand::thread_rng());
let private_key =
Expand Down

0 comments on commit 076afff

Please sign in to comment.