Skip to content

Commit

Permalink
Merge pull request #595 from TheCharlatan/rmChunks
Browse files Browse the repository at this point in the history
Checkpointing: Remove multipart message chunking
  • Loading branch information
h4sh3d authored Jul 27, 2022
2 parents 5252857 + 1b9103f commit a2ee64f
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 152 deletions.
1 change: 0 additions & 1 deletion src/databased/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,5 @@ mod runtime;

#[cfg(feature = "shell")]
pub use opts::Opts;
pub use runtime::checkpoint_handle_multipart_receive;
pub use runtime::checkpoint_send;
pub use runtime::run;
130 changes: 9 additions & 121 deletions src/databased/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use bitcoin::secp256k1::SecretKey;
use crate::{
rpc::{
request::{
self, BitcoinAddress, Checkpoint, CheckpointChunk, CheckpointEntry,
CheckpointMultipartChunk, CheckpointState, Commit, Failure, FailureCode, Keys,
LaunchSwap, List, MoneroAddress, Msg, NodeId, Params, Reveal, Token, Tx,
self, BitcoinAddress, Checkpoint, CheckpointEntry, CheckpointState, Commit, Failure,
FailureCode, Keys, LaunchSwap, List, MoneroAddress, Msg, NodeId, Params, Reveal, Token,
Tx,
},
Request, ServiceBus,
},
Expand All @@ -45,7 +45,6 @@ pub fn run(config: ServiceConfig, data_dir: PathBuf) -> Result<(), Error> {
let runtime = Runtime {
identity: ServiceId::Database,
database: Database::new(data_dir).unwrap(),
pending_checkpoint_chunks: map![],
};

Service::run(config, runtime, false)
Expand All @@ -54,7 +53,6 @@ pub fn run(config: ServiceConfig, data_dir: PathBuf) -> Result<(), Error> {
pub struct Runtime {
identity: ServiceId,
database: Database,
pending_checkpoint_chunks: HashMap<[u8; 20], HashSet<CheckpointChunk>>,
}

impl Runtime {}
Expand Down Expand Up @@ -126,14 +124,6 @@ impl Runtime {
}
},

Request::CheckpointMultipartChunk(checkpoint_multipart_chunk) => {
if let Some(checkpoint_request) = checkpoint_handle_multipart_receive(
checkpoint_multipart_chunk,
&mut self.pending_checkpoint_chunks,
)? {
self.handle_rpc_ctl(endpoints, source, checkpoint_request)?;
}
}
Request::Checkpoint(Checkpoint { swap_id, state }) => {
match state {
CheckpointState::CheckpointWallet(_) => {
Expand Down Expand Up @@ -401,121 +391,19 @@ impl Runtime {
}
}

pub fn checkpoint_handle_multipart_receive(
checkpoint_multipart_chunk: request::CheckpointMultipartChunk,
pending_checkpoint_chunks: &mut HashMap<[u8; 20], HashSet<CheckpointChunk>>,
) -> Result<Option<request::Request>, Error> {
let request::CheckpointMultipartChunk {
checksum,
msg_index,
msgs_total,
serialized_state_chunk,
swap_id,
} = checkpoint_multipart_chunk;
debug!("received checkpoint multipart message");
if pending_checkpoint_chunks.contains_key(&checksum) {
let chunks = pending_checkpoint_chunks
.get_mut(&checksum)
.expect("checked with contains_key");
chunks.insert(CheckpointChunk {
msg_index,
serialized_state_chunk,
});
} else {
let mut chunk = HashSet::new();
chunk.insert(CheckpointChunk {
msg_index,
serialized_state_chunk,
});
pending_checkpoint_chunks.insert(checksum, chunk);
}
let mut chunks = pending_checkpoint_chunks
.get(&checksum)
.unwrap_or(&HashSet::new())
.clone();
if chunks.len() >= msgs_total {
let mut chunk_tup_vec = chunks
.drain()
.map(|chunk| (chunk.msg_index, chunk.serialized_state_chunk))
.collect::<Vec<(usize, Vec<u8>)>>(); // map the hashset to a vec for sorting
chunk_tup_vec
.sort_by(|(msg_number_a, _), (msg_number_b, _)| msg_number_a.cmp(&msg_number_b)); // sort in ascending order
let chunk_vec = chunk_tup_vec
.drain(..)
.map(|(_, chunk)| chunk)
.collect::<Vec<Vec<u8>>>(); // drop the extra integer index
let serialized_checkpoint = chunk_vec.into_iter().flatten().collect::<Vec<u8>>(); // collect the chunked messages into a single serialized message
if ripemd160::Hash::hash(&serialized_checkpoint).into_inner() != checksum {
// this should never happen
return Err(Error::Farcaster(
"Unable to checkpoint the message, checksum did not match".to_string(),
));
}
// serialize request and return it
let request = Request::Checkpoint(request::Checkpoint {
swap_id,
state: request::CheckpointState::strict_decode(std::io::Cursor::new(
serialized_checkpoint,
))
.map_err(|err| Error::Farcaster(err.to_string()))?,
});
Ok(Some(request))
} else {
Ok(None)
}
}

pub fn checkpoint_send(
endpoints: &mut Endpoints,
swap_id: SwapId,
source: ServiceId,
destination: ServiceId,
state: CheckpointState,
) -> Result<(), Error> {
let mut serialized_state = vec![];
let size = state
.strict_encode(&mut serialized_state)
.expect("strict encode of a checkpoint should not fail");

// if the size exceeds a boundary, send a multi-part message
let max_chunk_size = internet2::transport::MAX_FRAME_SIZE - 1024;
debug!("checkpointing wallet state");
if size > max_chunk_size {
let checksum: [u8; 20] = ripemd160::Hash::hash(&serialized_state).into_inner();
debug!("need to chunk the checkpoint message");
let chunks: Vec<(usize, Vec<u8>)> = serialized_state
.chunks_mut(max_chunk_size)
.enumerate()
.map(|(n, chunk)| (n, chunk.to_vec()))
.collect();
let chunks_total = chunks.len();
for (n, chunk) in chunks {
debug!(
"sending chunked checkpoint message {} of a total {}",
n + 1,
chunks_total
);
endpoints.send_to(
ServiceBus::Ctl,
source.clone(),
destination.clone(),
Request::CheckpointMultipartChunk(CheckpointMultipartChunk {
checksum,
msg_index: n,
msgs_total: chunks_total,
serialized_state_chunk: chunk,
swap_id,
}),
)?;
}
} else {
endpoints.send_to(
ServiceBus::Ctl,
source,
destination,
Request::Checkpoint(Checkpoint { swap_id, state }),
)?;
}
endpoints.send_to(
ServiceBus::Ctl,
source,
destination,
Request::Checkpoint(Checkpoint { swap_id, state }),
)?;
Ok(())
}

Expand Down
17 changes: 1 addition & 16 deletions src/swapd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// along with this software.
// If not, see <https://opensource.org/licenses/MIT>.

use crate::databased::checkpoint_handle_multipart_receive;
use crate::databased::checkpoint_send;
use crate::service::Endpoints;
use crate::syncerd::bitcoin_syncer::p2wpkh_signed_tx_fee;
Expand Down Expand Up @@ -85,10 +84,7 @@ use internet2::{
};
use microservices::esb::{self, Handler};
use monero::{cryptonote::hash::keccak_256, PrivateKey, ViewPair};
use request::{
Checkpoint, CheckpointChunk, CheckpointMultipartChunk, CheckpointState, Commit, InitSwap,
Params, Reveal, TakeCommit, Tx,
};
use request::{Checkpoint, CheckpointState, Commit, InitSwap, Params, Reveal, TakeCommit, Tx};
use std::net::SocketAddr;
use strict_encoding::{StrictDecode, StrictEncode};

Expand Down Expand Up @@ -186,7 +182,6 @@ pub fn run(
)?),
pending_requests: none!(),
pending_peer_request: none!(),
pending_checkpoint_chunks: map![],
txs: none!(),
public_offer,
};
Expand All @@ -208,7 +203,6 @@ pub struct Runtime {
temporal_safety: TemporalSafety,
pending_requests: PendingRequests,
pending_peer_request: Vec<request::Msg>, // Peer requests that failed and are waiting for reconnection
pending_checkpoint_chunks: HashMap<[u8; 20], HashSet<CheckpointChunk>>,
txs: HashMap<TxLabel, bitcoin::Transaction>,
#[allow(dead_code)]
storage: Box<dyn storage::Driver>,
Expand Down Expand Up @@ -2364,15 +2358,6 @@ impl Runtime {
self.pending_peer_request.clear();
}

Request::CheckpointMultipartChunk(checkpoint_multipart_chunk) => {
if let Some(checkpoint_request) = checkpoint_handle_multipart_receive(
checkpoint_multipart_chunk,
&mut self.pending_checkpoint_chunks,
)? {
self.handle_rpc_ctl(endpoints, source, checkpoint_request)?;
}
}

Request::Checkpoint(request::Checkpoint { swap_id, state }) => match state {
CheckpointState::CheckpointSwapd(CheckpointSwapd {
state,
Expand Down
16 changes: 2 additions & 14 deletions src/walletd/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::databased::checkpoint_handle_multipart_receive;
use crate::databased::checkpoint_send;
use crate::service::Endpoints;
use crate::syncerd::SweepBitcoinAddress;
use lmdb::{Cursor, Transaction as LMDBTransaction};
use monero::consensus::{Decodable as MoneroDecodable, Encodable as MoneroEncodable};
use request::{Checkpoint, CheckpointMultipartChunk};
use request::Checkpoint;
use std::path::PathBuf;
use std::{
any::Any,
Expand Down Expand Up @@ -67,7 +66,7 @@ use farcaster_core::{
};
use internet2::{addr::LocalNode, TypedEnum};
use microservices::esb::{self, Handler};
use request::{CheckpointChunk, CheckpointState, LaunchSwap, NodeId};
use request::{CheckpointState, LaunchSwap, NodeId};

pub fn run(
config: ServiceConfig,
Expand All @@ -82,7 +81,6 @@ pub fn run(
swaps: none!(),
btc_addrs: none!(),
xmr_addrs: none!(),
pending_checkpoint_chunks: map![],
};

Service::run(config, runtime, false)
Expand All @@ -96,7 +94,6 @@ pub struct Runtime {
swaps: HashMap<SwapId, Option<Request>>,
btc_addrs: HashMap<SwapId, bitcoin::Address>,
xmr_addrs: HashMap<SwapId, monero::Address>,
pending_checkpoint_chunks: HashMap<[u8; 20], HashSet<CheckpointChunk>>,
}

impl Runtime {
Expand Down Expand Up @@ -1702,15 +1699,6 @@ impl Runtime {
self.clean_up_after_swap(&swap_id);
}

Request::CheckpointMultipartChunk(checkpoint_multipart_chunk) => {
if let Some(checkpoint_request) = checkpoint_handle_multipart_receive(
checkpoint_multipart_chunk,
&mut self.pending_checkpoint_chunks,
)? {
self.handle_rpc_ctl(endpoints, source, checkpoint_request)?;
}
}

Request::Checkpoint(request::Checkpoint { swap_id, state }) => {
match state {
CheckpointState::CheckpointWallet(CheckpointWallet { wallet, xmr_addr }) => {
Expand Down

0 comments on commit a2ee64f

Please sign in to comment.