Skip to content

Commit

Permalink
Move namada_apps::node::ledger::events to the shared crate
Browse files Browse the repository at this point in the history
Move event log from `Shell` to `Storage`

Guard less things with the ferveo-tpke feature so that wasm compiles

Fix cargo doc link

Update client to use new RPC router for accepted/applied queries

Use Vec<Event> instead of raw Vec<u8>

Remove no longer used From<TxEventQuery> impl

query_tx_events should return eyre::Result
  • Loading branch information
james-chf authored and sug0 committed Nov 3, 2022
1 parent ee5c559 commit ecf8d7a
Show file tree
Hide file tree
Showing 19 changed files with 206 additions and 88 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ blake2b-rs = "0.2.0"
borsh = "0.9.0"
byte-unit = "4.0.13"
byteorder = "1.4.2"
circular-queue = "0.2.6"
# https://github.com/clap-rs/clap/issues/1037
clap = {git = "https://github.com/clap-rs/clap/", tag = "v3.0.0-beta.2", default-features = false, features = ["std", "suggestions", "color", "cargo"]}
color-eyre = "0.5.10"
Expand Down
64 changes: 29 additions & 35 deletions apps/src/lib/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use async_std::path::PathBuf;
use async_std::prelude::*;
use borsh::BorshDeserialize;
use data_encoding::HEXLOWER;
use eyre::{eyre, Context as EyreContext};
use itertools::Itertools;
use namada::ledger::events::Event;
use namada::ledger::governance::parameters::GovParams;
use namada::ledger::governance::storage as gov_storage;
use namada::ledger::governance::utils::Votes;
Expand All @@ -29,6 +31,7 @@ use namada::types::governance::{
OfflineProposal, OfflineVote, ProposalResult, ProposalVote, TallyResult,
VotePower,
};
use namada::types::hash::Hash;
use namada::types::key::*;
use namada::types::storage::{Epoch, Key, KeySeg, PrefixValue};
use namada::types::token::{balance_key, Amount};
Expand All @@ -43,8 +46,6 @@ use crate::facade::tendermint_rpc::query::Query;
use crate::facade::tendermint_rpc::{
Client, HttpClient, Order, SubscriptionClient, WebSocketClient,
};
use crate::node::ledger::events::Event;
use crate::node::ledger::rpc::Path;

