Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement event log and patch Namada to use it for tx querying #674

Merged
merged 8 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/unreleased/improvements/674-event-log.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Added a custom events store and replaced WebSocket client for
transaction results with query endpoints to the events store.
([#674](https://github.com/anoma/namada/pull/674))
24 changes: 11 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ flate2 = "1.0.22"
file-lock = "2.0.2"
futures = "0.3"
itertools = "0.10.1"
jsonpath_lib = "0.3.0"
libc = "0.2.97"
libloading = "0.7.2"
num-derive = "0.3.3"
Expand Down
1 change: 0 additions & 1 deletion apps/src/lib/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod rpc;
pub mod signing;
pub mod tendermint_rpc_types;
mod tendermint_websocket_client;
pub mod tx;
pub mod utils;
115 changes: 98 additions & 17 deletions apps/src/lib/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use async_std::path::PathBuf;
use async_std::prelude::*;
use borsh::BorshDeserialize;
use data_encoding::HEXLOWER;
use eyre::{eyre, Context as EyreContext};
use itertools::Itertools;
use namada::ledger::events::Event;
use namada::ledger::governance::parameters::GovParams;
use namada::ledger::governance::storage as gov_storage;
use namada::ledger::governance::utils::Votes;
Expand All @@ -29,10 +31,12 @@ use namada::types::governance::{
OfflineProposal, OfflineVote, ProposalResult, ProposalVote, TallyResult,
VotePower,
};
use namada::types::hash::Hash;
use namada::types::key::*;
use namada::types::storage::{Epoch, Key, KeySeg, PrefixValue};
use namada::types::token::{balance_key, Amount};
use namada::types::{address, storage, token};
use tokio::time::{Duration, Instant};

use crate::cli::{self, args, Context};
use crate::client::tendermint_rpc_types::TxResponse;
Expand All @@ -43,6 +47,57 @@ use crate::facade::tendermint_rpc::{
Client, HttpClient, Order, SubscriptionClient, WebSocketClient,
};

/// Query the status of a given transaction.
///
/// If a response is not delivered until `deadline`, we exit the cli with an
/// error.
pub async fn query_tx_status(
status: TxEventQuery<'_>,
address: TendermintAddress,
deadline: Instant,
) -> Event {
const ONE_SECOND: Duration = Duration::from_secs(1);
// sleep for the duration of `backoff`,
// and update the underlying value
async fn sleep_update(query: TxEventQuery<'_>, backoff: &mut Duration) {
tracing::debug!(
?query,
duration = ?backoff,
"Retrying tx status query after timeout",
);
// simple linear backoff - if an event is not available,
// increase the backoff duration by one second
tokio::time::sleep(*backoff).await;
*backoff += ONE_SECOND;
}
tokio::time::timeout_at(deadline, async move {
let client = HttpClient::new(address).unwrap();
let mut backoff = ONE_SECOND;

loop {
tracing::debug!(query = ?status, "Querying tx status");
let maybe_event = match query_tx_events(&client, status).await {
Ok(response) => response,
Err(err) => {
tracing::debug!(%err, "ABCI query failed");
sleep_update(status, &mut backoff).await;
continue;
}
};
if let Some(e) = maybe_event {
break Ok(e);
}
sleep_update(status, &mut backoff).await;
}
})
.await
.map_err(|_| {
eprintln!("Transaction status query deadline of {deadline:?} exceeded");
})
.and_then(|result| result)
.unwrap_or_else(|_| cli::safe_exit(1))
}

/// Query the epoch of the last committed block
pub async fn query_epoch(args: args::Query) -> Epoch {
let client = HttpClient::new(args.ledger_address).unwrap();
Expand Down Expand Up @@ -1339,23 +1394,23 @@ pub async fn query_has_storage_key(
}

/// Represents a query for an event pertaining to the specified transaction
#[derive(Debug, Clone)]
pub enum TxEventQuery {
Accepted(String),
Applied(String),
#[derive(Debug, Copy, Clone)]
pub enum TxEventQuery<'a> {
Accepted(&'a str),
Applied(&'a str),
}

impl TxEventQuery {
impl<'a> TxEventQuery<'a> {
/// The event type to which this event query pertains
fn event_type(&self) -> &'static str {
fn event_type(self) -> &'static str {
match self {
TxEventQuery::Accepted(_tx_hash) => "accepted",
TxEventQuery::Applied(_tx_hash) => "applied",
TxEventQuery::Accepted(_) => "accepted",
TxEventQuery::Applied(_) => "applied",
}
}

/// The transaction to which this event query pertains
fn tx_hash(&self) -> &String {
fn tx_hash(self) -> &'a str {
match self {
TxEventQuery::Accepted(tx_hash) => tx_hash,
TxEventQuery::Applied(tx_hash) => tx_hash,
Expand All @@ -1364,8 +1419,8 @@ impl TxEventQuery {
}

/// Transaction event queries are semantically a subset of general queries
impl From<TxEventQuery> for Query {
fn from(tx_query: TxEventQuery) -> Self {
impl<'a> From<TxEventQuery<'a>> for Query {
fn from(tx_query: TxEventQuery<'a>) -> Self {
match tx_query {
TxEventQuery::Accepted(tx_hash) => {
Query::default().and_eq("accepted.hash", tx_hash)
Expand All @@ -1377,17 +1432,43 @@ impl From<TxEventQuery> for Query {
}
}

/// Call the corresponding `tx_event_query` RPC method, to fetch
/// the current status of a transation.
pub async fn query_tx_events(
client: &HttpClient,
tx_event_query: TxEventQuery<'_>,
) -> eyre::Result<Option<Event>> {
let tx_hash: Hash = tx_event_query.tx_hash().try_into()?;
match tx_event_query {
TxEventQuery::Accepted(_) => RPC
.shell()
.accepted(client, &tx_hash)
.await
.wrap_err_with(|| {
eyre!("Failed querying whether a transaction was accepted")
}),
TxEventQuery::Applied(_) => RPC
.shell()
.applied(client, &tx_hash)
.await
.wrap_err_with(|| {
eyre!("Error querying whether a transaction was applied")
}),
}
}

/// Lookup the full response accompanying the specified transaction event
// TODO: maybe remove this in favor of `query_tx_status`
pub async fn query_tx_response(
ledger_address: &TendermintAddress,
tx_query: TxEventQuery,
tx_query: TxEventQuery<'_>,
) -> Result<TxResponse, TError> {
// Connect to the Tendermint server holding the transactions
let (client, driver) = WebSocketClient::new(ledger_address.clone()).await?;
let driver_handle = tokio::spawn(async move { driver.run().await });
// Find all blocks that apply a transaction with the specified hash
let blocks = &client
.block_search(Query::from(tx_query.clone()), 1, 255, Order::Ascending)
.block_search(tx_query.into(), 1, 255, Order::Ascending)
.await
.expect("Unable to query for transaction with given hash")
.blocks;
Expand Down Expand Up @@ -1463,7 +1544,7 @@ pub async fn query_result(_ctx: Context, args: args::QueryResult) {
// First try looking up application event pertaining to given hash.
let tx_response = query_tx_response(
&args.query.ledger_address,
TxEventQuery::Applied(args.tx_hash.clone()),
TxEventQuery::Applied(&args.tx_hash),
)
.await;
match tx_response {
Expand All @@ -1477,7 +1558,7 @@ pub async fn query_result(_ctx: Context, args: args::QueryResult) {
// If this fails then instead look for an acceptance event.
let tx_response = query_tx_response(
&args.query.ledger_address,
TxEventQuery::Accepted(args.tx_hash),
TxEventQuery::Accepted(&args.tx_hash),
)
.await;
match tx_response {
Expand Down Expand Up @@ -1516,7 +1597,7 @@ pub async fn get_proposal_votes(
if let Some(vote_iter) = vote_iter {
for (key, vote) in vote_iter {
let voter_address = gov_storage::get_voter_address(&key)
.expect("Vote key should contains the voting address.")
.expect("Vote key should contain the voting address.")
.clone();
if vote.is_yay() && validators.contains(&voter_address) {
let amount =
Expand All @@ -1526,7 +1607,7 @@ pub async fn get_proposal_votes(
let validator_address =
gov_storage::get_vote_delegation_address(&key)
.expect(
"Vote key should contains the delegation address.",
"Vote key should contain the delegation address.",
)
.clone();
let delegator_token_amount = get_bond_amount_at(
Expand Down
Loading