Skip to content

Commit

Permalink
Subscribe for tx events instead of polling when sending a transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
sisou committed Mar 27, 2023
1 parent 913c1e0 commit 2819043
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion web-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ maintenance = { status = "experimental" }
crate-type = ["cdylib"]

[dependencies]
futures = { package = "futures-util", version = "0.3" }
futures = "0.3"
futures-util = "0.3"
hex = "0.4"
js-sys = "0.3"
log = { package = "tracing", version = "0.1", features = ["log"] }
Expand Down
147 changes: 90 additions & 57 deletions web-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use std::{
str::FromStr,
};

use futures::StreamExt;
use js_sys::{Array, Date, Promise};
use futures::{channel::oneshot, future::select, future::Either};
use futures_util::StreamExt;
use js_sys::{Array, Promise};
use log::level_filters::LevelFilter;
use tsify::Tsify;
use wasm_bindgen::prelude::*;
Expand All @@ -33,7 +34,7 @@ use nimiq_network_interface::{
network::{CloseReason, Network, NetworkEvent},
Multiaddr,
};
use nimiq_primitives::networks::NetworkId;
use nimiq_primitives::{networks::NetworkId, policy::Policy};

use crate::account::{
PlainAccount, PlainAccountArrayType, PlainAccountType, PlainStaker, PlainStakerArrayType,
Expand Down Expand Up @@ -182,6 +183,10 @@ pub struct Client {
peer_changed_listeners: Rc<RefCell<HashMap<usize, js_sys::Function>>>,
transaction_listeners:
Rc<RefCell<HashMap<usize, (js_sys::Function, HashSet<nimiq_keys::Address>)>>>,

/// Map from transaction hash as hex string to oneshot sender.
/// Used to await transaction events in `send_transaction`.
transaction_oneshots: Rc<RefCell<HashMap<String, oneshot::Sender<PlainTransactionDetails>>>>,
}

#[wasm_bindgen]
Expand Down Expand Up @@ -252,6 +257,7 @@ impl Client {
head_changed_listeners: Rc::new(RefCell::new(HashMap::with_capacity(1))),
peer_changed_listeners: Rc::new(RefCell::new(HashMap::with_capacity(1))),
transaction_listeners: Rc::new(RefCell::new(HashMap::new())),
transaction_oneshots: Rc::new(RefCell::new(HashMap::new())),
};

client.setup_offline_online_event_handlers();
Expand Down Expand Up @@ -575,71 +581,86 @@ impl Client {

tx.verify(Some(self.network_id))?;

let current_height = self.get_head_height().await;
// Check if we are already subscribed to the sender or recipient
let already_subscribed = self
.subscribed_addresses
.borrow()
// Check sender first, as apps are usually subscribed to the sender already
.contains_key(tx.sender().native_ref())
|| self
.subscribed_addresses
.borrow()
.contains_key(tx.recipient().native_ref());
let mut subscribed_address = None;

self.inner
.consensus_proxy()
.send_transaction(tx.native())
.await?;
let consensus = self.inner.consensus_proxy();

// Until we have a proper way of subscribing & listening for inclusion events of transactions,
// we poll the sender's transaction receipts until we find the transaction's hash.
// TODO: Instead of polling, subscribe to the transaction's inclusion events, or the sender's tx events.
let tx_hash = tx.hash();
let start = Date::now();

loop {
// Sleep for 0.5s before requesting (again)
JsFuture::from(Promise::new(&mut |resolve, _| {
web_sys::window()
.expect("Unable to get a reference to the JS `Window` object")
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 500)
.unwrap();
}))
.await
.unwrap();

let receipts = self
.inner
.consensus_proxy()
.request_transaction_receipts_by_address(tx.sender().take_native(), 1, Some(10))
// If not subscribed, subscribe to the sender or recipient
if !already_subscribed {
// Subscribe to the recipient by default
subscribed_address = Some(tx.recipient().native());
if subscribed_address == Some(Policy::STAKING_CONTRACT_ADDRESS) {
// If the recipient is the staking contract, subscribe to the sender instead
// to not get flooded with notifications.
subscribed_address = Some(tx.sender().native());
}
let address = subscribed_address.clone().unwrap();
consensus
.subscribe_to_addresses(vec![address], 1, None)
.await?;
}

for receipt in receipts {
// The receipts are ordered newest first, so we can break the loop once receipts are older than
// the blockchain height when we started to avoid looping over receipts that cannot be the one
// we are looking for.
if receipt.1 <= current_height {
break;
}
let hash = &tx.hash();

if receipt.0.to_hex() == tx_hash {
// Get the full transaction
let ext_tx = self
.inner
.consensus_proxy()
.request_transaction_by_hash_and_block_number(receipt.0, receipt.1, 1)
.await?;
let details =
PlainTransactionDetails::from_extended_transaction(&ext_tx, receipt.1);
return Ok(serde_wasm_bindgen::to_value(&details)?.into());
}
}
// Set a oneshot sender to receive the transaction when its notification arrives
let (sender, receiver) = oneshot::channel();
self.transaction_oneshots
.borrow_mut()
.insert(hash.clone(), sender);

// Actually send the transaction
consensus.send_transaction(tx.native()).await?;

let timeout = JsFuture::from(Promise::new(&mut |resolve, _| {
Client::window()
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 10_000)
.unwrap();
}));

// Wait for the transaction (will be None if the timeout is reached first)
let res = select(receiver, timeout).await;

let maybe_details = if let Either::Left((res, _)) = res {
res.ok()
} else {
// If the timeout triggered, delete our oneshot sender
self.transaction_oneshots.borrow_mut().remove(hash);
None
};

if Date::now() - start >= 10_000.0 {
break;
}
// Unsubscribe from any address we subscribed to, without caring about the result
if let Some(address) = subscribed_address {
let owned_consensus = consensus.clone();
spawn_local(async move {
let _ = owned_consensus
.unsubscribe_from_addresses(vec![address], 1)
.await;
});
}

// If the transaction did not get included, return it as TransactionState::New
let details =
PlainTransactionDetails::new(&tx, TransactionState::New, None, None, None, None);
Ok(serde_wasm_bindgen::to_value(&details)?.into())
if let Some(details) = maybe_details {
// If we got a transactions, return it
Ok(serde_wasm_bindgen::to_value(&details)?.into())
} else {
// If the transaction did not get included, return it as TransactionState::New
let details =
PlainTransactionDetails::new(&tx, TransactionState::New, None, None, None, None);
Ok(serde_wasm_bindgen::to_value(&details)?.into())
}
}

