Skip to content

Commit

Permalink
Expose GetTransactionsByAddress to JS
Browse files Browse the repository at this point in the history
Provide functionality that queries the network in order to get transactions that belong to some specific address.
For this particular request, history nodes would provide the transaction details and an inclusion proof of those transactions.
The final list of transactions are serialized and returned to JS via a JsValue
  • Loading branch information
viquezclaudio committed Feb 28, 2023
1 parent 2daee6a commit f2097ca
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions blockchain/src/history/history_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use nimiq_mmr::mmr::proof::RangeProof;
use nimiq_mmr::mmr::MerkleMountainRange;
use nimiq_mmr::store::memory::MemoryStore;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::history_proof::HistoryTreeProof;
use nimiq_transaction::{
extended_transaction::{ExtTxData, ExtendedTransaction},
inherent::InherentType,
};

use crate::history::{
mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk, HistoryTreeProof,
};
use crate::history::{mmr_store::MMRStore, ordered_hash::OrderedHash, HistoryTreeChunk};

/// A struct that contains databases to store history trees (which are Merkle Mountain Ranges
/// constructed from the list of extended transactions in an epoch) and extended transactions (which
Expand Down
2 changes: 0 additions & 2 deletions blockchain/src/history/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
pub use history_store::HistoryStore;
pub use history_tree_chunk::{HistoryTreeChunk, CHUNK_SIZE};
pub use history_tree_proof::HistoryTreeProof;

mod history_store;
mod history_tree_chunk;
mod history_tree_proof;
mod mmr_store;
mod ordered_hash;
113 changes: 112 additions & 1 deletion consensus/src/consensus/consensus_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use nimiq_keys::Address;
use nimiq_network_interface::peer_info::Services;
use nimiq_network_interface::request::OutboundRequestError;
use nimiq_network_interface::request::RequestError;
use nimiq_primitives::policy::Policy;
use nimiq_transaction::extended_transaction::ExtendedTransaction;
use tokio::sync::broadcast::Sender as BroadcastSender;
use tokio_stream::wrappers::BroadcastStream;

use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_network_interface::network::Network;
use nimiq_network_interface::network::{CloseReason, Network};
use nimiq_primitives::account::AccountType;
use nimiq_transaction::{ControlTransactionTopic, Transaction, TransactionTopic};

use crate::messages::{RequestTransactionsByAddress, RequestTransactionsProof};
use crate::ConsensusEvent;

