Skip to content

Commit

Permalink
Expose GetTransactionsByAddress to JS
Browse files Browse the repository at this point in the history
Provide functionality that queries the network in order to get transactions that belong to some specific address.
For this particular request, history nodes would provide the transaction details and an inclusion proof of those transactions.
The final list of transactions are serialized and returned to JS via a JsValue
  • Loading branch information
viquezclaudio committed Mar 1, 2023
1 parent ecfd42a commit 3b74370
Show file tree
Hide file tree
Showing 17 changed files with 318 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions blockchain/src/blockchain/history_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use nimiq_blockchain_interface::{
use nimiq_database::WriteTransaction;
use nimiq_primitives::coin::Coin;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::extended_transaction::{ExtTxData, ExtendedTransaction};
use nimiq_transaction::inherent::{Inherent, InherentType};
use nimiq_transaction::Transaction;
use nimiq_transaction::{
extended_transaction::{ExtTxData, ExtendedTransaction},
inherent::{Inherent, InherentType},
Transaction,
};

use crate::Blockchain;

Expand Down
5 changes: 2 additions & 3 deletions blockchain/src/history/history_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use nimiq_mmr::mmr::proof::RangeProof;
use nimiq_mmr::mmr::MerkleMountainRange;
use nimiq_mmr::store::memory::MemoryStore;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::history_proof::HistoryTreeProof;
use nimiq_transaction::{
extended_transaction::{ExtTxData, ExtendedTransaction},
inherent::InherentType,
};

use crate::history::{
mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk, HistoryTreeProof,
};
use crate::history::{mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk};

/// A struct that contains databases to store history trees (which are Merkle Mountain Ranges
/// constructed from the list of extended transactions in an epoch) and extended transactions (which
Expand Down
2 changes: 0 additions & 2 deletions blockchain/src/history/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
pub use history_store::HistoryStore;
pub use history_tree_chunk::{HistoryTreeChunk, CHUNK_SIZE};
pub use history_tree_proof::HistoryTreeProof;

mod history_store;
mod history_tree_chunk;
mod history_tree_proof;
mod mmr_store;
mod ordered_hash;
120 changes: 117 additions & 3 deletions consensus/src/consensus/consensus_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use tokio::sync::broadcast::Sender as BroadcastSender;
use tokio_stream::wrappers::BroadcastStream;

use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_network_interface::network::Network;
use nimiq_primitives::account::AccountType;
use nimiq_transaction::{ControlTransactionTopic, Transaction, TransactionTopic};
use nimiq_keys::Address;
use nimiq_network_interface::{
network::{CloseReason, Network},
peer_info::Services,
request::{OutboundRequestError, RequestError},
};
use nimiq_primitives::{account::AccountType, policy::Policy};
use nimiq_transaction::{
extended_transaction::ExtendedTransaction, ControlTransactionTopic, Transaction,
TransactionTopic,
};

use crate::messages::{RequestTransactionsByAddress, RequestTransactionsProof};
use crate::ConsensusEvent;

pub struct ConsensusProxy<N: Network> {
Expand Down Expand Up @@ -44,4 +54,108 @@ impl<N: Network> ConsensusProxy<N> {
pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
}

pub async fn request_transactions_by_address(
&self,
address: Address,
min_peers: usize,
max: Option<u16>,
) -> Result<Vec<ExtendedTransaction>, RequestError> {
// First we tell the network to provide us with a vector that contains all the connected peers that support such services
// Note: If the network could not provide enough peers that satisfies our requirement, then an error would be returned
let peers = self
.network
.get_peers_by_services(Services::TRANSACTION_INDEX, min_peers)
.await
.map_err(|error| {
log::error!(
err = %error,
"The transactions by address request couldn't be fulfilled"
);

RequestError::OutboundRequest(OutboundRequestError::SendError)
})?;

let mut verified_transactions = HashMap::new();

// At this point we obtained a list of connected peers that could satisfy our request,
// so we perform the request to each of those peers:
for peer_id in peers {
let response = self
.network
.request::<RequestTransactionsByAddress>(
RequestTransactionsByAddress {
address: address.clone(),
max,
},
peer_id,
)
.await;

match response {
Ok(response) => {
// Now we request proofs for each transaction we requested.
for transaction in response.transactions {
// If the transaction was already verified, then we don't need to verify it again
if verified_transactions.contains_key(&transaction.tx_hash()) {
continue;
}

let response = self
.network
.request::<RequestTransactionsProof>(
RequestTransactionsProof {
hashes: vec![transaction.tx_hash()],
epoch_number: Policy::epoch_at(transaction.block_number),
},
peer_id,
)
.await;
match response {
Ok(proof_response) => {
// We verify the transaction using the proof
if let Some(proof) = proof_response.proof {
if let Some(block) = proof_response.block {
// TODO: We are currently assuming that the provided block was included in the chain
// but we also need some additional information to prove the block is part of the chain.
let verification_result = proof
.verify(block.history_root().clone())
.map_or(false, |result| result);

if verification_result {
verified_transactions
.insert(transaction.tx_hash(), transaction);
} else {
// The proof didn't verify so we disconnect from this peer
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
} else {
// If we receive a proof but we do not recieve a block, we disconnect from the peer
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
}
}
Err(error) => {
//If there was a request error with this peer we dont request anymore proofs from it
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions proof from peer");
break;
}
}
}
}
Err(error) => {
// If there was a request error with this peer we log an error
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions from peer");
}
}
}

Ok(verified_transactions.into_values().collect())
}
}
28 changes: 11 additions & 17 deletions consensus/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::fmt::{Debug, Formatter};
use beserial::{Deserialize, Serialize};
use nimiq_block::{Block, MacroBlock};
#[cfg(feature = "full")]
use nimiq_blockchain::{HistoryTreeChunk, HistoryTreeProof};
use nimiq_blockchain::HistoryTreeChunk;
use nimiq_hash::Blake2bHash;
#[cfg(feature = "full")]
use nimiq_keys::Address;
use nimiq_network_interface::request::{RequestCommon, RequestMarker};
#[cfg(feature = "full")]
use nimiq_transaction::extended_transaction::ExtendedTransaction;
use nimiq_transaction::{
extended_transaction::ExtendedTransaction, history_proof::HistoryTreeProof,
};

mod handlers;

Expand Down Expand Up @@ -211,47 +211,41 @@ impl RequestCommon for RequestHead {
const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_HEAD;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsProof {
proof: Option<HistoryTreeProof>,
block: Option<Block>,
pub proof: Option<HistoryTreeProof>,
pub block: Option<Block>,
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsProof {
#[beserial(len_type(u16, limit = 128))]
hashes: Vec<Blake2bHash>,
epoch_number: u32,
pub hashes: Vec<Blake2bHash>,
pub epoch_number: u32,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsProof {
type Kind = RequestMarker;
const TYPE_ID: u16 = 213;
type Response = ResponseTransactionsProof;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_PROOF;
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsByAddress {
address: Address,
max: Option<u16>,
pub address: Address,
pub max: Option<u16>,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsByAddress {
type Kind = RequestMarker;
const TYPE_ID: u16 = 214;
type Response = ResponseTransactionsByAddress;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_BY_ADDRESS;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsByAddress {
#[beserial(len_type(u16, limit = 128))]
transactions: Vec<ExtendedTransaction>,
pub transactions: Vec<ExtendedTransaction>,
}
10 changes: 10 additions & 0 deletions network-interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ pub trait Network: Send + Sync + Unpin + 'static {
/// If the peer isn't found, `None` is returned.
fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo>;

/// Gets the set of connected peers that provide the supplied services.
/// If we currently don't have min number of connected peer that provides those services,
/// we dial peers.
/// If there aren't enough peers in the network that provides the required services, we return an error
async fn get_peers_by_services(
&self,
services: Services,
min_peers: usize,
) -> Result<Vec<Self::PeerId>, Self::Error>;

/// Returns true when the given peer provides the services flags that are required by us
fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;

Expand Down
25 changes: 25 additions & 0 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,31 @@ impl ConnectionPoolBehaviour {
.choose_multiple(&mut thread_rng(), num_peers)
}

pub fn choose_peers_to_dial_by_services(
&self,
services: Services,
num_peers: usize,
) -> Vec<PeerId> {
let contacts = self.contacts.read();
let own_contact = contacts.get_own_contact();
let own_peer_id = own_contact.peer_id();

contacts
.query(services)
.filter_map(|contact| {
let peer_id = contact.peer_id();
if peer_id != own_peer_id
&& self.peer_ids.can_dial(peer_id)
&& contact.addresses().count() > 0
{
Some(*peer_id)
} else {
None
}
})
.choose_multiple(&mut thread_rng(), num_peers)
}

fn choose_seeds_to_dial(&self) -> Vec<Multiaddr> {
// We prefer to connect to non-seed peers. Thus, we only choose any seeds here if we're
// not already dialing any peers and at most one seed at a time.
Expand Down
3 changes: 3 additions & 0 deletions network-libp2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum NetworkError {
#[error("Network action was cancelled")]
Cancelled,

#[error("We could not find any peer that satisfies the desired services")]
PeersNotFound,

#[error("Serialization error: {0}")]
Serialization(#[from] beserial::SerializingError),

Expand Down
Loading

0 comments on commit 3b74370

Please sign in to comment.