fn setup_offline_online_event_handlers(&self) {
let window =
web_sys::window().expect("Unable to get a reference to the JS `Window` object");
let window = Client::window();
let network = self.inner.network();
let network1 = self.inner.network();

Expand Down Expand Up @@ -882,6 +903,7 @@ impl Client {
let consensus = self.inner.consensus_proxy();

let transaction_listeners = Rc::clone(&self.transaction_listeners);
let transaction_oneshots = Rc::clone(&self.transaction_oneshots);

spawn_local(async move {
let mut address_notifications = consensus.subscribe_address_notifications().await;
Expand Down Expand Up @@ -937,6 +959,13 @@ impl Client {
Some(1),
);

if let Some(sender) = transaction_oneshots
.borrow_mut()
.remove(&details.transaction.transaction_hash)
{
let _ = sender.send(details.clone());
}

if let Ok(js_value) = serde_wasm_bindgen::to_value(&details) {
for (listener, addresses) in transaction_listeners.borrow().values() {
if addresses.contains(&tx.sender)
Expand Down Expand Up @@ -1196,6 +1225,10 @@ impl Client {

Ok(ordered_validators)
}

fn window() -> web_sys::Window {
web_sys::window().expect("Unable to get a reference to the JS `Window` object")
}
}

#[wasm_bindgen]
Expand Down
10 changes: 5 additions & 5 deletions web-client/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,21 +431,21 @@ impl Transaction {
}

/// Placeholder struct to serialize data of transactions as hex strings in the style of the Nimiq 1.0 library.
#[derive(serde::Serialize, serde::Deserialize, Tsify)]
#[derive(Clone, serde::Serialize, serde::Deserialize, Tsify)]
pub struct PlainTransactionData {
pub raw: String,
}

/// Placeholder struct to serialize proofs of transactions as hex strings in the style of the Nimiq 1.0 library.
#[derive(serde::Serialize, serde::Deserialize, Tsify)]
#[derive(Clone, serde::Serialize, serde::Deserialize, Tsify)]
pub struct PlainTransactionProof {
pub raw: String,
}

/// JSON-compatible and human-readable format of transactions. E.g. addresses are presented in their human-readable
/// format and address types and the network are represented as strings. Data and proof are serialized as an object
/// describing their contents (not yet implemented, only the `{ raw: string }` fallback is available).
#[derive(serde::Serialize, serde::Deserialize, Tsify)]
#[derive(Clone, serde::Serialize, serde::Deserialize, Tsify)]
#[serde(rename_all = "camelCase")]
pub struct PlainTransaction {
/// The transaction's unique hash, used as its identifier. Sometimes also called `txId`.
Expand Down Expand Up @@ -501,7 +501,7 @@ pub struct PlainTransaction {
}

/// Describes the state of a transaction as known by the client.
#[derive(serde::Serialize, serde::Deserialize, Tsify)]
#[derive(Clone, serde::Serialize, serde::Deserialize, Tsify)]
#[serde(rename_all = "lowercase")]
pub enum TransactionState {
/// The transaction only exists locally and has not been broadcast or accepted by any peers.
Expand All @@ -526,7 +526,7 @@ pub enum TransactionState {
/// JSON-compatible and human-readable format of transactions, including details about its state in the
/// blockchain. Contains all fields from {@link PlainTransaction}, plus additional fields such as
/// `blockHeight` and `timestamp` if the transaction is included in the blockchain.
#[derive(serde::Deserialize, Tsify)]
#[derive(Clone, serde::Deserialize, Tsify)]
#[serde(rename_all = "camelCase")]
pub struct PlainTransactionDetails {
#[serde(flatten)]
Expand Down

0 comments on commit 2819043

Please sign in to comment.