/// Query the status of a given transaction.
///
Expand Down Expand Up @@ -74,37 +75,15 @@ pub async fn query_tx_status(
let mut backoff = ONE_SECOND;

loop {
let data = vec![];
tracing::debug!(query = ?status, "Querying tx status");
let response = match client
.abci_query(Some(status.into()), data, None, false)
.await
{
let mut events = match query_tx_events(&client, status).await {
Ok(response) => response,
Err(err) => {
tracing::debug!(%err, "ABCI query failed");
sleep_update(status, &mut backoff).await;
continue;
}
};
let mut events = match response.code {
Code::Ok => {
match Vec::<Event>::try_from_slice(&response.value[..]) {
Ok(events) => events,
Err(err) => {
eprintln!("Error decoding the event value: {err}");
break Err(());
}
}
}
Code::Err(err) => {
eprintln!(
"Error in the query {} (error code {})",
response.info, err
);
break Err(());
}
};
if let Some(e) = events.pop() {
// we should only have one event matching the query
break Ok(e);
Expand Down Expand Up @@ -1440,14 +1419,6 @@ impl<'a> TxEventQuery<'a> {
}
}

impl<'a> From<TxEventQuery<'a>> for crate::facade::tendermint::abci::Path {
fn from(tx_query: TxEventQuery<'a>) -> Self {
format!("{}/{}", tx_query.event_type(), tx_query.tx_hash())
.parse()
.expect("This operation is infallible")
}
}

/// Transaction event queries are semantically a subset of general queries
impl<'a> From<TxEventQuery<'a>> for Query {
fn from(tx_query: TxEventQuery<'a>) -> Self {
Expand All @@ -1462,6 +1433,29 @@ impl<'a> From<TxEventQuery<'a>> for Query {
}
}

pub async fn query_tx_events(
client: &HttpClient,
tx_event_query: TxEventQuery<'_>,
) -> eyre::Result<Vec<Event>> {
let tx_hash: Hash = tx_event_query.tx_hash().try_into()?;
match tx_event_query {
TxEventQuery::Accepted(_) => RPC
.shell()
.accepted(client, &tx_hash)
.await
.wrap_err_with(|| {
eyre!("Failed querying whether a transaction was accepted")
}),
TxEventQuery::Applied(_) => RPC
.shell()
.applied(client, &tx_hash)
.await
.wrap_err_with(|| {
eyre!("Error querying whether a transaction was applied")
}),
}
}

/// Lookup the full response accompanying the specified transaction event
pub async fn query_tx_response(
ledger_address: &TendermintAddress,
Expand Down Expand Up @@ -1601,7 +1595,7 @@ pub async fn get_proposal_votes(
if let Some(vote_iter) = vote_iter {
for (key, vote) in vote_iter {
let voter_address = gov_storage::get_voter_address(&key)
.expect("Vote key should contains the voting address.")
.expect("Vote key should contain the voting address.")
.clone();
if vote.is_yay() && validators.contains(&voter_address) {
let amount =
Expand All @@ -1611,7 +1605,7 @@ pub async fn get_proposal_votes(
let validator_address =
gov_storage::get_vote_delegation_address(&key)
.expect(
"Vote key should contains the delegation address.",
"Vote key should contain the delegation address.",
)
.clone();
let delegator_token_amount = get_bond_amount_at(
Expand Down
2 changes: 1 addition & 1 deletion apps/src/lib/client/tendermint_rpc_types.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::convert::TryFrom;

use namada::ledger::events::Event;
use namada::proto::Tx;
use namada::types::address::Address;
use serde::Serialize;

use crate::cli::safe_exit;
use crate::node::ledger::events::Event;

/// Data needed for broadcasting a tx and
/// monitoring its progress on chain
Expand Down
26 changes: 17 additions & 9 deletions apps/src/lib/node/ledger/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
mod abortable;
mod broadcaster;
pub mod events;
mod shell;
mod shims;
pub mod storage;
Expand Down Expand Up @@ -55,7 +54,6 @@ const ENV_VAR_RAYON_THREADS: &str = "ANOMA_RAYON_THREADS";
// Poll::Ready(Ok(()))
// }
//```

impl Shell {
fn load_proposals(&mut self) {
let proposals_key = gov_storage::get_commiting_proposals_prefix(
Expand Down Expand Up @@ -199,10 +197,15 @@ pub fn reset(config: config::Ledger) -> Result<(), shell::Error> {
shell::reset(config)
}

/// Runs three concurrent tasks: A tendermint node, a shell which contains an
/// ABCI server for talking to the tendermint node, and a broadcaster so that
/// the ledger may submit txs to the chain. All must be alive for correct
/// functioning.
/// Runs and monitors a few concurrent tasks.
///
/// This includes:
/// - A Tendermint node.
/// - A shell which contains an ABCI server, for talking to the Tendermint
/// node.
/// - A [`Broadcaster`], for the ledger to submit txs to Tendermint's mempool.
///
/// All must be alive for correct functioning.
async fn run_aux(config: config::Ledger, wasm_dir: PathBuf) {
let setup_data = run_aux_setup(&config, &wasm_dir).await;

Expand Down Expand Up @@ -409,8 +412,7 @@ fn start_abci_broadcaster_shell(
let _ = bc_abort_send.send(());
})
} else {
// dummy async task, which will resolve instantly
tokio::spawn(async { std::future::ready(()).await })
spawn_dummy_task(())
};

// Setup DB cache, it must outlive the DB instance that's in the shell
Expand Down Expand Up @@ -518,7 +520,7 @@ async fn run_abci(
}

/// Launches a new task managing a Tendermint process into the asynchronous
/// runtime, and returns its `JoinHandle`.
/// runtime, and returns its [`task::JoinHandle`].
fn start_tendermint(
spawner: &mut AbortableSpawner,
config: &config::Ledger,
Expand Down Expand Up @@ -578,3 +580,9 @@ fn start_tendermint(
}
})
}

/// Spawn a dummy asynchronous task into the runtime,
/// which will resolve instantly.
fn spawn_dummy_task<T: Send + 'static>(ready: T) -> task::JoinHandle<T> {
tokio::spawn(async { std::future::ready(ready).await })
}
5 changes: 4 additions & 1 deletion apps/src/lib/node/ledger/shell/finalize_block.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Implementation of the `FinalizeBlock` ABCI++ method for the Shell

use namada::ledger::protocol;
use namada::ledger::storage::EventLogExt;
use namada::types::storage::{BlockHash, Header};

use super::governance::execute_governance_proposals;
Expand Down Expand Up @@ -239,7 +240,9 @@ where
.finalize_transaction()
.map_err(|_| Error::GasOverflow)?;

self.event_log_mut().log_events(response.events.clone());
self.storage
.event_log_mut()
.log_events(response.events.clone());

Ok(response)
}
Expand Down
2 changes: 1 addition & 1 deletion apps/src/lib/node/ledger/shell/governance.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use namada::ledger::events::EventType;
use namada::ledger::governance::storage as gov_storage;
use namada::ledger::governance::utils::{
compute_tally, get_proposal_votes, ProposalEvent,
Expand All @@ -13,7 +14,6 @@ use namada::types::storage::Epoch;
use namada::types::token;

use super::*;
use crate::node::ledger::events::EventType;

#[derive(Default)]
pub struct ProposalsResult {
Expand Down
19 changes: 1 addition & 18 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::path::{Path, PathBuf};
use std::rc::Rc;

use borsh::{BorshDeserialize, BorshSerialize};
use namada::ledger::events::Event;
use namada::ledger::gas::BlockGasMeter;
use namada::ledger::pos::namada_proof_of_stake::types::{
ActiveValidator, ValidatorSetUpdate,
Expand Down Expand Up @@ -55,8 +56,6 @@ use crate::facade::tendermint_proto::abci::{
};
use crate::facade::tendermint_proto::crypto::public_key;
use crate::facade::tower_abci::{request, response};
use crate::node::ledger::events::log::EventLog;
use crate::node::ledger::events::Event;
use crate::node::ledger::shims::abcipp_shim_types::shim;
use crate::node::ledger::shims::abcipp_shim_types::shim::response::TxResult;
use crate::node::ledger::{storage, tendermint_node};
Expand Down Expand Up @@ -214,8 +213,6 @@ where
tx_wasm_cache: TxCache<WasmCacheRwAccess>,
/// Proposal execution tracking
pub proposal_data: HashSet<u64>,
/// Log of events emitted by `FinalizeBlock`.
event_log: EventLog,
}

impl<D, H> Shell<D, H>
Expand Down Expand Up @@ -321,23 +318,9 @@ where
tx_wasm_compilation_cache as usize,
),
proposal_data: HashSet::new(),
// TODO: config event log params
event_log: EventLog::default(),
}
}

/// Return a reference to the [`EventLog`].
#[inline]
pub fn event_log(&self) -> &EventLog {
&self.event_log
}

/// Return a mutable reference to the [`EventLog`].
#[inline]
pub fn event_log_mut(&mut self) -> &mut EventLog {
&mut self.event_log
}

/// Iterate over the wrapper txs in order
#[allow(dead_code)]
fn iter_tx_queue(&mut self) -> impl Iterator<Item = &WrapperTx> {
Expand Down
7 changes: 4 additions & 3 deletions apps/src/lib/node/ledger/shims/abcipp_shim_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ pub mod shim {

/// Custom types for response payloads
pub mod response {
use namada::ledger::events::Event;
#[cfg(feature = "abcipp")]
use namada::ledger::events::EventLevel;

use crate::facade::tendermint_proto::abci::{
Event as TmEvent, ResponseProcessProposal, ValidatorUpdate,
};
Expand All @@ -276,9 +280,6 @@ pub mod shim {
abci::{ExecTxResult, ResponseFinalizeBlock},
types::ConsensusParams,
};
use crate::node::ledger::events::Event;
#[cfg(feature = "abcipp")]
use crate::node::ledger::events::EventLevel;

#[derive(Debug, Default)]
pub struct VerifyHeader;
Expand Down
1 change: 1 addition & 0 deletions shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ arse-merkle-tree = {package = "sparse-merkle-tree", git = "https://github.com/he
async-trait = {version = "0.1.51", optional = true}
bech32 = "0.8.0"
borsh = "0.9.0"
circular-queue = "0.2.6"
chrono = {version = "0.4.22", default-features = false, features = ["clock", "std"]}
# Using unreleased commit on top of version 0.5.0 that adds Sync to the CLruCache
clru = {git = "https://github.com/marmeladema/clru-rs.git", rev = "71ca566"}
Expand Down
Loading

0 comments on commit ecf8d7a

Please sign in to comment.