pub struct ConsensusProxy<N: Network> {
Expand Down Expand Up @@ -44,4 +52,107 @@ impl<N: Network> ConsensusProxy<N> {
pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
}

pub async fn request_transactions_by_address(
&self,
address: Address,
max: Option<u16>,
) -> Result<Vec<ExtendedTransaction>, RequestError> {
// First we tell the network to provide us with a vector that contains all the connected peers that support such services
// Note: If the network could not connect to any peer that satisfies our requirement, then an error would be returned
let peers = self
.network
.get_peers_by_services(Services::TRANSACTION_INDEX)
.await
.map_err(|error| {
log::error!(
err = %error,
"The transactions by address request couldn't be fulfilled"
);

RequestError::OutboundRequest(OutboundRequestError::SendError)
})?;

let mut verified_transactions = HashMap::new();

// At this point we obtained a list of connected peers that could satisfy our request,
// so we perform the request to each of those peers:
for peer_id in peers {
let response = self
.network
.request::<RequestTransactionsByAddress>(
RequestTransactionsByAddress {
address: address.clone(),
max,
},
peer_id,
)
.await;

match response {
Ok(response) => {
//Now we request proofs for each transaction we requested.
for transaction in response.transactions {
// If the transaction was already verified, then we don't need to verify it again
if verified_transactions.contains_key(&transaction.tx_hash()) {
continue;
}

let response = self
.network
.request::<RequestTransactionsProof>(
RequestTransactionsProof {
hashes: vec![transaction.tx_hash()],
epoch_number: Policy::epoch_at(transaction.block_number),
},
peer_id,
)
.await;
match response {
Ok(proof_response) => {
//We verify the transaction using the proof
if let Some(proof) = proof_response.proof {
if let Some(block) = proof_response.block {
//TODO: We are currently assuming that the provided block was included in the chain
//but we also need some additional information to prove the block is part of the chain.
let verification_result = proof
.verify(block.history_root().clone())
.map_or(false, |result| result);

if verification_result {
verified_transactions
.insert(transaction.tx_hash(), transaction);
} else {
//The proof didnt verify so we disconnect from this peer
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
} else {
//If we receive a proof but we do not recieve a block, we disconnect from the peer
self.network
.disconnect_peer(peer_id, CloseReason::Other)
.await;
break;
}
}
}
Err(error) => {
//If there was a request error with this peer we dont request anymore proofs from it
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions proof from peer");
break;
}
}
}
}
Err(error) => {
//If there was a request error with this peer we log an error
log::error!(peer=%peer_id, err=%error,"There was an error requesting transactions from peer");
}
}
}

Ok(verified_transactions.into_values().collect())
}
}
25 changes: 9 additions & 16 deletions consensus/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use std::fmt::{Debug, Formatter};
use beserial::{Deserialize, Serialize};
use nimiq_block::{Block, MacroBlock};
#[cfg(feature = "full")]
use nimiq_blockchain::{HistoryTreeChunk, HistoryTreeProof};
use nimiq_blockchain::HistoryTreeChunk;
use nimiq_hash::Blake2bHash;
#[cfg(feature = "full")]
use nimiq_keys::Address;
use nimiq_network_interface::request::{RequestCommon, RequestMarker};
#[cfg(feature = "full")]
use nimiq_transaction::extended_transaction::ExtendedTransaction;
use nimiq_transaction::history_proof::HistoryTreeProof;

mod handlers;

Expand Down Expand Up @@ -211,47 +210,41 @@ impl RequestCommon for RequestHead {
const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_HEAD;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsProof {
proof: Option<HistoryTreeProof>,
block: Option<Block>,
pub proof: Option<HistoryTreeProof>,
pub block: Option<Block>,
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsProof {
#[beserial(len_type(u16, limit = 128))]
hashes: Vec<Blake2bHash>,
epoch_number: u32,
pub hashes: Vec<Blake2bHash>,
pub epoch_number: u32,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsProof {
type Kind = RequestMarker;
const TYPE_ID: u16 = 213;
type Response = ResponseTransactionsProof;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_PROOF;
}

#[cfg(feature = "full")]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestTransactionsByAddress {
address: Address,
max: Option<u16>,
pub address: Address,
pub max: Option<u16>,
}

#[cfg(feature = "full")]
impl RequestCommon for RequestTransactionsByAddress {
type Kind = RequestMarker;
const TYPE_ID: u16 = 214;
type Response = ResponseTransactionsByAddress;
const MAX_REQUESTS: u32 = MAX_REQUEST_TRANSACTIONS_BY_ADDRESS;
}

#[cfg(feature = "full")]
#[derive(Serialize, Deserialize)]
pub struct ResponseTransactionsByAddress {
#[beserial(len_type(u16, limit = 128))]
transactions: Vec<ExtendedTransaction>,
pub transactions: Vec<ExtendedTransaction>,
}
9 changes: 9 additions & 0 deletions network-interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ pub trait Network: Send + Sync + Unpin + 'static {
/// If the peer isn't found, `None` is returned.
fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo>;

/// Gets the set of connected peers that provide the supplied services
/// If we currently don't have any connected peer that provides those services,
/// we dial peers.
/// If there is no peer in the network that provides those services, we return an error
async fn get_peers_by_services(
&self,
services: Services,
) -> Result<Vec<Self::PeerId>, Self::Error>;

/// Returns true when the given peer provides the services flags that are required by us
fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;

Expand Down
25 changes: 25 additions & 0 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,31 @@ impl ConnectionPoolBehaviour {
.choose_multiple(&mut thread_rng(), num_peers)
}

pub fn choose_peers_to_dial_by_services(&self, services: Services) -> Vec<PeerId> {
let num_peers = usize::min(
self.config.peer_count_desired - self.peer_ids.num_connected(),
self.config.dialing_count_max - self.peer_ids.num_dialing(),
);
let contacts = self.contacts.read();
let own_contact = contacts.get_own_contact();
let own_peer_id = own_contact.peer_id();

contacts
.query(services)
.filter_map(|contact| {
let peer_id = contact.peer_id();
if peer_id != own_peer_id
&& self.peer_ids.can_dial(peer_id)
&& contact.addresses().count() > 0
{
Some(*peer_id)
} else {
None
}
})
.choose_multiple(&mut thread_rng(), num_peers)
}

fn choose_seeds_to_dial(&self) -> Vec<Multiaddr> {
// We prefer to connect to non-seed peers. Thus, we only choose any seeds here if we're
// not already dialing any peers and at most one seed at a time.
Expand Down
3 changes: 3 additions & 0 deletions network-libp2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum NetworkError {
#[error("Network action was cancelled")]
Cancelled,

#[error("We coudnt find any peer that satisfies the desired services")]
PeersNotFound,

#[error("Serialization error: {0}")]
Serialization(#[from] beserial::SerializingError),

Expand Down
61 changes: 61 additions & 0 deletions network-libp2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ pub(crate) enum NetworkAction {
ListenOn {
listen_addresses: Vec<Multiaddr>,
},
ConnectPeersByServices {
services: Services,
output: oneshot::Sender<Vec<PeerId>>,
},
StartConnecting,
RestartConnecting,
StopConnecting,
Expand Down Expand Up @@ -1296,6 +1300,23 @@ impl Network {
NetworkAction::StopConnecting => {
swarm.behaviour_mut().pool.stop_connecting();
}
NetworkAction::ConnectPeersByServices { services, output } => {
let peers_candidates = swarm
.behaviour_mut()
.pool
.choose_peers_to_dial_by_services(services);
let mut successful_peers = vec![];

for peer_id in peers_candidates {
if Swarm::dial(swarm, DialOpts::peer_id(peer_id).build()).is_ok() {
successful_peers.push(peer_id);
}
}

if output.send(successful_peers).is_err() {
error!("Could not send sucessful peers vector");
}
}
NetworkAction::DisconnectPeer { peer_id } => {
if swarm.disconnect_peer_id(peer_id).is_err() {
warn!(%peer_id, "Peer already closed");
Expand Down Expand Up @@ -1737,6 +1758,46 @@ impl NetworkInterface for Network {
}
}

async fn get_peers_by_services(
&self,
services: Services,
) -> Result<Vec<Self::PeerId>, NetworkError> {
let (output_tx, output_rx) = oneshot::channel();
let connected_peers = self.get_peers();
let mut filtered_peers = vec![];

// First we try to get the connected peers that support the desired services
for peer_id in connected_peers.iter() {
if let Some(peer_info) = self.get_peer_info(*peer_id) {
if peer_info.get_services().contains(services) {
filtered_peers.push(*peer_id);
}
}
}

// If we dont have any connected peer that support the desired services,
// we tell the network to connect to new peers that support such services.
if connected_peers.is_empty() {
self.action_tx
.send(NetworkAction::ConnectPeersByServices {
services,
output: output_tx,
})
.await?;

filtered_peers.extend(output_rx.await?.iter());
}

// If filtered_peers is still empty at this point,
// it means that currently there is no peer that support the services in the network,
// so we return an error.
if filtered_peers.is_empty() {
return Err(NetworkError::PeersNotFound);
}

Ok(filtered_peers)
}

async fn disconnect_peer(&self, peer_id: PeerId, _close_reason: CloseReason) {
if let Err(error) = self
.action_tx
Expand Down
7 changes: 7 additions & 0 deletions network-mock/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,4 +581,11 @@ impl Network for MockNetwork {
fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo> {
self.peers.read().get(&peer_id).cloned()
}

async fn get_peers_by_services(
&self,
_services: Services,
) -> Result<Vec<Self::PeerId>, MockNetworkError> {
Ok(self.get_peers())
}
}
Loading

0 comments on commit f2097ca

Please sign in to comment.