From 93cb9483036e64bc9d55c17a15292bac3edab88d Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Mon, 24 Oct 2022 14:45:08 +0100 Subject: [PATCH 1/8] Implement event log This commit patches in the diff from #587, adjusted to main. --- Cargo.lock | 24 +- apps/Cargo.toml | 2 +- apps/src/lib/client/mod.rs | 1 - apps/src/lib/client/rpc.rs | 115 ++- apps/src/lib/client/tendermint_rpc_types.rs | 147 ++-- .../lib/client/tendermint_websocket_client.rs | 712 ------------------ apps/src/lib/client/tx.rs | 143 ++-- apps/src/lib/node/ledger/events.rs | 10 +- apps/src/lib/node/ledger/events/log.rs | 198 +++++ .../node/ledger/events/log/dumb_queries.rs | 108 +++ .../lib/node/ledger/shell/finalize_block.rs | 3 + apps/src/lib/node/ledger/shell/mod.rs | 17 + shared/Cargo.toml | 1 + shared/src/types/hash.rs | 44 +- wasm/Cargo.lock | 1 + wasm/checksums.json | 2 +- wasm_for_tests/wasm_source/Cargo.lock | 1 + 17 files changed, 614 insertions(+), 915 deletions(-) delete mode 100644 apps/src/lib/client/tendermint_websocket_client.rs create mode 100644 apps/src/lib/node/ledger/events/log.rs create mode 100644 apps/src/lib/node/ledger/events/log/dumb_queries.rs diff --git a/Cargo.lock b/Cargo.lock index 3d7588b5b7..40068c3ff9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -837,6 +837,15 @@ dependencies = [ "generic-array 0.14.6", ] +[[package]] +name = "circular-queue" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d34327ead1c743a10db339de35fb58957564b99d248a67985c55638b22c59b5" +dependencies = [ + "version_check 0.9.4", +] + [[package]] name = "clang-sys" version = "1.4.0" @@ -2443,17 +2452,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "jsonpath_lib" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaa63191d68230cccb81c5aa23abd53ed64d83337cacbb25a7b8c7979523774f" -dependencies = [ - "log 0.4.17", - "serde 1.0.145", - "serde_json", -] - [[package]] name = "keccak" version = "0.1.2" @@ -2929,6 +2927,7 @@ dependencies = [ "ferveo", "ferveo-common", "group-threshold-cryptography", + "hex", "ibc 0.14.0 (git+https://github.com/heliaxdev/ibc-rs?rev=9fcc1c8c19db6af50806ffe5b2f6c214adcbfd5d)", "ibc 0.14.0 (git+https://github.com/heliaxdev/ibc-rs.git?rev=f4703dfe2c1f25cc431279ab74f10f3e0f6827e2)", "ibc-proto 0.17.1 (git+https://github.com/heliaxdev/ibc-rs?rev=9fcc1c8c19db6af50806ffe5b2f6c214adcbfd5d)", @@ -2992,6 +2991,7 @@ dependencies = [ "borsh", "byte-unit", "byteorder", + "circular-queue", "clap", "color-eyre", "config", @@ -3006,7 +3006,6 @@ dependencies = [ "futures 0.3.25", "git2", "itertools", - "jsonpath_lib", "libc", "libloading", "namada", @@ -4624,7 +4623,6 @@ version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" dependencies = [ - "indexmap", "itoa", "ryu", "serde 1.0.145", diff --git a/apps/Cargo.toml b/apps/Cargo.toml index 87ad660beb..ba297bacd9 100644 --- a/apps/Cargo.toml +++ b/apps/Cargo.toml @@ -78,6 +78,7 @@ blake2b-rs = "0.2.0" borsh = "0.9.0" byte-unit = "4.0.13" byteorder = "1.4.2" +circular-queue = "0.2.6" # https://github.com/clap-rs/clap/issues/1037 clap = {git = "https://github.com/clap-rs/clap/", tag = "v3.0.0-beta.2", default-features = false, features = ["std", "suggestions", "color", "cargo"]} color-eyre = "0.5.10" @@ -92,7 +93,6 @@ flate2 = "1.0.22" file-lock = "2.0.2" futures = "0.3" itertools = "0.10.1" -jsonpath_lib = "0.3.0" libc = "0.2.97" libloading = "0.7.2" num-derive = "0.3.3" diff --git a/apps/src/lib/client/mod.rs b/apps/src/lib/client/mod.rs index a909cf9120..9807ca6a30 100644 --- a/apps/src/lib/client/mod.rs +++ b/apps/src/lib/client/mod.rs @@ -1,6 +1,5 @@ pub mod rpc; pub mod signing; pub mod tendermint_rpc_types; -mod tendermint_websocket_client; pub mod tx; pub mod utils; diff --git a/apps/src/lib/client/rpc.rs b/apps/src/lib/client/rpc.rs index ce6a542897..3331376a2c 100644 --- a/apps/src/lib/client/rpc.rs +++ b/apps/src/lib/client/rpc.rs @@ -33,6 +33,7 @@ use namada::types::key::*; use namada::types::storage::{Epoch, Key, KeySeg, PrefixValue}; use namada::types::token::{balance_key, Amount}; use namada::types::{address, storage, token}; +use tokio::time::{Duration, Instant}; use crate::cli::{self, args, Context}; use crate::client::tendermint_rpc_types::TxResponse; @@ -42,6 +43,82 @@ use crate::facade::tendermint_rpc::query::Query; use crate::facade::tendermint_rpc::{ Client, HttpClient, Order, SubscriptionClient, WebSocketClient, }; +use crate::node::ledger::events::Event; +use crate::node::ledger::rpc::Path; + +/// Query the status of a given transaction. +/// +/// If a response is not delivered until `deadline`, we exit the cli with an +/// error. +pub async fn query_tx_status( + status: TxEventQuery<'_>, + address: TendermintAddress, + deadline: Instant, +) -> Event { + const ONE_SECOND: Duration = Duration::from_secs(1); + // sleep for the duration of `backoff`, + // and update the underlying value + async fn sleep_update(query: TxEventQuery<'_>, backoff: &mut Duration) { + tracing::debug!( + ?query, + duration = ?backoff, + "Retrying tx status query after timeout", + ); + // simple linear backoff - if an event is not available, + // increase the backoff duration by one second + tokio::time::sleep(*backoff).await; + *backoff += ONE_SECOND; + } + tokio::time::timeout_at(deadline, async move { + let client = HttpClient::new(address).unwrap(); + let mut backoff = ONE_SECOND; + + loop { + let data = vec![]; + tracing::debug!(query = ?status, "Querying tx status"); + let response = match client + .abci_query(Some(status.into()), data, None, false) + .await + { + Ok(response) => response, + Err(err) => { + tracing::debug!(%err, "ABCI query failed"); + sleep_update(status, &mut backoff).await; + continue; + } + }; + let mut events = match response.code { + Code::Ok => { + match Vec::::try_from_slice(&response.value[..]) { + Ok(events) => events, + Err(err) => { + eprintln!("Error decoding the event value: {err}"); + break Err(()); + } + } + } + Code::Err(err) => { + eprintln!( + "Error in the query {} (error code {})", + response.info, err + ); + break Err(()); + } + }; + if let Some(e) = events.pop() { + // we should only have one event matching the query + break Ok(e); + } + sleep_update(status, &mut backoff).await; + } + }) + .await + .map_err(|_| { + eprintln!("Transaction status query deadline of {deadline:?} exceeded"); + }) + .and_then(|result| result) + .unwrap_or_else(|_| cli::safe_exit(1)) +} /// Query the epoch of the last committed block pub async fn query_epoch(args: args::Query) -> Epoch { @@ -1339,23 +1416,23 @@ pub async fn query_has_storage_key( } /// Represents a query for an event pertaining to the specified transaction -#[derive(Debug, Clone)] -pub enum TxEventQuery { - Accepted(String), - Applied(String), +#[derive(Debug, Copy, Clone)] +pub enum TxEventQuery<'a> { + Accepted(&'a str), + Applied(&'a str), } -impl TxEventQuery { +impl<'a> TxEventQuery<'a> { /// The event type to which this event query pertains - fn event_type(&self) -> &'static str { + fn event_type(self) -> &'static str { match self { - TxEventQuery::Accepted(_tx_hash) => "accepted", - TxEventQuery::Applied(_tx_hash) => "applied", + TxEventQuery::Accepted(_) => "accepted", + TxEventQuery::Applied(_) => "applied", } } /// The transaction to which this event query pertains - fn tx_hash(&self) -> &String { + fn tx_hash(self) -> &'a str { match self { TxEventQuery::Accepted(tx_hash) => tx_hash, TxEventQuery::Applied(tx_hash) => tx_hash, @@ -1363,9 +1440,17 @@ impl TxEventQuery { } } +impl<'a> From> for crate::facade::tendermint::abci::Path { + fn from(tx_query: TxEventQuery<'a>) -> Self { + format!("{}/{}", tx_query.event_type(), tx_query.tx_hash()) + .parse() + .expect("This operation is infallible") + } +} + /// Transaction event queries are semantically a subset of general queries -impl From for Query { - fn from(tx_query: TxEventQuery) -> Self { +impl<'a> From> for Query { + fn from(tx_query: TxEventQuery<'a>) -> Self { match tx_query { TxEventQuery::Accepted(tx_hash) => { Query::default().and_eq("accepted.hash", tx_hash) @@ -1380,14 +1465,14 @@ impl From for Query { /// Lookup the full response accompanying the specified transaction event pub async fn query_tx_response( ledger_address: &TendermintAddress, - tx_query: TxEventQuery, + tx_query: TxEventQuery<'_>, ) -> Result { // Connect to the Tendermint server holding the transactions let (client, driver) = WebSocketClient::new(ledger_address.clone()).await?; let driver_handle = tokio::spawn(async move { driver.run().await }); // Find all blocks that apply a transaction with the specified hash let blocks = &client - .block_search(Query::from(tx_query.clone()), 1, 255, Order::Ascending) + .block_search(tx_query.into(), 1, 255, Order::Ascending) .await .expect("Unable to query for transaction with given hash") .blocks; @@ -1463,7 +1548,7 @@ pub async fn query_result(_ctx: Context, args: args::QueryResult) { // First try looking up application event pertaining to given hash. let tx_response = query_tx_response( &args.query.ledger_address, - TxEventQuery::Applied(args.tx_hash.clone()), + TxEventQuery::Applied(&args.tx_hash), ) .await; match tx_response { @@ -1477,7 +1562,7 @@ pub async fn query_result(_ctx: Context, args: args::QueryResult) { // If this fails then instead look for an acceptance event. let tx_response = query_tx_response( &args.query.ledger_address, - TxEventQuery::Accepted(args.tx_hash), + TxEventQuery::Accepted(&args.tx_hash), ) .await; match tx_response { diff --git a/apps/src/lib/client/tendermint_rpc_types.rs b/apps/src/lib/client/tendermint_rpc_types.rs index 32454c0440..0e94155378 100644 --- a/apps/src/lib/client/tendermint_rpc_types.rs +++ b/apps/src/lib/client/tendermint_rpc_types.rs @@ -1,10 +1,11 @@ -use jsonpath_lib as jsonpath; +use std::convert::TryFrom; + use namada::proto::Tx; use namada::types::address::Address; use serde::Serialize; use crate::cli::safe_exit; -use crate::node::ledger::events::EventType as NamadaEventType; +use crate::node::ledger::events::Event; /// Data needed for broadcasting a tx and /// monitoring its progress on chain @@ -12,7 +13,7 @@ use crate::node::ledger::events::EventType as NamadaEventType; /// Txs may be either a dry run or else /// they should be encrypted and included /// in a wrapper. -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum TxBroadcastData { DryRun(Tx), Wrapper { @@ -34,83 +35,69 @@ pub struct TxResponse { pub initialized_accounts: Vec
, } -impl TxResponse { - /// Parse the JSON payload received from a subscription - /// - /// Searches for custom events emitted from the ledger and converts - /// them back to thin wrapper around a hashmap for further parsing. - pub fn parse( - json: serde_json::Value, - event_type: NamadaEventType, - tx_hash: &str, - ) -> Self { - let tx_hash_json = serde_json::Value::String(tx_hash.to_string()); - let mut selector = jsonpath::selector(&json); - let mut index = 0; - let evt_key = event_type.to_string(); - // Find the tx with a matching hash - let hash = loop { - if let Ok(hash) = - selector(&format!("$.events.['{}.hash'][{}]", evt_key, index)) - { - let hash = hash[0].clone(); - if hash == tx_hash_json { - break hash; - } else { - index += 1; - } - } else { - eprintln!( - "Couldn't find tx with hash {} in the event string {}", - tx_hash, json - ); - safe_exit(1) - } - }; - let info = - selector(&format!("$.events.['{}.info'][{}]", evt_key, index)) - .unwrap(); - let log = selector(&format!("$.events.['{}.log'][{}]", evt_key, index)) - .unwrap(); - let height = - selector(&format!("$.events.['{}.height'][{}]", evt_key, index)) - .unwrap(); - let code = - selector(&format!("$.events.['{}.code'][{}]", evt_key, index)) - .unwrap(); - let gas_used = - selector(&format!("$.events.['{}.gas_used'][{}]", evt_key, index)) - .unwrap(); - let initialized_accounts = selector(&format!( - "$.events.['{}.initialized_accounts'][{}]", - evt_key, index - )); - let initialized_accounts = match initialized_accounts { - Ok(values) if !values.is_empty() => { - // In a response, the initialized accounts are encoded as e.g.: - // ``` - // "applied.initialized_accounts": Array([ - // String( - // "[\"atest1...\"]", - // ), - // ]), - // ... - // So we need to decode the inner string first ... - let raw: String = - serde_json::from_value(values[0].clone()).unwrap(); - // ... and then decode the vec from the array inside the string - serde_json::from_str(&raw).unwrap() - } - _ => vec![], - }; - TxResponse { - info: serde_json::from_value(info[0].clone()).unwrap(), - log: serde_json::from_value(log[0].clone()).unwrap(), - height: serde_json::from_value(height[0].clone()).unwrap(), - hash: serde_json::from_value(hash).unwrap(), - code: serde_json::from_value(code[0].clone()).unwrap(), - gas_used: serde_json::from_value(gas_used[0].clone()).unwrap(), - initialized_accounts, +impl TryFrom for TxResponse { + type Error = String; + + fn try_from(event: Event) -> Result { + fn missing_field_err(field: &str) -> String { + format!("Field \"{field}\" not present in event") } + + let hash = event + .get("hash") + .ok_or_else(|| missing_field_err("hash"))? + .clone(); + let info = event + .get("info") + .ok_or_else(|| missing_field_err("info"))? + .clone(); + let log = event + .get("log") + .ok_or_else(|| missing_field_err("log"))? + .clone(); + let height = event + .get("height") + .ok_or_else(|| missing_field_err("height"))? + .clone(); + let code = event + .get("code") + .ok_or_else(|| missing_field_err("code"))? + .clone(); + let gas_used = event + .get("gas_used") + .ok_or_else(|| missing_field_err("gas_used"))? + .clone(); + let initialized_accounts = event + .get("initialized_accounts") + .map(String::as_str) + // TODO: fix finalize block, to return initialized accounts, + // even when we reject a tx? + .or(Some("[]")) + // NOTE: at this point we only have `Some(vec)`, not `None` + .ok_or_else(|| unreachable!()) + .and_then(|initialized_accounts| { + serde_json::from_str(initialized_accounts) + .map_err(|err| format!("JSON decode error: {err}")) + })?; + + Ok(TxResponse { + hash, + info, + log, + height, + code, + gas_used, + initialized_accounts, + }) + } +} + +impl TxResponse { + /// Convert an [`Event`] to a [`TxResponse`], or error out. + pub fn from_event(event: Event) -> Self { + event.try_into().unwrap_or_else(|err| { + eprintln!("Error fetching TxResponse: {err}"); + safe_exit(1); + }) } } diff --git a/apps/src/lib/client/tendermint_websocket_client.rs b/apps/src/lib/client/tendermint_websocket_client.rs deleted file mode 100644 index 61bea1d990..0000000000 --- a/apps/src/lib/client/tendermint_websocket_client.rs +++ /dev/null @@ -1,712 +0,0 @@ -use std::collections::HashMap; -use std::convert::TryFrom; -use std::fmt::{Display, Formatter}; -use std::net::TcpStream; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -use async_trait::async_trait; -use thiserror::Error; -use tokio::time::Instant; -use websocket::result::WebSocketError; -use websocket::{ClientBuilder, Message, OwnedMessage}; - -use crate::facade::tendermint_config::net::Address; -use crate::facade::tendermint_rpc::query::Query; -use crate::facade::tendermint_rpc::{ - Client, Error as RpcError, Request, Response, SimpleRequest, -}; - -#[derive(Error, Debug)] -pub enum Error { - #[error("Could not convert into websocket address: {0:?}")] - Address(Address), - #[error("Websocket Error: {0:?}")] - Websocket(WebSocketError), - #[error("Failed to subscribe to the event: {0}")] - Subscribe(String), - #[error("Failed to unsubscribe to the event: {0}")] - Unsubscribe(String), - #[error("Unexpected response from query: {0:?}")] - UnexpectedResponse(OwnedMessage), - #[error("More then one subscription at a time is not supported")] - AlreadySubscribed, - #[error("Cannot wait on a response if not subscribed to an event")] - NotSubscribed, - #[error("Received an error response: {0}")] - Response(String), - #[error("Encountered JSONRPC request/response without an id")] - MissingId, - #[error("Connection timed out")] - ConnectionTimeout, - #[error("Received malformed JSON from websocket: {0:?}")] - MalformedJson(crate::node::ledger::events::Error), - #[error("Event for transaction {0} was not received")] - MissingEvent(String), -} - -type Json = serde_json::Value; - -/// Module that brings in the basic building blocks from tendermint_rpc -/// and adds the necessary functionality and wrappers to them. -mod rpc_types { - use std::collections::HashMap; - use std::fmt; - use std::str::FromStr; - - use serde::{de, Deserialize, Serialize, Serializer}; - - use super::Json; - use crate::facade::tendermint_rpc::method::Method; - use crate::facade::tendermint_rpc::query::{EventType, Query}; - use crate::facade::tendermint_rpc::{request, response}; - - #[derive(Debug, Deserialize, Serialize)] - pub struct RpcRequest { - #[serde(skip_serializing)] - method: Method, - params: HashMap, - } - - #[derive(Debug, Deserialize, Serialize)] - pub enum SubscribeType { - Subscribe, - Unsubscribe, - } - - #[derive(Debug, Deserialize, Serialize)] - pub struct RpcSubscription( - #[serde(skip_serializing)] pub SubscribeType, - #[serde(serialize_with = "serialize_query")] - #[serde(deserialize_with = "deserialize_query")] - pub Query, - ); - - pub(super) fn serialize_query( - query: &Query, - serialize: S, - ) -> Result - where - S: Serializer, - { - serialize.serialize_str(&query.to_string()) - } - - pub(super) fn deserialize_query<'de, D>( - deserializer: D, - ) -> Result - where - D: de::Deserializer<'de>, - { - struct QueryVisitor; - - impl<'de> de::Visitor<'de> for QueryVisitor { - type Value = Query; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str( - "a string of params from a valid Tendermint RPC query", - ) - } - - fn visit_str(self, v: &str) -> Result - where - E: de::Error, - { - match EventType::from_str(v) { - Ok(event) => Ok(Query::from(event)), - Err(error) => { - Err(de::Error::custom(format!("{:?}", error))) - } - } - } - } - deserializer.deserialize_any(QueryVisitor) - } - - /// This type is required by the tendermint_rs traits but we - /// cannot use it due to a bug in the RPC responses from - /// tendermint - #[derive(Debug, Deserialize, Serialize)] - #[serde(transparent)] - pub struct RpcResponse(pub Json); - - impl response::Response for RpcResponse {} - - impl request::Request for RpcRequest { - type Response = RpcResponse; - - fn method(&self) -> Method { - self.method - } - } - - impl request::Request for RpcSubscription { - type Response = RpcResponse; - - fn method(&self) -> Method { - match self.0 { - SubscribeType::Subscribe => Method::Subscribe, - SubscribeType::Unsubscribe => Method::Unsubscribe, - } - } - } -} - -pub struct WebSocketAddress { - host: String, - port: u16, -} - -impl TryFrom
for WebSocketAddress { - type Error = Error; - - fn try_from(value: Address) -> Result { - match value { - Address::Tcp { host, port, .. } => Ok(Self { host, port }), - _ => Err(Error::Address(value)), - } - } -} - -impl Display for WebSocketAddress { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "ws://{}:{}/websocket", self.host, self.port) - } -} - -use rpc_types::{RpcResponse, RpcSubscription, SubscribeType}; - -/// We need interior mutability since the `perform` method of the `Client` -/// trait from `tendermint_rpc` only takes `&self` as an argument -/// Furthermore, TendermintWebsocketClient must be `Send` since it will be -/// used in async methods -type Websocket = Arc>>; -type ResponseQueue = Arc>>; - -struct Subscription { - id: String, - query: Query, -} - -pub struct TendermintWebsocketClient { - websocket: Websocket, - subscribed: Option, - received_responses: ResponseQueue, - connection_timeout: Duration, -} - -impl TendermintWebsocketClient { - /// Open up a new websocket given a specified URL. - /// If no `connection_timeout` is given, defaults to 5 minutes. - pub fn open( - url: WebSocketAddress, - connection_timeout: Option, - ) -> Result { - match ClientBuilder::new(&url.to_string()) - .unwrap() - .connect_insecure() - { - Ok(websocket) => Ok(Self { - websocket: Arc::new(Mutex::new(websocket)), - subscribed: None, - received_responses: Arc::new(Mutex::new(HashMap::new())), - connection_timeout: connection_timeout - .unwrap_or_else(|| Duration::new(300, 0)), - }), - Err(inner) => Err(Error::Websocket(inner)), - } - } - - /// Shutdown the client. Can still be reused afterwards - pub fn close(&mut self) { - // Even in the case of errors, this will be shutdown - let _ = self.websocket.lock().unwrap().shutdown(); - self.subscribed = None; - self.received_responses.lock().unwrap().clear(); - } - - /// Subscribes to an event specified by the query argument. - pub fn subscribe(&mut self, query: Query) -> Result<(), Error> { - // We do not support more than one subscription currently - // This can be fixed by correlating on ids later - if self.subscribed.is_some() { - return Err(Error::AlreadySubscribed); - } - // send the subscription request - let message = RpcSubscription(SubscribeType::Subscribe, query.clone()) - .into_json(); - let msg_id = get_id(&message).unwrap(); - - self.websocket - .lock() - .unwrap() - .send_message(&Message::text(&message)) - .map_err(Error::Websocket)?; - - // check that the request was received and a success message returned - match self.process_response(|_| Error::Subscribe(message), None) { - Ok(_) => { - self.subscribed = Some(Subscription { id: msg_id, query }); - Ok(()) - } - Err(err) => Err(err), - } - } - - /// Receive a response from the subscribed event or - /// process the response if it has already been received - pub fn receive_response(&self) -> Result { - if let Some(Subscription { id, .. }) = &self.subscribed { - let response = self.process_response( - Error::Response, - self.received_responses.lock().unwrap().remove(id), - )?; - Ok(response) - } else { - Err(Error::NotSubscribed) - } - } - - /// Unsubscribe from the currently subscribed event - /// Note that even if an error is returned, the client - /// will return to an unsubscribed state - pub fn unsubscribe(&mut self) -> Result<(), Error> { - match self.subscribed.take() { - Some(Subscription { query, .. }) => { - // send the subscription request - let message = - RpcSubscription(SubscribeType::Unsubscribe, query) - .into_json(); - - self.websocket - .lock() - .unwrap() - .send_message(&Message::text(&message)) - .map_err(Error::Websocket)?; - // empty out the message queue. Should be empty already - self.received_responses.lock().unwrap().clear(); - // check that the request was received and a success message - // returned - match self - .process_response(|_| Error::Unsubscribe(message), None) - { - Ok(_) => Ok(()), - Err(err) => Err(err), - } - } - _ => Err(Error::NotSubscribed), - } - } - - /// Process the next response received and handle any exceptions that - /// may have occurred. Takes a function to map response to an error - /// as a parameter. - /// - /// Optionally, the response may have been received earlier while - /// handling a different request. In that case, we process it - /// now. - fn process_response( - &self, - f: F, - received: Option, - ) -> Result - where - F: FnOnce(String) -> Error, - { - let resp = match received { - Some(resp) => OwnedMessage::Text(resp), - None => { - let mut websocket = self.websocket.lock().unwrap(); - let start = Instant::now(); - loop { - if Instant::now().duration_since(start) - > self.connection_timeout - { - tracing::error!( - "Websocket connection timed out while waiting for \ - response" - ); - return Err(Error::ConnectionTimeout); - } - match websocket.recv_message().map_err(Error::Websocket)? { - text @ OwnedMessage::Text(_) => break text, - OwnedMessage::Ping(data) => { - tracing::debug!( - "Received websocket Ping, sending Pong" - ); - websocket - .send_message(&OwnedMessage::Pong(data)) - .unwrap(); - continue; - } - OwnedMessage::Pong(_) => { - tracing::debug!( - "Received websocket Pong, ignoring" - ); - continue; - } - other => return Err(Error::UnexpectedResponse(other)), - } - } - } - }; - match resp { - OwnedMessage::Text(raw) => RpcResponse::from_string(raw) - .map(|v| v.0) - .map_err(|e| f(e.to_string())), - other => Err(Error::UnexpectedResponse(other)), - } - } -} - -#[async_trait] -impl Client for TendermintWebsocketClient { - async fn perform(&self, request: R) -> Result - where - R: SimpleRequest, - { - // send the subscription request - // Return an empty response if the request fails to send - let req_json = request.into_json(); - let req_id = get_id(&req_json).unwrap(); - if let Err(error) = self - .websocket - .lock() - .unwrap() - .send_message(&Message::text(&req_json)) - { - tracing::info! { - "Unable to send request: {}\nReceived Error: {:?}", - &req_json, - error - }; - return ::Response::from_string(""); - } - - // Return the response if text is returned, else return empty response - let mut websocket = self.websocket.lock().unwrap(); - let start = Instant::now(); - loop { - let duration = Instant::now().duration_since(start); - if duration > self.connection_timeout { - tracing::error!( - "Websocket connection timed out while waiting for response" - ); - return Err(RpcError::web_socket_timeout(duration)); - } - let response = match websocket - .recv_message() - .expect("Failed to receive message from websocket") - { - OwnedMessage::Text(resp) => resp, - OwnedMessage::Ping(data) => { - tracing::debug!("Received websocket Ping, sending Pong"); - websocket.send_message(&OwnedMessage::Pong(data)).unwrap(); - continue; - } - OwnedMessage::Pong(_) => { - tracing::debug!("Received websocket Pong, ignoring"); - continue; - } - other => { - tracing::info! { - "Received unexpected response to query: {}\nReceived {:?}", - &req_json, - other - }; - String::from("") - } - }; - // Check that we did not accidentally get a response for a - // subscription. If so, store it for later - if let Ok(resp_id) = get_id(&response) { - if resp_id != req_id { - self.received_responses - .lock() - .unwrap() - .insert(resp_id, response); - } else { - return ::Response::from_string(response); - } - } else { - // got an invalid response, just return nothing - return ::Response::from_string(response); - }; - } - } -} - -fn get_id(req_json: &str) -> Result { - if let serde_json::Value::Object(req) = - serde_json::from_str(req_json).unwrap() - { - req.get("id").ok_or(Error::MissingId).map(|v| v.to_string()) - } else { - Err(Error::MissingId) - } -} - -/// The TendermintWebsocketClient has a basic state machine for ensuring -/// at most one subscription at a time. These tests cover that it -/// works as intended. -/// -/// Furthermore, since a client can handle a subscription and a -/// simple request simultaneously, we must test that the correct -/// responses are give for each of the corresponding requests -#[cfg(test)] -mod test_tendermint_websocket_client { - use std::time::Duration; - - use namada::types::transaction::hash_tx as hash_tx_bytes; - use serde::{Deserialize, Serialize}; - use websocket::sync::Server; - use websocket::{Message, OwnedMessage}; - - use crate::client::tendermint_websocket_client::{ - TendermintWebsocketClient, WebSocketAddress, - }; - use crate::facade::tendermint::abci::transaction; - use crate::facade::tendermint_rpc::endpoint::abci_info::AbciInfo; - use crate::facade::tendermint_rpc::query::{EventType, Query}; - use crate::facade::tendermint_rpc::Client; - - #[derive(Debug, Deserialize, Serialize)] - #[serde(rename_all = "snake_case")] - pub enum ReqType { - Subscribe, - Unsubscribe, - AbciInfo, - } - - #[derive(Debug, Deserialize, Serialize)] - pub struct RpcRequest { - pub jsonrpc: String, - pub id: String, - pub method: ReqType, - pub params: Option>, - } - - fn address() -> WebSocketAddress { - WebSocketAddress { - host: "localhost".into(), - port: 26657, - } - } - - #[derive(Default)] - struct Handle { - subscription_id: Option, - } - - impl Handle { - /// Mocks responses to queries. Fairly arbitrary with just enough - /// variety to test the TendermintWebsocketClient state machine and - /// message synchronization - fn handle(&mut self, msg: String) -> Vec { - let id = super::get_id(&msg).unwrap(); - let request: RpcRequest = serde_json::from_str(&msg).unwrap(); - match request.method { - ReqType::Unsubscribe => { - self.subscription_id = None; - vec![format!( - r#"{{"jsonrpc": "2.0", "id": {}, "error": "error"}}"#, - id - )] - } - ReqType::Subscribe => { - self.subscription_id = Some(id); - let id = self.subscription_id.as_ref().unwrap(); - if request.params.unwrap()[0] - == Query::from(EventType::NewBlock).to_string() - { - vec![format!( - r#"{{"jsonrpc": "2.0", "id": {}, "error": "error"}}"#, - id - )] - } else { - vec![format!( - r#"{{"jsonrpc": "2.0", "id": {}, "result": {{}}}}"#, - id - )] - } - } - ReqType::AbciInfo => { - // Mock a subscription result returning on the wire before - // the simple request result - let info = AbciInfo { - last_block_app_hash: transaction::Hash::new( - hash_tx_bytes("Testing".as_bytes()).0, - ) - .as_ref() - .into(), - ..AbciInfo::default() - }; - let resp = serde_json::to_string(&info).unwrap(); - if let Some(prev_id) = self.subscription_id.take() { - vec![ - format!( - r#"{{"jsonrpc": "2.0", "id": {}, "result": {{"subscription": "result!"}}}}"#, - prev_id - ), - format!( - r#"{{"jsonrpc": "2.0", "id": {}, "result": {{"response": {}}}}}"#, - id, resp - ), - ] - } else { - vec![format!( - r#"{{"jsonrpc": "2.0", "id": {}, "result": {{"response": {}}}}}"#, - id, resp - )] - } - } - } - } - } - - /// A mock tendermint node. This is just a basic websocket server - /// TODO: When the thread drops from scope, we may get an ignorable - /// panic as we did not shut the loop down. But we should. - fn start() { - let node = Server::bind("localhost:26657").unwrap(); - for connection in node.filter_map(Result::ok) { - std::thread::spawn(move || { - let mut handler = Handle::default(); - let mut client = connection.accept().unwrap(); - loop { - for resp in match client.recv_message().unwrap() { - OwnedMessage::Text(msg) => handler.handle(msg), - _ => panic!("Unexpected request"), - } { - let msg = Message::text(resp); - let _ = client.send_message(&msg); - } - } - }); - } - } - - /// Test that we cannot subscribe to a new event - /// if we have an active subscription - #[test] - fn test_subscribe_twice() { - std::thread::spawn(start); - // need to make sure that the mock tendermint node has time to boot up - std::thread::sleep(std::time::Duration::from_secs(1)); - let mut rpc_client = TendermintWebsocketClient::open( - address(), - Some(Duration::new(10, 0)), - ) - .expect("Client could not start"); - // Check that subscription was successful - rpc_client.subscribe(Query::from(EventType::Tx)).unwrap(); - assert_eq!( - rpc_client.subscribed.as_ref().expect("Test failed").query, - Query::from(EventType::Tx) - ); - // Check that we cannot subscribe while we still have an active - // subscription - assert!(rpc_client.subscribe(Query::from(EventType::Tx)).is_err()); - } - - /// Test that even if there is an error on the protocol layer, - /// the client still unsubscribes and returns control - #[test] - fn test_unsubscribe_even_on_protocol_error() { - std::thread::spawn(start); - // need to make sure that the mock tendermint node has time to boot up - std::thread::sleep(std::time::Duration::from_secs(1)); - let mut rpc_client = TendermintWebsocketClient::open( - address(), - Some(Duration::new(10, 0)), - ) - .expect("Client could not start"); - // Check that subscription was successful - rpc_client.subscribe(Query::from(EventType::Tx)).unwrap(); - assert_eq!( - rpc_client.subscribed.as_ref().expect("Test failed").query, - Query::from(EventType::Tx) - ); - // Check that unsubscribe was successful even though it returned an - // error - assert!(rpc_client.unsubscribe().is_err()); - assert!(rpc_client.subscribed.is_none()); - } - - /// Test that if we unsubscribe from an event, we can - /// reuse the client to subscribe to a new event - #[test] - fn test_subscribe_after_unsubscribe() { - std::thread::spawn(start); - // need to make sure that the mock tendermint node has time to boot up - std::thread::sleep(std::time::Duration::from_secs(1)); - let mut rpc_client = TendermintWebsocketClient::open( - address(), - Some(Duration::new(10, 0)), - ) - .expect("Client could not start"); - // Check that subscription was successful - rpc_client.subscribe(Query::from(EventType::Tx)).unwrap(); - assert_eq!( - rpc_client.subscribed.as_ref().expect("Test failed").query, - Query::from(EventType::Tx) - ); - // Check that unsubscribe was successful - let _ = rpc_client.unsubscribe(); - assert!(rpc_client.subscribed.as_ref().is_none()); - // Check that we can now subscribe to new event - rpc_client.subscribe(Query::from(EventType::Tx)).unwrap(); - assert_eq!( - rpc_client.subscribed.expect("Test failed").query, - Query::from(EventType::Tx) - ); - } - - /// In this test we first subscribe to an event and then - /// make a simple request. - /// - /// The mock node is set up so that while the request is waiting - /// for its response, it receives the response for the subscription. - /// - /// This test checks that methods correctly return the correct - /// responses. - #[test] - fn test_subscription_returns_before_request_handled() { - std::thread::spawn(start); - // need to make sure that the mock tendermint node has time to boot up - std::thread::sleep(std::time::Duration::from_secs(1)); - let mut rpc_client = TendermintWebsocketClient::open( - address(), - Some(Duration::new(10, 0)), - ) - .expect("Client could not start"); - // Check that subscription was successful - rpc_client.subscribe(Query::from(EventType::Tx)).unwrap(); - assert_eq!( - rpc_client.subscribed.as_ref().expect("Test failed").query, - Query::from(EventType::Tx) - ); - // Check that there are no pending subscription responses - assert!(rpc_client.received_responses.lock().unwrap().is_empty()); - // If the wrong response is returned, json deserialization will fail the - // test - let _ = - tokio_test::block_on(rpc_client.abci_info()).expect("Test failed"); - // Check that we received the subscription response and it has been - // stored - assert!( - rpc_client - .received_responses - .lock() - .unwrap() - .contains_key(&rpc_client.subscribed.as_ref().unwrap().id) - ); - - // check that we receive the expected response to the subscription - let response = rpc_client.receive_response().expect("Test failed"); - assert_eq!(response.to_string(), r#"{"subscription":"result!"}"#); - // Check that there are no pending subscription responses - assert!(rpc_client.received_responses.lock().unwrap().is_empty()); - } -} diff --git a/apps/src/lib/client/tx.rs b/apps/src/lib/client/tx.rs index 0d369ba6b7..a0ec44ee89 100644 --- a/apps/src/lib/client/tx.rs +++ b/apps/src/lib/client/tx.rs @@ -1,8 +1,6 @@ use std::borrow::Cow; -use std::convert::TryFrom; use std::env; use std::fs::File; -use std::time::Duration; use async_std::io::prelude::WriteExt; use async_std::io::{self}; @@ -23,24 +21,19 @@ use namada::types::transaction::governance::{ use namada::types::transaction::{pos, InitAccount, InitValidator, UpdateVp}; use namada::types::{address, storage, token}; use namada::{ledger, vm}; +use tokio::time::{Duration, Instant}; use super::rpc; use crate::cli::context::WalletAddress; use crate::cli::{args, safe_exit, Context}; use crate::client::signing::{find_keypair, sign_tx}; use crate::client::tendermint_rpc_types::{TxBroadcastData, TxResponse}; -use crate::client::tendermint_websocket_client::{ - Error as WsError, TendermintWebsocketClient, WebSocketAddress, -}; use crate::facade::tendermint_config::net::Address as TendermintAddress; use crate::facade::tendermint_rpc::endpoint::broadcast::tx_sync::Response; -use crate::facade::tendermint_rpc::query::{EventType, Query}; +use crate::facade::tendermint_rpc::error::Error as RpcError; use crate::facade::tendermint_rpc::{Client, HttpClient}; -use crate::node::ledger::events::EventType as NamadaEventType; use crate::node::ledger::tendermint_node; -const ACCEPTED_QUERY_KEY: &str = "accepted.hash"; -const APPLIED_QUERY_KEY: &str = "applied.hash"; const TX_INIT_ACCOUNT_WASM: &str = "tx_init_account.wasm"; const TX_INIT_VALIDATOR_WASM: &str = "tx_init_validator.wasm"; const TX_INIT_PROPOSAL: &str = "tx_init_proposal.wasm"; @@ -52,8 +45,14 @@ const TX_BOND_WASM: &str = "tx_bond.wasm"; const TX_UNBOND_WASM: &str = "tx_unbond.wasm"; const TX_WITHDRAW_WASM: &str = "tx_withdraw.wasm"; -const ENV_VAR_ANOMA_TENDERMINT_WEBSOCKET_TIMEOUT: &str = - "ANOMA_TENDERMINT_WEBSOCKET_TIMEOUT"; +/// Timeout for requests to the `/accepted` and `/applied` +/// ABCI query endpoints. +const ENV_VAR_NAMADA_EVENTS_MAX_WAIT_TIME_SECONDS: &str = + "NAMADA_EVENTS_MAX_WAIT_TIME_SECONDS"; + +/// Default timeout in seconds for requests to the `/accepted` +/// and `/applied` ABCI query endpoints. +const DEFAULT_NAMADA_EVENTS_MAX_WAIT_TIME_SECONDS: u64 = 60; pub async fn submit_custom(ctx: Context, args: args::TxCustom) { let tx_code = ctx.read_wasm(args.code_path); @@ -1134,7 +1133,7 @@ async fn save_initialized_accounts( pub async fn broadcast_tx( address: TendermintAddress, to_broadcast: &TxBroadcastData, -) -> Result { +) -> Result { let (tx, wrapper_tx_hash, decrypted_tx_hash) = match to_broadcast { TxBroadcastData::Wrapper { tx, @@ -1144,28 +1143,17 @@ pub async fn broadcast_tx( _ => panic!("Cannot broadcast a dry-run transaction"), }; - let websocket_timeout = - if let Ok(val) = env::var(ENV_VAR_ANOMA_TENDERMINT_WEBSOCKET_TIMEOUT) { - if let Ok(timeout) = val.parse::() { - Duration::new(timeout, 0) - } else { - Duration::new(300, 0) - } - } else { - Duration::new(300, 0) - }; + tracing::debug!( + tendermint_rpc_address = ?address, + transaction = ?to_broadcast, + "Broadcasting transaction", + ); + let rpc_cli = HttpClient::new(address)?; - let mut wrapper_tx_subscription = TendermintWebsocketClient::open( - WebSocketAddress::try_from(address.clone())?, - Some(websocket_timeout), - )?; - - let response = wrapper_tx_subscription - .broadcast_tx_sync(tx.to_bytes().into()) - .await - .map_err(|err| WsError::Response(format!("{:?}", err)))?; - - wrapper_tx_subscription.close(); + // TODO: configure an explicit timeout value? we need to hack away at + // `tendermint-rs` for this, which is currently using a hard-coded 30s + // timeout. + let response = rpc_cli.broadcast_tx_sync(tx.to_bytes().into()).await?; if response.code == 0.into() { println!("Transaction added to mempool: {:?}", response); @@ -1177,7 +1165,7 @@ pub async fn broadcast_tx( } Ok(response) } else { - Err(WsError::Response(response.log.to_string())) + Err(RpcError::server(serde_json::to_string(&response).unwrap())) } } @@ -1192,7 +1180,7 @@ pub async fn broadcast_tx( pub async fn submit_tx( address: TendermintAddress, to_broadcast: TxBroadcastData, -) -> Result { +) -> Result { let (_, wrapper_hash, decrypted_hash) = match &to_broadcast { TxBroadcastData::Wrapper { tx, @@ -1202,52 +1190,30 @@ pub async fn submit_tx( _ => panic!("Cannot broadcast a dry-run transaction"), }; - let websocket_timeout = - if let Ok(val) = env::var(ENV_VAR_ANOMA_TENDERMINT_WEBSOCKET_TIMEOUT) { - if let Ok(timeout) = val.parse::() { - Duration::new(timeout, 0) - } else { - Duration::new(300, 0) - } - } else { - Duration::new(300, 0) - }; - tracing::debug!("Tenderming address: {:?}", address); - let mut wrapper_tx_subscription = TendermintWebsocketClient::open( - WebSocketAddress::try_from(address.clone())?, - Some(websocket_timeout), - )?; - - // It is better to subscribe to the transaction before it is broadcast - // - // Note that the `APPLIED_QUERY_KEY` key comes from a custom event - // created by the shell - let query = Query::from(EventType::NewBlock) - .and_eq(ACCEPTED_QUERY_KEY, wrapper_hash.as_str()); - wrapper_tx_subscription.subscribe(query)?; - - // We also subscribe to the event emitted when the encrypted - // payload makes its way onto the blockchain - let mut decrypted_tx_subscription = { - let mut decrypted_tx_subscription = TendermintWebsocketClient::open( - WebSocketAddress::try_from(address.clone())?, - Some(websocket_timeout), - )?; - let query = Query::from(EventType::NewBlock) - .and_eq(APPLIED_QUERY_KEY, decrypted_hash.as_str()); - decrypted_tx_subscription.subscribe(query)?; - decrypted_tx_subscription - }; - // Broadcast the supplied transaction - broadcast_tx(address, &to_broadcast).await?; + broadcast_tx(address.clone(), &to_broadcast).await?; + + let max_wait_time = Duration::from_secs( + env::var(ENV_VAR_NAMADA_EVENTS_MAX_WAIT_TIME_SECONDS) + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_NAMADA_EVENTS_MAX_WAIT_TIME_SECONDS), + ); + let deadline = Instant::now() + max_wait_time; + + tracing::debug!( + tendermint_rpc_address = ?address, + transaction = ?to_broadcast, + ?deadline, + "Awaiting transaction approval", + ); let parsed = { - let parsed = TxResponse::parse( - wrapper_tx_subscription.receive_response()?, - NamadaEventType::Accepted, - wrapper_hash, - ); + let wrapper_query = rpc::TxEventQuery::Accepted(wrapper_hash.as_str()); + let event = + rpc::query_tx_status(wrapper_query, address.clone(), deadline) + .await; + let parsed = TxResponse::from_event(event); println!( "Transaction accepted with result: {}", @@ -1256,11 +1222,13 @@ pub async fn submit_tx( // The transaction is now on chain. We wait for it to be decrypted // and applied if parsed.code == 0.to_string() { - let parsed = TxResponse::parse( - decrypted_tx_subscription.receive_response()?, - NamadaEventType::Applied, - decrypted_hash.as_str(), - ); + // We also listen to the event emitted when the encrypted + // payload makes its way onto the blockchain + let decrypted_query = + rpc::TxEventQuery::Applied(decrypted_hash.as_str()); + let event = + rpc::query_tx_status(decrypted_query, address, deadline).await; + let parsed = TxResponse::from_event(event); println!( "Transaction applied with result: {}", serde_json::to_string_pretty(&parsed).unwrap() @@ -1271,9 +1239,10 @@ pub async fn submit_tx( } }; - wrapper_tx_subscription.unsubscribe()?; - wrapper_tx_subscription.close(); - decrypted_tx_subscription.unsubscribe()?; - decrypted_tx_subscription.close(); + tracing::debug!( + transaction = ?to_broadcast, + "Transaction approved", + ); + parsed } diff --git a/apps/src/lib/node/ledger/events.rs b/apps/src/lib/node/ledger/events.rs index 3adefc59d7..0a8a50ad60 100644 --- a/apps/src/lib/node/ledger/events.rs +++ b/apps/src/lib/node/ledger/events.rs @@ -1,9 +1,11 @@ +pub mod log; + use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::{self, Display}; use std::ops::{Index, IndexMut}; -use borsh::BorshSerialize; +use borsh::{BorshDeserialize, BorshSerialize}; use namada::ledger::governance::utils::ProposalEvent; use namada::types::ibc::IbcEvent; use namada::types::transaction::{hash_tx, TxType}; @@ -13,7 +15,7 @@ use crate::facade::tendermint_proto::abci::EventAttribute; /// Indicates if an event is emitted do to /// an individual Tx or the nature of a finalized block -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub enum EventLevel { Block, Tx, @@ -21,7 +23,7 @@ pub enum EventLevel { /// Custom events that can be queried from Tendermint /// using a websocket client -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub struct Event { pub event_type: EventType, pub level: EventLevel, @@ -29,7 +31,7 @@ pub struct Event { } /// The two types of custom events we currently use -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub enum EventType { // The transaction was accepted to be included in a block Accepted, diff --git a/apps/src/lib/node/ledger/events/log.rs b/apps/src/lib/node/ledger/events/log.rs new file mode 100644 index 0000000000..f3e655469a --- /dev/null +++ b/apps/src/lib/node/ledger/events/log.rs @@ -0,0 +1,198 @@ +//! A log to store events emitted by `FinalizeBlock` calls in the ledger. +//! +//! The log can only hold `N` events at a time, where `N` is a configurable +//! parameter. If the log is holding `N` events, and a new event is logged, +//! old events are pruned. + +use std::default::Default; + +use circular_queue::CircularQueue; + +use crate::node::ledger::events::Event; + +pub mod dumb_queries; + +/// Parameters to configure the pruning of the event log. +#[derive(Debug, Copy, Clone)] +pub struct Params { + /// Soft limit on the maximum number of events the event log can hold. + /// + /// If the number of events in the log exceeds this value, the log + /// will be pruned. + pub max_log_events: usize, +} + +impl Default for Params { + fn default() -> Self { + // TODO: tune the default params + Self { + max_log_events: 50000, + } + } +} + +/// Represents a log of [`Event`] instances emitted by +/// `FinalizeBlock` calls, in the ledger. +#[derive(Debug)] +pub struct EventLog { + queue: CircularQueue, +} + +impl Default for EventLog { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl EventLog { + /// Return a new event log. + pub fn new(params: Params) -> Self { + Self { + queue: CircularQueue::with_capacity(params.max_log_events), + } + } + + /// Log a new batch of events into the event log. + pub fn log_events(&mut self, events: E) + where + E: IntoIterator, + { + let mut num_entries = 0; + for event in events.into_iter() { + self.queue.push(event); + num_entries += 1; + } + tracing::debug!(num_entries, "Added new entries to the event log"); + } + + /// Returns a new iterator over this [`EventLog`]. + #[inline] + pub fn iter_with_matcher( + &self, + matcher: dumb_queries::QueryMatcher, + ) -> impl Iterator { + self.queue + .iter() + .filter(move |&event| matcher.matches(event)) + } +} + +#[cfg(test)] +mod tests { + use namada::types::hash::Hash; + + use super::*; + use crate::node::ledger::events::{EventLevel, EventType}; + + const HASH: &str = + "DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF"; + + /// An accepted tx hash query. + macro_rules! accepted { + ($hash:expr) => { + dumb_queries::QueryMatcher::accepted(Hash::try_from($hash).unwrap()) + }; + } + + /// Return a vector of mock `FinalizeBlock` events. + fn mock_tx_events(hash: &str) -> Vec { + let event_1 = Event { + event_type: EventType::Accepted, + level: EventLevel::Block, + attributes: { + let mut attrs = std::collections::HashMap::new(); + attrs.insert("hash".to_string(), hash.to_string()); + attrs + }, + }; + let event_2 = Event { + event_type: EventType::Applied, + level: EventLevel::Block, + attributes: { + let mut attrs = std::collections::HashMap::new(); + attrs.insert("hash".to_string(), hash.to_string()); + attrs + }, + }; + vec![event_1, event_2] + } + + /// Test adding a couple of events to the event log, and + /// reading those events back. + #[test] + fn test_log_add() { + const NUM_HEIGHTS: usize = 4; + + let mut log = EventLog::new(Params::default()); + + // add new events to the log + let events = mock_tx_events(HASH); + + for _ in 0..NUM_HEIGHTS { + log.log_events(events.clone()); + } + + // inspect log + let events_in_log: Vec<_> = + log.iter_with_matcher(accepted!(HASH)).cloned().collect(); + + assert_eq!(events_in_log.len(), NUM_HEIGHTS); + + for event in events_in_log { + assert_eq!(events[0], event); + } + } + + /// Test pruning old events from the log. + #[test] + fn test_log_prune() { + const LOG_CAP: usize = 4; + + // log cap has to be a multiple of two + // for this test + if LOG_CAP < 2 || LOG_CAP & 1 != 0 { + panic!(); + } + + const MATCHED_EVENTS: usize = LOG_CAP / 2; + + let mut log = EventLog::new(Params { + max_log_events: LOG_CAP, + }); + + // completely fill the log with events + // + // `mock_tx_events` returns 2 events, so + // we do `LOG_CAP / 2` iters to fill the log + let events = mock_tx_events(HASH); + assert_eq!(events.len(), 2); + + for _ in 0..(LOG_CAP / 2) { + log.log_events(events.clone()); + } + + // inspect log - it should be full + let events_in_log: Vec<_> = + log.iter_with_matcher(accepted!(HASH)).cloned().collect(); + + assert_eq!(events_in_log.len(), MATCHED_EVENTS); + + for event in events_in_log { + assert_eq!(events[0], event); + } + + // add a new APPLIED event to the log, + // pruning the first ACCEPTED event we added + log.log_events(Some(events[1].clone())); + + let events_in_log: Vec<_> = + log.iter_with_matcher(accepted!(HASH)).cloned().collect(); + + const ACCEPTED_EVENTS: usize = MATCHED_EVENTS - 1; + assert_eq!(events_in_log.len(), ACCEPTED_EVENTS); + + for event in events_in_log { + assert_eq!(events[0], event); + } + } +} diff --git a/apps/src/lib/node/ledger/events/log/dumb_queries.rs b/apps/src/lib/node/ledger/events/log/dumb_queries.rs new file mode 100644 index 0000000000..d7e2c51630 --- /dev/null +++ b/apps/src/lib/node/ledger/events/log/dumb_queries.rs @@ -0,0 +1,108 @@ +//! Silly simple Tendermint query parser. +//! +//! This parser will only work with simple queries of the form: +//! +//! ```text +//! tm.event='NewBlock' AND .<$attr>='<$value>' +//! ``` + +use namada::types::hash::Hash; + +use crate::node::ledger::events::{Event, EventType}; + +/// A [`QueryMatcher`] verifies if a Namada event matches a +/// given Tendermint query. +#[derive(Debug, Clone)] +pub struct QueryMatcher { + event_type: EventType, + attr: String, + value: Hash, +} + +impl QueryMatcher { + /// Checks if this [`QueryMatcher`] validates the + /// given [`Event`]. + pub fn matches(&self, event: &Event) -> bool { + event.event_type == self.event_type + && event + .attributes + .get(&self.attr) + .and_then(|value| { + value + .as_str() + .try_into() + .map(|v: Hash| v == self.value) + .ok() + }) + .unwrap_or_default() + } + + /// Returns a query matching the given accepted transaction hash. + pub fn accepted(tx_hash: Hash) -> Self { + Self { + event_type: EventType::Accepted, + attr: "hash".to_string(), + value: tx_hash, + } + } + + /// Returns a query matching the given applied transaction hash. + pub fn applied(tx_hash: Hash) -> Self { + Self { + event_type: EventType::Applied, + attr: "hash".to_string(), + value: tx_hash, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::node::ledger::events::EventLevel; + + /// Test if query matching is working as expected. + #[test] + fn test_tm_query_matching() { + const HASH: &str = + "DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF"; + + let matcher = QueryMatcher { + event_type: EventType::Accepted, + attr: "hash".to_string(), + value: HASH.try_into().unwrap(), + }; + + let tests = { + let event_1 = Event { + event_type: EventType::Accepted, + level: EventLevel::Block, + attributes: { + let mut attrs = std::collections::HashMap::new(); + attrs.insert("hash".to_string(), HASH.to_string()); + attrs + }, + }; + let accepted_1 = true; + + let event_2 = Event { + event_type: EventType::Applied, + level: EventLevel::Block, + attributes: { + let mut attrs = std::collections::HashMap::new(); + attrs.insert("hash".to_string(), HASH.to_string()); + attrs + }, + }; + let accepted_2 = false; + + [(event_1, accepted_1), (event_2, accepted_2)] + }; + + for (ref ev, status) in tests { + if matcher.matches(ev) != status { + panic!("Test failed"); + } + } + } +} diff --git a/apps/src/lib/node/ledger/shell/finalize_block.rs b/apps/src/lib/node/ledger/shell/finalize_block.rs index 22ab9661c7..a08bbfea16 100644 --- a/apps/src/lib/node/ledger/shell/finalize_block.rs +++ b/apps/src/lib/node/ledger/shell/finalize_block.rs @@ -238,6 +238,9 @@ where .gas_meter .finalize_transaction() .map_err(|_| Error::GasOverflow)?; + + self.event_log_mut().log_events(response.events.clone()); + Ok(response) } diff --git a/apps/src/lib/node/ledger/shell/mod.rs b/apps/src/lib/node/ledger/shell/mod.rs index ef2160737a..9ac651fb42 100644 --- a/apps/src/lib/node/ledger/shell/mod.rs +++ b/apps/src/lib/node/ledger/shell/mod.rs @@ -55,6 +55,7 @@ use crate::facade::tendermint_proto::abci::{ }; use crate::facade::tendermint_proto::crypto::public_key; use crate::facade::tower_abci::{request, response}; +use crate::node::ledger::events::log::EventLog; use crate::node::ledger::events::Event; use crate::node::ledger::shims::abcipp_shim_types::shim; use crate::node::ledger::shims::abcipp_shim_types::shim::response::TxResult; @@ -213,6 +214,8 @@ where tx_wasm_cache: TxCache, /// Proposal execution tracking pub proposal_data: HashSet, + /// Log of events emitted by `FinalizeBlock`. + event_log: EventLog, } impl Shell @@ -318,9 +321,23 @@ where tx_wasm_compilation_cache as usize, ), proposal_data: HashSet::new(), + // TODO: config event log params + event_log: EventLog::default(), } } + /// Return a reference to the [`EventLog`]. + #[inline] + pub fn event_log(&self) -> &EventLog { + &self.event_log + } + + /// Return a mutable reference to the [`EventLog`]. + #[inline] + pub fn event_log_mut(&mut self) -> &mut EventLog { + &mut self.event_log + } + /// Iterate over the wrapper txs in order #[allow(dead_code)] fn iter_tx_queue(&mut self) -> impl Iterator { diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 87bae5ef94..8094451326 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -95,6 +95,7 @@ ed25519-consensus = "1.2.0" ferveo = {optional = true, git = "https://github.com/anoma/ferveo"} ferveo-common = {git = "https://github.com/anoma/ferveo"} tpke = {package = "group-threshold-cryptography", optional = true, git = "https://github.com/anoma/ferveo"} +hex = "0.4.3" # TODO using the same version of tendermint-rs as we do here. ibc-abcipp = {package = "ibc", git = "https://github.com/heliaxdev/ibc-rs", rev = "9fcc1c8c19db6af50806ffe5b2f6c214adcbfd5d", default-features = false, optional = true} ibc-proto-abcipp = {package = "ibc-proto", git = "https://github.com/heliaxdev/ibc-rs", rev = "9fcc1c8c19db6af50806ffe5b2f6c214adcbfd5d", default-features = false, optional = true} diff --git a/shared/src/types/hash.rs b/shared/src/types/hash.rs index 229cd56063..498fc426bd 100644 --- a/shared/src/types/hash.rs +++ b/shared/src/types/hash.rs @@ -6,6 +6,7 @@ use std::ops::Deref; use arse_merkle_tree::traits::Value; use arse_merkle_tree::Hash as TreeHash; use borsh::{BorshDeserialize, BorshSchema, BorshSerialize}; +use hex::FromHex; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use thiserror::Error; @@ -13,7 +14,7 @@ use thiserror::Error; use crate::tendermint::abci::transaction; use crate::tendermint::Hash as TmHash; -/// The length of the transaction hash string +/// The length of the raw transaction hash. pub const HASH_LENGTH: usize = 32; #[allow(missing_docs)] @@ -23,6 +24,8 @@ pub enum Error { Temporary { error: String }, #[error("Failed trying to convert slice to a hash: {0}")] ConversionFailed(std::array::TryFromSliceError), + #[error("Failed to convert string into a hash: {0}")] + FromStringError(hex::FromHexError), } /// Result for functions that may fail @@ -86,6 +89,25 @@ impl TryFrom<&[u8]> for Hash { } } +impl TryFrom for Hash { + type Error = self::Error; + + fn try_from(string: String) -> HashResult { + string.as_str().try_into() + } +} + +impl TryFrom<&str> for Hash { + type Error = self::Error; + + fn try_from(string: &str) -> HashResult { + Ok(Self( + <[u8; HASH_LENGTH]>::from_hex(string) + .map_err(Error::FromStringError)?, + )) + } +} + impl From for transaction::Hash { fn from(hash: Hash) -> Self { Self::new(hash.0) @@ -116,3 +138,23 @@ impl From for TreeHash { Self::from(hash.0) } } + +#[cfg(test)] +mod tests { + use proptest::prelude::*; + use proptest::string::{string_regex, RegexGeneratorStrategy}; + + use super::*; + + /// Returns a proptest strategy that yields hex encoded hashes. + fn hex_encoded_hash_strat() -> RegexGeneratorStrategy { + string_regex(r"[a-fA-F0-9]{64}").unwrap() + } + + proptest! { + #[test] + fn test_hash_string(hex_hash in hex_encoded_hash_strat()) { + let _: Hash = hex_hash.try_into().unwrap(); + } + } +} diff --git a/wasm/Cargo.lock b/wasm/Cargo.lock index 9d63d52b84..920f7162eb 100644 --- a/wasm/Cargo.lock +++ b/wasm/Cargo.lock @@ -1366,6 +1366,7 @@ dependencies = [ "derivative", "ed25519-consensus", "ferveo-common", + "hex", "ibc", "ibc-proto", "ics23", diff --git a/wasm/checksums.json b/wasm/checksums.json index 496d1c7a0f..2a7fd149a8 100644 --- a/wasm/checksums.json +++ b/wasm/checksums.json @@ -12,4 +12,4 @@ "vp_testnet_faucet.wasm": "vp_testnet_faucet.ae9a681dc2c1bd244b0575474fa4a364af56fa75833950693ca52ab25018c97d.wasm", "vp_token.wasm": "vp_token.468de153dc5ce3af208bd762de3e85be48bc631012ec5f0947af95168da6cb93.wasm", "vp_user.wasm": "vp_user.c101016a85a72f40da7f33e5d9061cfd2e3274eaac75a71c59c9ab4ed9896ffd.wasm" -} \ No newline at end of file +} diff --git a/wasm_for_tests/wasm_source/Cargo.lock b/wasm_for_tests/wasm_source/Cargo.lock index 1cbb2ff069..fbcca7b195 100644 --- a/wasm_for_tests/wasm_source/Cargo.lock +++ b/wasm_for_tests/wasm_source/Cargo.lock @@ -1366,6 +1366,7 @@ dependencies = [ "derivative", "ed25519-consensus", "ferveo-common", + "hex", "ibc", "ibc-proto", "ics23", From 8339255f4e0a347e6829a245a34a3b5421b61bdf Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Wed, 2 Nov 2022 13:46:49 +0000 Subject: [PATCH 2/8] Update apps/src/lib/client/tx.rs Co-authored-by: Tomas Zemanovic --- apps/src/lib/client/tx.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/src/lib/client/tx.rs b/apps/src/lib/client/tx.rs index a0ec44ee89..ac0cdae3e1 100644 --- a/apps/src/lib/client/tx.rs +++ b/apps/src/lib/client/tx.rs @@ -48,7 +48,7 @@ const TX_WITHDRAW_WASM: &str = "tx_withdraw.wasm"; /// Timeout for requests to the `/accepted` and `/applied` /// ABCI query endpoints. const ENV_VAR_NAMADA_EVENTS_MAX_WAIT_TIME_SECONDS: &str = - "NAMADA_EVENTS_MAX_WAIT_TIME_SECONDS"; + "ANOMA_EVENTS_MAX_WAIT_TIME_SECONDS"; /// Default timeout in seconds for requests to the `/accepted` /// and `/applied` ABCI query endpoints. From ee5c5595e4b4dd6f816cfa6f6cc3ffaa02e90ac7 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Wed, 2 Nov 2022 13:49:42 +0000 Subject: [PATCH 3/8] Update apps/src/lib/client/tendermint_rpc_types.rs Co-authored-by: Tomas Zemanovic --- apps/src/lib/client/tendermint_rpc_types.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/src/lib/client/tendermint_rpc_types.rs b/apps/src/lib/client/tendermint_rpc_types.rs index 0e94155378..094efa9bca 100644 --- a/apps/src/lib/client/tendermint_rpc_types.rs +++ b/apps/src/lib/client/tendermint_rpc_types.rs @@ -72,10 +72,7 @@ impl TryFrom for TxResponse { .map(String::as_str) // TODO: fix finalize block, to return initialized accounts, // even when we reject a tx? - .or(Some("[]")) - // NOTE: at this point we only have `Some(vec)`, not `None` - .ok_or_else(|| unreachable!()) - .and_then(|initialized_accounts| { + .map_or(Ok(vec![]), |initialized_accounts| { serde_json::from_str(initialized_accounts) .map_err(|err| format!("JSON decode error: {err}")) })?; From ecf8d7a7f1f22a478d3ceba42b48eb0fbc517769 Mon Sep 17 00:00:00 2001 From: James Hiew Date: Wed, 2 Nov 2022 11:18:39 +0000 Subject: [PATCH 4/8] Move namada_apps::node::ledger::events to the shared crate Move event log from `Shell` to `Storage` Guard less things with the ferveo-tpke feature so that wasm compiles Fix cargo doc link Update client to use new RPC router for accepted/applied queries Use Vec instead of raw Vec Remove no longer used From impl query_tx_events should return eyre::Result --- Cargo.lock | 2 +- apps/Cargo.toml | 1 - apps/src/lib/client/rpc.rs | 64 +++++++++---------- apps/src/lib/client/tendermint_rpc_types.rs | 2 +- apps/src/lib/node/ledger/mod.rs | 26 +++++--- .../lib/node/ledger/shell/finalize_block.rs | 5 +- apps/src/lib/node/ledger/shell/governance.rs | 2 +- apps/src/lib/node/ledger/shell/mod.rs | 19 +----- .../node/ledger/shims/abcipp_shim_types.rs | 7 +- shared/Cargo.toml | 1 + .../lib/node => shared/src}/ledger/events.rs | 31 ++++++--- .../node => shared/src}/ledger/events/log.rs | 7 +- .../src}/ledger/events/log/dumb_queries.rs | 7 +- shared/src/ledger/mod.rs | 1 + shared/src/ledger/queries/shell.rs | 42 +++++++++++- shared/src/ledger/storage/mod.rs | 48 ++++++++++++++ shared/src/types/hash.rs | 9 +++ wasm/Cargo.lock | 10 +++ wasm_for_tests/wasm_source/Cargo.lock | 10 +++ 19 files changed, 206 insertions(+), 88 deletions(-) rename {apps/src/lib/node => shared/src}/ledger/events.rs (86%) rename {apps/src/lib/node => shared/src}/ledger/events/log.rs (97%) rename {apps/src/lib/node => shared/src}/ledger/events/log/dumb_queries.rs (95%) diff --git a/Cargo.lock b/Cargo.lock index 40068c3ff9..169be3e447 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2920,6 +2920,7 @@ dependencies = [ "borsh", "byte-unit", "chrono", + "circular-queue", "clru", "data-encoding", "derivative", @@ -2991,7 +2992,6 @@ dependencies = [ "borsh", "byte-unit", "byteorder", - "circular-queue", "clap", "color-eyre", "config", diff --git a/apps/Cargo.toml b/apps/Cargo.toml index ba297bacd9..db95520cc8 100644 --- a/apps/Cargo.toml +++ b/apps/Cargo.toml @@ -78,7 +78,6 @@ blake2b-rs = "0.2.0" borsh = "0.9.0" byte-unit = "4.0.13" byteorder = "1.4.2" -circular-queue = "0.2.6" # https://github.com/clap-rs/clap/issues/1037 clap = {git = "https://github.com/clap-rs/clap/", tag = "v3.0.0-beta.2", default-features = false, features = ["std", "suggestions", "color", "cargo"]} color-eyre = "0.5.10" diff --git a/apps/src/lib/client/rpc.rs b/apps/src/lib/client/rpc.rs index 3331376a2c..8eebbc360a 100644 --- a/apps/src/lib/client/rpc.rs +++ b/apps/src/lib/client/rpc.rs @@ -12,7 +12,9 @@ use async_std::path::PathBuf; use async_std::prelude::*; use borsh::BorshDeserialize; use data_encoding::HEXLOWER; +use eyre::{eyre, Context as EyreContext}; use itertools::Itertools; +use namada::ledger::events::Event; use namada::ledger::governance::parameters::GovParams; use namada::ledger::governance::storage as gov_storage; use namada::ledger::governance::utils::Votes; @@ -29,6 +31,7 @@ use namada::types::governance::{ OfflineProposal, OfflineVote, ProposalResult, ProposalVote, TallyResult, VotePower, }; +use namada::types::hash::Hash; use namada::types::key::*; use namada::types::storage::{Epoch, Key, KeySeg, PrefixValue}; use namada::types::token::{balance_key, Amount}; @@ -43,8 +46,6 @@ use crate::facade::tendermint_rpc::query::Query; use crate::facade::tendermint_rpc::{ Client, HttpClient, Order, SubscriptionClient, WebSocketClient, }; -use crate::node::ledger::events::Event; -use crate::node::ledger::rpc::Path; /// Query the status of a given transaction. /// @@ -74,12 +75,8 @@ pub async fn query_tx_status( let mut backoff = ONE_SECOND; loop { - let data = vec![]; tracing::debug!(query = ?status, "Querying tx status"); - let response = match client - .abci_query(Some(status.into()), data, None, false) - .await - { + let mut events = match query_tx_events(&client, status).await { Ok(response) => response, Err(err) => { tracing::debug!(%err, "ABCI query failed"); @@ -87,24 +84,6 @@ pub async fn query_tx_status( continue; } }; - let mut events = match response.code { - Code::Ok => { - match Vec::::try_from_slice(&response.value[..]) { - Ok(events) => events, - Err(err) => { - eprintln!("Error decoding the event value: {err}"); - break Err(()); - } - } - } - Code::Err(err) => { - eprintln!( - "Error in the query {} (error code {})", - response.info, err - ); - break Err(()); - } - }; if let Some(e) = events.pop() { // we should only have one event matching the query break Ok(e); @@ -1440,14 +1419,6 @@ impl<'a> TxEventQuery<'a> { } } -impl<'a> From> for crate::facade::tendermint::abci::Path { - fn from(tx_query: TxEventQuery<'a>) -> Self { - format!("{}/{}", tx_query.event_type(), tx_query.tx_hash()) - .parse() - .expect("This operation is infallible") - } -} - /// Transaction event queries are semantically a subset of general queries impl<'a> From> for Query { fn from(tx_query: TxEventQuery<'a>) -> Self { @@ -1462,6 +1433,29 @@ impl<'a> From> for Query { } } +pub async fn query_tx_events( + client: &HttpClient, + tx_event_query: TxEventQuery<'_>, +) -> eyre::Result> { + let tx_hash: Hash = tx_event_query.tx_hash().try_into()?; + match tx_event_query { + TxEventQuery::Accepted(_) => RPC + .shell() + .accepted(client, &tx_hash) + .await + .wrap_err_with(|| { + eyre!("Failed querying whether a transaction was accepted") + }), + TxEventQuery::Applied(_) => RPC + .shell() + .applied(client, &tx_hash) + .await + .wrap_err_with(|| { + eyre!("Error querying whether a transaction was applied") + }), + } +} + /// Lookup the full response accompanying the specified transaction event pub async fn query_tx_response( ledger_address: &TendermintAddress, @@ -1601,7 +1595,7 @@ pub async fn get_proposal_votes( if let Some(vote_iter) = vote_iter { for (key, vote) in vote_iter { let voter_address = gov_storage::get_voter_address(&key) - .expect("Vote key should contains the voting address.") + .expect("Vote key should contain the voting address.") .clone(); if vote.is_yay() && validators.contains(&voter_address) { let amount = @@ -1611,7 +1605,7 @@ pub async fn get_proposal_votes( let validator_address = gov_storage::get_vote_delegation_address(&key) .expect( - "Vote key should contains the delegation address.", + "Vote key should contain the delegation address.", ) .clone(); let delegator_token_amount = get_bond_amount_at( diff --git a/apps/src/lib/client/tendermint_rpc_types.rs b/apps/src/lib/client/tendermint_rpc_types.rs index 094efa9bca..537cca243f 100644 --- a/apps/src/lib/client/tendermint_rpc_types.rs +++ b/apps/src/lib/client/tendermint_rpc_types.rs @@ -1,11 +1,11 @@ use std::convert::TryFrom; +use namada::ledger::events::Event; use namada::proto::Tx; use namada::types::address::Address; use serde::Serialize; use crate::cli::safe_exit; -use crate::node::ledger::events::Event; /// Data needed for broadcasting a tx and /// monitoring its progress on chain diff --git a/apps/src/lib/node/ledger/mod.rs b/apps/src/lib/node/ledger/mod.rs index 9796c18fce..121f47cdeb 100644 --- a/apps/src/lib/node/ledger/mod.rs +++ b/apps/src/lib/node/ledger/mod.rs @@ -1,6 +1,5 @@ mod abortable; mod broadcaster; -pub mod events; mod shell; mod shims; pub mod storage; @@ -55,7 +54,6 @@ const ENV_VAR_RAYON_THREADS: &str = "ANOMA_RAYON_THREADS"; // Poll::Ready(Ok(())) // } //``` - impl Shell { fn load_proposals(&mut self) { let proposals_key = gov_storage::get_commiting_proposals_prefix( @@ -199,10 +197,15 @@ pub fn reset(config: config::Ledger) -> Result<(), shell::Error> { shell::reset(config) } -/// Runs three concurrent tasks: A tendermint node, a shell which contains an -/// ABCI server for talking to the tendermint node, and a broadcaster so that -/// the ledger may submit txs to the chain. All must be alive for correct -/// functioning. +/// Runs and monitors a few concurrent tasks. +/// +/// This includes: +/// - A Tendermint node. +/// - A shell which contains an ABCI server, for talking to the Tendermint +/// node. +/// - A [`Broadcaster`], for the ledger to submit txs to Tendermint's mempool. +/// +/// All must be alive for correct functioning. async fn run_aux(config: config::Ledger, wasm_dir: PathBuf) { let setup_data = run_aux_setup(&config, &wasm_dir).await; @@ -409,8 +412,7 @@ fn start_abci_broadcaster_shell( let _ = bc_abort_send.send(()); }) } else { - // dummy async task, which will resolve instantly - tokio::spawn(async { std::future::ready(()).await }) + spawn_dummy_task(()) }; // Setup DB cache, it must outlive the DB instance that's in the shell @@ -518,7 +520,7 @@ async fn run_abci( } /// Launches a new task managing a Tendermint process into the asynchronous -/// runtime, and returns its `JoinHandle`. +/// runtime, and returns its [`task::JoinHandle`]. fn start_tendermint( spawner: &mut AbortableSpawner, config: &config::Ledger, @@ -578,3 +580,9 @@ fn start_tendermint( } }) } + +/// Spawn a dummy asynchronous task into the runtime, +/// which will resolve instantly. +fn spawn_dummy_task(ready: T) -> task::JoinHandle { + tokio::spawn(async { std::future::ready(ready).await }) +} diff --git a/apps/src/lib/node/ledger/shell/finalize_block.rs b/apps/src/lib/node/ledger/shell/finalize_block.rs index a08bbfea16..5c6f7c2c0a 100644 --- a/apps/src/lib/node/ledger/shell/finalize_block.rs +++ b/apps/src/lib/node/ledger/shell/finalize_block.rs @@ -1,6 +1,7 @@ //! Implementation of the `FinalizeBlock` ABCI++ method for the Shell use namada::ledger::protocol; +use namada::ledger::storage::EventLogExt; use namada::types::storage::{BlockHash, Header}; use super::governance::execute_governance_proposals; @@ -239,7 +240,9 @@ where .finalize_transaction() .map_err(|_| Error::GasOverflow)?; - self.event_log_mut().log_events(response.events.clone()); + self.storage + .event_log_mut() + .log_events(response.events.clone()); Ok(response) } diff --git a/apps/src/lib/node/ledger/shell/governance.rs b/apps/src/lib/node/ledger/shell/governance.rs index d9879d7a20..244747b8f3 100644 --- a/apps/src/lib/node/ledger/shell/governance.rs +++ b/apps/src/lib/node/ledger/shell/governance.rs @@ -1,3 +1,4 @@ +use namada::ledger::events::EventType; use namada::ledger::governance::storage as gov_storage; use namada::ledger::governance::utils::{ compute_tally, get_proposal_votes, ProposalEvent, @@ -13,7 +14,6 @@ use namada::types::storage::Epoch; use namada::types::token; use super::*; -use crate::node::ledger::events::EventType; #[derive(Default)] pub struct ProposalsResult { diff --git a/apps/src/lib/node/ledger/shell/mod.rs b/apps/src/lib/node/ledger/shell/mod.rs index 9ac651fb42..a1ca0222bb 100644 --- a/apps/src/lib/node/ledger/shell/mod.rs +++ b/apps/src/lib/node/ledger/shell/mod.rs @@ -20,6 +20,7 @@ use std::path::{Path, PathBuf}; use std::rc::Rc; use borsh::{BorshDeserialize, BorshSerialize}; +use namada::ledger::events::Event; use namada::ledger::gas::BlockGasMeter; use namada::ledger::pos::namada_proof_of_stake::types::{ ActiveValidator, ValidatorSetUpdate, @@ -55,8 +56,6 @@ use crate::facade::tendermint_proto::abci::{ }; use crate::facade::tendermint_proto::crypto::public_key; use crate::facade::tower_abci::{request, response}; -use crate::node::ledger::events::log::EventLog; -use crate::node::ledger::events::Event; use crate::node::ledger::shims::abcipp_shim_types::shim; use crate::node::ledger::shims::abcipp_shim_types::shim::response::TxResult; use crate::node::ledger::{storage, tendermint_node}; @@ -214,8 +213,6 @@ where tx_wasm_cache: TxCache, /// Proposal execution tracking pub proposal_data: HashSet, - /// Log of events emitted by `FinalizeBlock`. - event_log: EventLog, } impl Shell @@ -321,23 +318,9 @@ where tx_wasm_compilation_cache as usize, ), proposal_data: HashSet::new(), - // TODO: config event log params - event_log: EventLog::default(), } } - /// Return a reference to the [`EventLog`]. - #[inline] - pub fn event_log(&self) -> &EventLog { - &self.event_log - } - - /// Return a mutable reference to the [`EventLog`]. - #[inline] - pub fn event_log_mut(&mut self) -> &mut EventLog { - &mut self.event_log - } - /// Iterate over the wrapper txs in order #[allow(dead_code)] fn iter_tx_queue(&mut self) -> impl Iterator { diff --git a/apps/src/lib/node/ledger/shims/abcipp_shim_types.rs b/apps/src/lib/node/ledger/shims/abcipp_shim_types.rs index 5d9f2c420c..5a66c3c921 100644 --- a/apps/src/lib/node/ledger/shims/abcipp_shim_types.rs +++ b/apps/src/lib/node/ledger/shims/abcipp_shim_types.rs @@ -266,6 +266,10 @@ pub mod shim { /// Custom types for response payloads pub mod response { + use namada::ledger::events::Event; + #[cfg(feature = "abcipp")] + use namada::ledger::events::EventLevel; + use crate::facade::tendermint_proto::abci::{ Event as TmEvent, ResponseProcessProposal, ValidatorUpdate, }; @@ -276,9 +280,6 @@ pub mod shim { abci::{ExecTxResult, ResponseFinalizeBlock}, types::ConsensusParams, }; - use crate::node::ledger::events::Event; - #[cfg(feature = "abcipp")] - use crate::node::ledger::events::EventLevel; #[derive(Debug, Default)] pub struct VerifyHeader; diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 8094451326..320c349a21 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -86,6 +86,7 @@ arse-merkle-tree = {package = "sparse-merkle-tree", git = "https://github.com/he async-trait = {version = "0.1.51", optional = true} bech32 = "0.8.0" borsh = "0.9.0" +circular-queue = "0.2.6" chrono = {version = "0.4.22", default-features = false, features = ["clock", "std"]} # Using unreleased commit on top of version 0.5.0 that adds Sync to the CLruCache clru = {git = "https://github.com/marmeladema/clru-rs.git", rev = "71ca566"} diff --git a/apps/src/lib/node/ledger/events.rs b/shared/src/ledger/events.rs similarity index 86% rename from apps/src/lib/node/ledger/events.rs rename to shared/src/ledger/events.rs index 0a8a50ad60..b17d2db83d 100644 --- a/apps/src/lib/node/ledger/events.rs +++ b/shared/src/ledger/events.rs @@ -1,3 +1,4 @@ +//! Logic to do with events emitted by the ledger. pub mod log; use std::collections::HashMap; @@ -6,18 +7,21 @@ use std::fmt::{self, Display}; use std::ops::{Index, IndexMut}; use borsh::{BorshDeserialize, BorshSerialize}; -use namada::ledger::governance::utils::ProposalEvent; -use namada::types::ibc::IbcEvent; -use namada::types::transaction::{hash_tx, TxType}; +use tendermint_proto::abci::EventAttribute; use thiserror::Error; -use crate::facade::tendermint_proto::abci::EventAttribute; +use crate::ledger::governance::utils::ProposalEvent; +use crate::types::ibc::IbcEvent; +#[cfg(feature = "ferveo-tpke")] +use crate::types::transaction::{hash_tx, TxType}; /// Indicates if an event is emitted do to /// an individual Tx or the nature of a finalized block #[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub enum EventLevel { + /// Indicates an event is to do with a finalized block. Block, + /// Indicates an event is to do with an individual transaction. Tx, } @@ -25,21 +29,25 @@ pub enum EventLevel { /// using a websocket client #[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub struct Event { + /// The type of event. pub event_type: EventType, + /// The level of the event - whether it relates to a block or an individual + /// transaction. pub level: EventLevel, + /// Key-value attributes of the event. pub attributes: HashMap, } /// The two types of custom events we currently use #[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub enum EventType { - // The transaction was accepted to be included in a block + /// The transaction was accepted to be included in a block Accepted, - // The transaction was applied during block finalization + /// The transaction was applied during block finalization Applied, - // The IBC transaction was applied during block finalization + /// The IBC transaction was applied during block finalization Ibc(String), - // The proposal that has been executed + /// The proposal that has been executed Proposal, } @@ -58,6 +66,7 @@ impl Display for EventType { impl Event { /// Creates a new event with the hash and height of the transaction /// already filled in + #[cfg(feature = "ferveo-tpke")] pub fn new_tx_event(tx: &TxType, height: u64) -> Self { let mut event = match tx { TxType::Wrapper(wrapper) => { @@ -153,7 +162,7 @@ impl From for Event { } /// Convert our custom event into the necessary tendermint proto type -impl From for crate::facade::tendermint_proto::abci::Event { +impl From for tendermint_proto::abci::Event { fn from(event: Event) -> Self { Self { r#type: event.event_type.to_string(), @@ -187,12 +196,16 @@ impl Attributes { } } +/// Errors to do with emitting events. #[derive(Error, Debug)] pub enum Error { + /// Error when parsing attributes from an event JSON. #[error("Json missing `attributes` field")] MissingAttributes, + /// Missing key in attributes. #[error("Attributes missing key: {0}")] MissingKey(String), + /// Missing value in attributes. #[error("Attributes missing value: {0}")] MissingValue(String), } diff --git a/apps/src/lib/node/ledger/events/log.rs b/shared/src/ledger/events/log.rs similarity index 97% rename from apps/src/lib/node/ledger/events/log.rs rename to shared/src/ledger/events/log.rs index f3e655469a..931f0088c4 100644 --- a/apps/src/lib/node/ledger/events/log.rs +++ b/shared/src/ledger/events/log.rs @@ -8,7 +8,7 @@ use std::default::Default; use circular_queue::CircularQueue; -use crate::node::ledger::events::Event; +use crate::ledger::events::Event; pub mod dumb_queries; @@ -79,10 +79,9 @@ impl EventLog { #[cfg(test)] mod tests { - use namada::types::hash::Hash; - use super::*; - use crate::node::ledger::events::{EventLevel, EventType}; + use crate::ledger::events::{EventLevel, EventType}; + use crate::types::hash::Hash; const HASH: &str = "DEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEFDEADBEEF"; diff --git a/apps/src/lib/node/ledger/events/log/dumb_queries.rs b/shared/src/ledger/events/log/dumb_queries.rs similarity index 95% rename from apps/src/lib/node/ledger/events/log/dumb_queries.rs rename to shared/src/ledger/events/log/dumb_queries.rs index d7e2c51630..9d48979860 100644 --- a/apps/src/lib/node/ledger/events/log/dumb_queries.rs +++ b/shared/src/ledger/events/log/dumb_queries.rs @@ -6,9 +6,8 @@ //! tm.event='NewBlock' AND .<$attr>='<$value>' //! ``` -use namada::types::hash::Hash; - -use crate::node::ledger::events::{Event, EventType}; +use crate::ledger::events::{Event, EventType}; +use crate::types::hash::Hash; /// A [`QueryMatcher`] verifies if a Namada event matches a /// given Tendermint query. @@ -59,7 +58,7 @@ impl QueryMatcher { #[cfg(test)] mod tests { use super::*; - use crate::node::ledger::events::EventLevel; + use crate::ledger::events::EventLevel; /// Test if query matching is working as expected. #[test] diff --git a/shared/src/ledger/mod.rs b/shared/src/ledger/mod.rs index cbe2528b76..a04a9ded5f 100644 --- a/shared/src/ledger/mod.rs +++ b/shared/src/ledger/mod.rs @@ -1,6 +1,7 @@ //! The ledger modules pub mod eth_bridge; +pub mod events; pub mod gas; pub mod governance; pub mod ibc; diff --git a/shared/src/ledger/queries/shell.rs b/shared/src/ledger/queries/shell.rs index 7491af4945..cf05d925a1 100644 --- a/shared/src/ledger/queries/shell.rs +++ b/shared/src/ledger/queries/shell.rs @@ -1,10 +1,14 @@ use borsh::BorshSerialize; use tendermint_proto::crypto::{ProofOp, ProofOps}; +use crate::ledger::events::log::dumb_queries; +use crate::ledger::events::Event; use crate::ledger::queries::types::{RequestCtx, RequestQuery}; use crate::ledger::queries::{require_latest_height, EncodedResponseQuery}; -use crate::ledger::storage::{DBIter, StorageHasher, DB}; +use crate::ledger::storage::traits::StorageHasher; +use crate::ledger::storage::{DBIter, EventLogExt, DB}; use crate::ledger::storage_api::{self, ResultExt, StorageRead}; +use crate::types::hash::Hash; use crate::types::storage::{self, Epoch, PrefixValue}; #[cfg(all(feature = "wasm-runtime", feature = "ferveo-tpke"))] use crate::types::transaction::TxResult; @@ -30,6 +34,12 @@ router! {SHELL, // Raw storage access - is given storage key present? ( "has_key" / [storage_key: storage::Key] ) -> bool = storage_has_key, + + // was the transaction accepted? + ( "accepted" / [tx_hash: Hash] ) -> Vec = accepted, + + // was the transaction applied? + ( "applied" / [tx_hash: Hash] ) -> Vec = applied, } #[cfg(not(all(feature = "wasm-runtime", feature = "ferveo-tpke")))] @@ -48,6 +58,12 @@ router! {SHELL, // Raw storage access - is given storage key present? ( "has_key" / [storage_key: storage::Key] ) -> bool = storage_has_key, + + // was the transaction accepted? + ( "accepted" / [tx_hash: Hash]) -> Vec = accepted, + + // was the transaction applied? + ( "applied" / [tx_hash: Hash]) -> Vec = applied, } // Handlers: @@ -209,6 +225,30 @@ where Ok(data) } +fn accepted( + ctx: RequestCtx<'_, D, H>, + tx_hash: Hash, +) -> storage_api::Result> +where + D: 'static + DB + for<'iter> DBIter<'iter> + Sync, + H: 'static + StorageHasher + Sync, +{ + let matcher = dumb_queries::QueryMatcher::accepted(tx_hash); + Ok(ctx.storage.query_event_log(matcher)) +} + +fn applied( + ctx: RequestCtx<'_, D, H>, + tx_hash: Hash, +) -> storage_api::Result> +where + D: 'static + DB + for<'iter> DBIter<'iter> + Sync, + H: 'static + StorageHasher + Sync, +{ + let matcher = dumb_queries::QueryMatcher::applied(tx_hash); + Ok(ctx.storage.query_event_log(matcher)) +} + #[cfg(test)] mod test { use borsh::BorshDeserialize; diff --git a/shared/src/ledger/storage/mod.rs b/shared/src/ledger/storage/mod.rs index 16c3ecf180..4cdca104d6 100644 --- a/shared/src/ledger/storage/mod.rs +++ b/shared/src/ledger/storage/mod.rs @@ -13,6 +13,8 @@ use std::array; use thiserror::Error; +use super::events::log::{dumb_queries, EventLog}; +use super::events::Event; use super::parameters::Parameters; use super::storage_api::{ResultExt, StorageRead, StorageWrite}; use super::{parameters, storage_api}; @@ -69,6 +71,8 @@ where /// Wrapper txs to be decrypted in the next block proposal #[cfg(feature = "ferveo-tpke")] pub tx_queue: TxQueue, + /// Log of events emitted by `FinalizeBlock`. + event_log: EventLog, } /// The block storage data @@ -270,6 +274,48 @@ pub trait DBWriteBatch { fn delete>(&mut self, key: K); } +/// Methods for querying and mutating an event log. +pub trait EventLogExt { + /// Query events in the event log matching the given query. + fn query_event_log( + &self, + matcher: dumb_queries::QueryMatcher, + ) -> Vec; + /// Return a reference to the [`EventLog`]. + fn event_log(&self) -> &EventLog; + /// Return a mutable reference to the [`EventLog`]. + fn event_log_mut(&mut self) -> &mut EventLog; +} + +impl EventLogExt for Storage +where + D: DB + for<'iter> DBIter<'iter>, + H: StorageHasher, +{ + /// Query events in the event log matching the given query. + fn query_event_log( + &self, + matcher: dumb_queries::QueryMatcher, + ) -> Vec { + self.event_log() + .iter_with_matcher(matcher) + .cloned() + .collect::>() + } + + /// Return a reference to the [`EventLog`]. + #[inline] + fn event_log(&self) -> &EventLog { + &self.event_log + } + + /// Return a mutable reference to the [`EventLog`]. + #[inline] + fn event_log_mut(&mut self) -> &mut EventLog { + &mut self.event_log + } +} + impl Storage where D: DB + for<'iter> DBIter<'iter>, @@ -302,6 +348,7 @@ where ), #[cfg(feature = "ferveo-tpke")] tx_queue: TxQueue::default(), + event_log: EventLog::default(), } } @@ -907,6 +954,7 @@ pub mod testing { ), #[cfg(feature = "ferveo-tpke")] tx_queue: TxQueue::default(), + event_log: EventLog::default(), } } } diff --git a/shared/src/types/hash.rs b/shared/src/types/hash.rs index 498fc426bd..4198cac4d5 100644 --- a/shared/src/types/hash.rs +++ b/shared/src/types/hash.rs @@ -2,6 +2,7 @@ use std::fmt::{self, Display}; use std::ops::Deref; +use std::str::FromStr; use arse_merkle_tree::traits::Value; use arse_merkle_tree::Hash as TreeHash; @@ -114,6 +115,14 @@ impl From for transaction::Hash { } } +impl FromStr for Hash { + type Err = self::Error; + + fn from_str(str: &str) -> Result { + Self::try_from(str) + } +} + impl Hash { /// Compute sha256 of some bytes pub fn sha256(data: impl AsRef<[u8]>) -> Self { diff --git a/wasm/Cargo.lock b/wasm/Cargo.lock index 920f7162eb..80a3bee615 100644 --- a/wasm/Cargo.lock +++ b/wasm/Cargo.lock @@ -375,6 +375,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "circular-queue" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d34327ead1c743a10db339de35fb58957564b99d248a67985c55638b22c59b5" +dependencies = [ + "version_check", +] + [[package]] name = "clru" version = "0.5.0" @@ -1361,6 +1370,7 @@ dependencies = [ "bech32", "borsh", "chrono", + "circular-queue", "clru", "data-encoding", "derivative", diff --git a/wasm_for_tests/wasm_source/Cargo.lock b/wasm_for_tests/wasm_source/Cargo.lock index fbcca7b195..0d44ef4701 100644 --- a/wasm_for_tests/wasm_source/Cargo.lock +++ b/wasm_for_tests/wasm_source/Cargo.lock @@ -375,6 +375,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "circular-queue" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d34327ead1c743a10db339de35fb58957564b99d248a67985c55638b22c59b5" +dependencies = [ + "version_check", +] + [[package]] name = "clru" version = "0.5.0" @@ -1361,6 +1370,7 @@ dependencies = [ "bech32", "borsh", "chrono", + "circular-queue", "clru", "data-encoding", "derivative", From 676e1e1fa71b691e3d7bd2b38d2c6bc7bd054f77 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 3 Nov 2022 12:36:50 +0000 Subject: [PATCH 5/8] [ci] wasm checksums update --- wasm/checksums.json | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/wasm/checksums.json b/wasm/checksums.json index 2a7fd149a8..57862f254c 100644 --- a/wasm/checksums.json +++ b/wasm/checksums.json @@ -1,15 +1,15 @@ { - "tx_bond.wasm": "tx_bond.04d6847800dad11990b42e8f2981a4a79d06d6d0c981c3d70c929e5b6a4f348b.wasm", - "tx_ibc.wasm": "tx_ibc.6ab530398ed8e276a8af7f231edbfae984b7e84eeb854714ba9339c5bed9d330.wasm", - "tx_init_account.wasm": "tx_init_account.578d987351e6ae42baa7849ae167e3ba33f3a62dba51cd47b0fa6d3ea6e4f128.wasm", - "tx_init_proposal.wasm": "tx_init_proposal.71e27610210622fa53c3de58351761cca839681a4f450d4eff6b46bde3ae85a5.wasm", - "tx_init_validator.wasm": "tx_init_validator.269f065ff683782db2fdcac6e2485e80cbebb98929671a42eeb01703e0bbd8f5.wasm", - "tx_transfer.wasm": "tx_transfer.784325cf7763faf8d75797960cda6fbabbd343f3c6f7e6785f60f5e0911a6bb5.wasm", - "tx_unbond.wasm": "tx_unbond.ed13fa636d138ac4e35f2b4f31a6b4d3bed67e6b998dc6325f90711a2aca3704.wasm", - "tx_update_vp.wasm": "tx_update_vp.c4050e597116203eba5afde946a014afb067bdeaaae417377214a80c38a3786b.wasm", - "tx_vote_proposal.wasm": "tx_vote_proposal.ece325881aad1c8a29f715c2f435c3335e08e51eed837c00ce0f7bbaddbefe50.wasm", - "tx_withdraw.wasm": "tx_withdraw.408fc10b3744c398258124e5e48e3449f6baf82a263df26911586a3382fbceb9.wasm", - "vp_testnet_faucet.wasm": "vp_testnet_faucet.ae9a681dc2c1bd244b0575474fa4a364af56fa75833950693ca52ab25018c97d.wasm", - "vp_token.wasm": "vp_token.468de153dc5ce3af208bd762de3e85be48bc631012ec5f0947af95168da6cb93.wasm", - "vp_user.wasm": "vp_user.c101016a85a72f40da7f33e5d9061cfd2e3274eaac75a71c59c9ab4ed9896ffd.wasm" -} + "tx_bond.wasm": "tx_bond.a898b44d2d971c82580564a20a54b60832ed077e1a1d5f522117027f3c4c8c76.wasm", + "tx_ibc.wasm": "tx_ibc.bd0db7017d12db2f2e2bc68b1b4d889ab43a1af47191460c7a33f70af3261644.wasm", + "tx_init_account.wasm": "tx_init_account.b7b7811a3f61a5bc9e2155b27cf314765b41964cde29576ac730b647fc5ac564.wasm", + "tx_init_proposal.wasm": "tx_init_proposal.7fce8427cb2e55c1ac8d9605ed5d01aba66b7d792af7a11da8482c3616a94aef.wasm", + "tx_init_validator.wasm": "tx_init_validator.8f72bbc32e9dc05793d38cc1be29ad6e4b8cfc04a38b44eb4cbc7e41946e45bd.wasm", + "tx_transfer.wasm": "tx_transfer.d08ce64ee60202123137a481fb20efd30e81116c0df795d764734a360e635e29.wasm", + "tx_unbond.wasm": "tx_unbond.95eeaf1b61409469b96caae3ec52e2d131cbb43a7684b0df4e21c8a04940fcc8.wasm", + "tx_update_vp.wasm": "tx_update_vp.7c2e5ad75707a8dff036420ccd22b8d10a24b2881d7e94438acfade419db3f7c.wasm", + "tx_vote_proposal.wasm": "tx_vote_proposal.cac0d2b9b694a6d39f49c46c6f9adc45bff46f5ef6b699377029a291b4d9e96e.wasm", + "tx_withdraw.wasm": "tx_withdraw.f4887cd2089488d13253323824072c900eafeca3dc9eb7c9bd602ae4a552b6a4.wasm", + "vp_testnet_faucet.wasm": "vp_testnet_faucet.902d9b32bd2257bf5980aad75c5737695b0b5f89a4cbccd30b96cae524ac94f4.wasm", + "vp_token.wasm": "vp_token.9fb172d9b3d5f0c88f5626294feedb486273cab8dc45b7dfff7def9804e3033c.wasm", + "vp_user.wasm": "vp_user.a5026fa53661916575a6d22dc0833461cc2b8d7cd6fa6674aadefd355b574c74.wasm" +} \ No newline at end of file From 89ea5259f31720895b8b14bfde6d7a8bfab46d1c Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Thu, 3 Nov 2022 13:15:36 +0000 Subject: [PATCH 6/8] Code review suggestions * Remove the `EventLog` from `Storage`, and add it back to the `Shell`. * Pass a reference to the `EventLog` to `RequestCtx`. * Return `Option` instead of `Vec` from accepted and applied RPC calls. --- apps/src/lib/client/rpc.rs | 9 ++-- .../lib/node/ledger/shell/finalize_block.rs | 5 +- apps/src/lib/node/ledger/shell/mod.rs | 17 +++++++ apps/src/lib/node/ledger/shell/queries.rs | 1 + shared/src/ledger/queries/mod.rs | 6 +++ shared/src/ledger/queries/router.rs | 1 + shared/src/ledger/queries/shell.rs | 24 +++++++--- shared/src/ledger/queries/types.rs | 13 +++-- shared/src/ledger/storage/mod.rs | 48 ------------------- 9 files changed, 56 insertions(+), 68 deletions(-) diff --git a/apps/src/lib/client/rpc.rs b/apps/src/lib/client/rpc.rs index 8eebbc360a..1f4cb1f15a 100644 --- a/apps/src/lib/client/rpc.rs +++ b/apps/src/lib/client/rpc.rs @@ -76,7 +76,7 @@ pub async fn query_tx_status( loop { tracing::debug!(query = ?status, "Querying tx status"); - let mut events = match query_tx_events(&client, status).await { + let maybe_event = match query_tx_events(&client, status).await { Ok(response) => response, Err(err) => { tracing::debug!(%err, "ABCI query failed"); @@ -84,8 +84,7 @@ pub async fn query_tx_status( continue; } }; - if let Some(e) = events.pop() { - // we should only have one event matching the query + if let Some(e) = maybe_event { break Ok(e); } sleep_update(status, &mut backoff).await; @@ -1433,10 +1432,12 @@ impl<'a> From> for Query { } } +/// Call the corresponding `tx_event_query` RPC method, to fetch +/// the current status of a transation. pub async fn query_tx_events( client: &HttpClient, tx_event_query: TxEventQuery<'_>, -) -> eyre::Result> { +) -> eyre::Result> { let tx_hash: Hash = tx_event_query.tx_hash().try_into()?; match tx_event_query { TxEventQuery::Accepted(_) => RPC diff --git a/apps/src/lib/node/ledger/shell/finalize_block.rs b/apps/src/lib/node/ledger/shell/finalize_block.rs index 5c6f7c2c0a..a08bbfea16 100644 --- a/apps/src/lib/node/ledger/shell/finalize_block.rs +++ b/apps/src/lib/node/ledger/shell/finalize_block.rs @@ -1,7 +1,6 @@ //! Implementation of the `FinalizeBlock` ABCI++ method for the Shell use namada::ledger::protocol; -use namada::ledger::storage::EventLogExt; use namada::types::storage::{BlockHash, Header}; use super::governance::execute_governance_proposals; @@ -240,9 +239,7 @@ where .finalize_transaction() .map_err(|_| Error::GasOverflow)?; - self.storage - .event_log_mut() - .log_events(response.events.clone()); + self.event_log_mut().log_events(response.events.clone()); Ok(response) } diff --git a/apps/src/lib/node/ledger/shell/mod.rs b/apps/src/lib/node/ledger/shell/mod.rs index a1ca0222bb..3776377080 100644 --- a/apps/src/lib/node/ledger/shell/mod.rs +++ b/apps/src/lib/node/ledger/shell/mod.rs @@ -20,6 +20,7 @@ use std::path::{Path, PathBuf}; use std::rc::Rc; use borsh::{BorshDeserialize, BorshSerialize}; +use namada::ledger::events::log::EventLog; use namada::ledger::events::Event; use namada::ledger::gas::BlockGasMeter; use namada::ledger::pos::namada_proof_of_stake::types::{ @@ -213,6 +214,8 @@ where tx_wasm_cache: TxCache, /// Proposal execution tracking pub proposal_data: HashSet, + /// Log of events emitted by `FinalizeBlock` ABCI calls. + event_log: EventLog, } impl Shell @@ -318,9 +321,23 @@ where tx_wasm_compilation_cache as usize, ), proposal_data: HashSet::new(), + // TODO: config event log params + event_log: EventLog::default(), } } + /// Return a reference to the [`EventLog`]. + #[inline] + pub fn event_log(&self) -> &EventLog { + &self.event_log + } + + /// Return a mutable reference to the [`EventLog`]. + #[inline] + pub fn event_log_mut(&mut self) -> &mut EventLog { + &mut self.event_log + } + /// Iterate over the wrapper txs in order #[allow(dead_code)] fn iter_tx_queue(&mut self) -> impl Iterator { diff --git a/apps/src/lib/node/ledger/shell/queries.rs b/apps/src/lib/node/ledger/shell/queries.rs index e53ea91417..2293cdd2d5 100644 --- a/apps/src/lib/node/ledger/shell/queries.rs +++ b/apps/src/lib/node/ledger/shell/queries.rs @@ -23,6 +23,7 @@ where pub fn query(&self, query: request::Query) -> response::Query { let ctx = RequestCtx { storage: &self.storage, + event_log: self.event_log(), vp_wasm_cache: self.vp_wasm_cache.read_only(), tx_wasm_cache: self.tx_wasm_cache.read_only(), }; diff --git a/shared/src/ledger/queries/mod.rs b/shared/src/ledger/queries/mod.rs index 8b31376be4..c448780cd2 100644 --- a/shared/src/ledger/queries/mod.rs +++ b/shared/src/ledger/queries/mod.rs @@ -147,6 +147,7 @@ mod testing { use tempfile::TempDir; use super::*; + use crate::ledger::events::log::EventLog; use crate::ledger::storage::testing::TestStorage; use crate::types::storage::BlockHeight; use crate::vm::wasm::{self, TxCache, VpCache}; @@ -161,6 +162,8 @@ mod testing { pub rpc: RPC, /// storage pub storage: TestStorage, + /// event log + pub event_log: EventLog, /// VP wasm compilation cache pub vp_wasm_cache: VpCache, /// tx wasm compilation cache @@ -180,6 +183,7 @@ mod testing { pub fn new(rpc: RPC) -> Self { // Initialize the `TestClient` let storage = TestStorage::default(); + let event_log = EventLog::default(); let (vp_wasm_cache, vp_cache_dir) = wasm::compilation_cache::common::testing::cache(); let (tx_wasm_cache, tx_cache_dir) = @@ -187,6 +191,7 @@ mod testing { Self { rpc, storage, + event_log, vp_wasm_cache: vp_wasm_cache.read_only(), tx_wasm_cache: tx_wasm_cache.read_only(), vp_cache_dir, @@ -221,6 +226,7 @@ mod testing { }; let ctx = RequestCtx { storage: &self.storage, + event_log: &self.event_log, vp_wasm_cache: self.vp_wasm_cache.clone(), tx_wasm_cache: self.tx_wasm_cache.clone(), }; diff --git a/shared/src/ledger/queries/router.rs b/shared/src/ledger/queries/router.rs index e4823e5ad7..9ff33247f9 100644 --- a/shared/src/ledger/queries/router.rs +++ b/shared/src/ledger/queries/router.rs @@ -1008,6 +1008,7 @@ mod test { ..RequestQuery::default() }; let ctx = RequestCtx { + event_log: &client.event_log, storage: &client.storage, vp_wasm_cache: client.vp_wasm_cache.clone(), tx_wasm_cache: client.tx_wasm_cache.clone(), diff --git a/shared/src/ledger/queries/shell.rs b/shared/src/ledger/queries/shell.rs index cf05d925a1..81f701e8aa 100644 --- a/shared/src/ledger/queries/shell.rs +++ b/shared/src/ledger/queries/shell.rs @@ -6,7 +6,7 @@ use crate::ledger::events::Event; use crate::ledger::queries::types::{RequestCtx, RequestQuery}; use crate::ledger::queries::{require_latest_height, EncodedResponseQuery}; use crate::ledger::storage::traits::StorageHasher; -use crate::ledger::storage::{DBIter, EventLogExt, DB}; +use crate::ledger::storage::{DBIter, DB}; use crate::ledger::storage_api::{self, ResultExt, StorageRead}; use crate::types::hash::Hash; use crate::types::storage::{self, Epoch, PrefixValue}; @@ -36,10 +36,10 @@ router! {SHELL, -> bool = storage_has_key, // was the transaction accepted? - ( "accepted" / [tx_hash: Hash] ) -> Vec = accepted, + ( "accepted" / [tx_hash: Hash] ) -> Option = accepted, // was the transaction applied? - ( "applied" / [tx_hash: Hash] ) -> Vec = applied, + ( "applied" / [tx_hash: Hash] ) -> Option = applied, } #[cfg(not(all(feature = "wasm-runtime", feature = "ferveo-tpke")))] @@ -228,25 +228,35 @@ where fn accepted( ctx: RequestCtx<'_, D, H>, tx_hash: Hash, -) -> storage_api::Result> +) -> storage_api::Result> where D: 'static + DB + for<'iter> DBIter<'iter> + Sync, H: 'static + StorageHasher + Sync, { let matcher = dumb_queries::QueryMatcher::accepted(tx_hash); - Ok(ctx.storage.query_event_log(matcher)) + Ok(ctx + .event_log + .iter_with_matcher(matcher) + .by_ref() + .next() + .cloned()) } fn applied( ctx: RequestCtx<'_, D, H>, tx_hash: Hash, -) -> storage_api::Result> +) -> storage_api::Result> where D: 'static + DB + for<'iter> DBIter<'iter> + Sync, H: 'static + StorageHasher + Sync, { let matcher = dumb_queries::QueryMatcher::applied(tx_hash); - Ok(ctx.storage.query_event_log(matcher)) + Ok(ctx + .event_log + .iter_with_matcher(matcher) + .by_ref() + .next() + .cloned()) } #[cfg(test)] diff --git a/shared/src/ledger/queries/types.rs b/shared/src/ledger/queries/types.rs index c7b349ddc0..74f9c683d2 100644 --- a/shared/src/ledger/queries/types.rs +++ b/shared/src/ledger/queries/types.rs @@ -1,5 +1,6 @@ use tendermint_proto::crypto::ProofOps; +use crate::ledger::events::log::EventLog; use crate::ledger::storage::{DBIter, Storage, StorageHasher, DB}; use crate::ledger::storage_api; use crate::types::storage::BlockHeight; @@ -11,17 +12,19 @@ use crate::vm::WasmCacheRoAccess; /// A request context provides read-only access to storage and WASM compilation /// caches to request handlers. #[derive(Debug, Clone)] -pub struct RequestCtx<'a, D, H> +pub struct RequestCtx<'shell, D, H> where D: 'static + DB + for<'iter> DBIter<'iter> + Sync, H: 'static + StorageHasher + Sync, { - /// Storage access - pub storage: &'a Storage, - /// VP WASM compilation cache + /// Reference to the ledger's [`Storage`]. + pub storage: &'shell Storage, + /// Log of events emitted by `FinalizeBlock` ABCI calls. + pub event_log: &'shell EventLog, + /// Cache of VP wasm compiled artifacts. #[cfg(feature = "wasm-runtime")] pub vp_wasm_cache: VpCache, - /// tx WASM compilation cache + /// Cache of transaction wasm compiled artifacts. #[cfg(feature = "wasm-runtime")] pub tx_wasm_cache: TxCache, } diff --git a/shared/src/ledger/storage/mod.rs b/shared/src/ledger/storage/mod.rs index 4cdca104d6..16c3ecf180 100644 --- a/shared/src/ledger/storage/mod.rs +++ b/shared/src/ledger/storage/mod.rs @@ -13,8 +13,6 @@ use std::array; use thiserror::Error; -use super::events::log::{dumb_queries, EventLog}; -use super::events::Event; use super::parameters::Parameters; use super::storage_api::{ResultExt, StorageRead, StorageWrite}; use super::{parameters, storage_api}; @@ -71,8 +69,6 @@ where /// Wrapper txs to be decrypted in the next block proposal #[cfg(feature = "ferveo-tpke")] pub tx_queue: TxQueue, - /// Log of events emitted by `FinalizeBlock`. - event_log: EventLog, } /// The block storage data @@ -274,48 +270,6 @@ pub trait DBWriteBatch { fn delete>(&mut self, key: K); } -/// Methods for querying and mutating an event log. -pub trait EventLogExt { - /// Query events in the event log matching the given query. - fn query_event_log( - &self, - matcher: dumb_queries::QueryMatcher, - ) -> Vec; - /// Return a reference to the [`EventLog`]. - fn event_log(&self) -> &EventLog; - /// Return a mutable reference to the [`EventLog`]. - fn event_log_mut(&mut self) -> &mut EventLog; -} - -impl EventLogExt for Storage -where - D: DB + for<'iter> DBIter<'iter>, - H: StorageHasher, -{ - /// Query events in the event log matching the given query. - fn query_event_log( - &self, - matcher: dumb_queries::QueryMatcher, - ) -> Vec { - self.event_log() - .iter_with_matcher(matcher) - .cloned() - .collect::>() - } - - /// Return a reference to the [`EventLog`]. - #[inline] - fn event_log(&self) -> &EventLog { - &self.event_log - } - - /// Return a mutable reference to the [`EventLog`]. - #[inline] - fn event_log_mut(&mut self) -> &mut EventLog { - &mut self.event_log - } -} - impl Storage where D: DB + for<'iter> DBIter<'iter>, @@ -348,7 +302,6 @@ where ), #[cfg(feature = "ferveo-tpke")] tx_queue: TxQueue::default(), - event_log: EventLog::default(), } } @@ -954,7 +907,6 @@ pub mod testing { ), #[cfg(feature = "ferveo-tpke")] tx_queue: TxQueue::default(), - event_log: EventLog::default(), } } } From 932d94cb1df05bb4e1f08dc4102f5bbf2a02bbac Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Thu, 3 Nov 2022 13:48:09 +0000 Subject: [PATCH 7/8] Small fixes --- apps/src/lib/client/rpc.rs | 1 + shared/src/ledger/queries/shell.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/src/lib/client/rpc.rs b/apps/src/lib/client/rpc.rs index 1f4cb1f15a..36214c3d7e 100644 --- a/apps/src/lib/client/rpc.rs +++ b/apps/src/lib/client/rpc.rs @@ -1458,6 +1458,7 @@ pub async fn query_tx_events( } /// Lookup the full response accompanying the specified transaction event +// TODO: maybe remove this in favor of `query_tx_status` pub async fn query_tx_response( ledger_address: &TendermintAddress, tx_query: TxEventQuery<'_>, diff --git a/shared/src/ledger/queries/shell.rs b/shared/src/ledger/queries/shell.rs index 81f701e8aa..c065029169 100644 --- a/shared/src/ledger/queries/shell.rs +++ b/shared/src/ledger/queries/shell.rs @@ -60,10 +60,10 @@ router! {SHELL, -> bool = storage_has_key, // was the transaction accepted? - ( "accepted" / [tx_hash: Hash]) -> Vec = accepted, + ( "accepted" / [tx_hash: Hash]) -> Option = accepted, // was the transaction applied? - ( "applied" / [tx_hash: Hash]) -> Vec = applied, + ( "applied" / [tx_hash: Hash]) -> Option = applied, } // Handlers: From 20ad42b94cd81bc0bdbce984801b08a2a58828be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Wed, 16 Nov 2022 16:07:37 +0100 Subject: [PATCH 8/8] changelog: add #674 --- .changelog/unreleased/improvements/674-event-log.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/unreleased/improvements/674-event-log.md diff --git a/.changelog/unreleased/improvements/674-event-log.md b/.changelog/unreleased/improvements/674-event-log.md new file mode 100644 index 0000000000..8dc0efaa55 --- /dev/null +++ b/.changelog/unreleased/improvements/674-event-log.md @@ -0,0 +1,3 @@ +- Added a custom events store and replaced WebSocket client for + transaction results with query endpoints to the events store. + ([#674](https://github.com/anoma/namada/pull/674)) \ No newline at end of file