Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a metric which counts how many chunks couldn't fit all transactions from the pool #10422

Merged
merged 9 commits into from
Jan 17, 2024
6 changes: 3 additions & 3 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::ValidatorSchedule;
use crate::types::{
ApplyChunkBlockContext, ApplyChunkResult, ApplyChunkShardContext, ApplyResultForResharding,
RuntimeAdapter, RuntimeStorageConfig,
PreparedTransactions, RuntimeAdapter, RuntimeStorageConfig,
};
use crate::BlockHeader;
use borsh::{BorshDeserialize, BorshSerialize};
Expand Down Expand Up @@ -1032,12 +1032,12 @@ impl RuntimeAdapter for KeyValueRuntime {
_chain_validate: &mut dyn FnMut(&SignedTransaction) -> bool,
_current_protocol_version: ProtocolVersion,
_time_limit: Option<Duration>,
) -> Result<Vec<SignedTransaction>, Error> {
) -> Result<PreparedTransactions, Error> {
let mut res = vec![];
while let Some(iter) = transactions.next() {
res.push(iter.next().unwrap());
}
Ok(res)
Ok(PreparedTransactions { transactions: res, limited_by: None })
}

fn apply_chunk(
Expand Down
23 changes: 22 additions & 1 deletion chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,27 @@ pub struct ApplyChunkShardContext<'a> {
pub is_first_block_with_chunk_of_version: bool,
}

// Contains transactions that were fetched from the transaction pool
// and prepared for adding them to a new chunk that is being produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PreparedTransactions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a small comment please?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: typically we use tripple / for struct and method comments

/// Contains transactions ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, fixed 👍

/// Prepared transactions
pub transactions: Vec<SignedTransaction>,
/// Describes which limit was hit when preparing the transactions.
pub limited_by: Option<PrepareTransactionsLimit>,
}

// Chunk producer prepares transactions from the transaction pool
// until it hits some limit (too many transactions, too much gas used, etc).
// This enum describes which limit was hit when preparing transactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::AsRefStr)]
pub enum PrepareTransactionsLimit {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Gas,
Size,
Time,
ReceiptCount,
}

/// Bridge between the chain and the runtime.
/// Main function is to update state given transactions.
/// Additionally handles validators.
Expand Down Expand Up @@ -378,7 +399,7 @@ pub trait RuntimeAdapter: Send + Sync {
chain_validate: &mut dyn FnMut(&SignedTransaction) -> bool,
current_protocol_version: ProtocolVersion,
time_limit: Option<Duration>,
) -> Result<Vec<SignedTransaction>, Error>;
) -> Result<PreparedTransactions, Error>;

/// Returns true if the shard layout will change in the next epoch
/// Current epoch is the epoch of the block after `parent_hash`
Expand Down
99 changes: 55 additions & 44 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use near_chain::orphan::OrphanMissingChunks;
use near_chain::resharding::ReshardingRequest;
use near_chain::state_snapshot_actor::SnapshotCallbacks;
use near_chain::test_utils::format_hash;
use near_chain::types::RuntimeAdapter;
use near_chain::types::{ChainConfig, LatestKnown};
use near_chain::types::{PreparedTransactions, RuntimeAdapter};
use near_chain::{
BlockProcessingArtifact, BlockStatus, Chain, ChainGenesis, ChainStoreAccess,
DoneApplyChunkCallback, Doomslug, DoomslugThresholdMode, Provenance,
Expand Down Expand Up @@ -853,20 +853,23 @@ impl Client {
.map_err(|err| Error::ChunkProducer(format!("No chunk extra available: {}", err)))?;

let prev_block_header = self.chain.get_block_header(&prev_block_hash)?;
let transactions = self.prepare_transactions(
let prepared_transactions = self.prepare_transactions(
shard_uid,
chunk_extra.gas_limit(),
*chunk_extra.state_root(),
&prev_block_header,
)?;
#[cfg(feature = "test_features")]
let transactions = Self::maybe_insert_invalid_transaction(
transactions,
prev_block_hash,
self.produce_invalid_tx_in_chunks,
);
let num_filtered_transactions = transactions.len();
let (tx_root, _) = merklize(&transactions);
let prepared_transactions = PreparedTransactions {
transactions: Self::maybe_insert_invalid_transaction(
prepared_transactions.transactions,
prev_block_hash,
self.produce_invalid_tx_in_chunks,
),
limited_by: prepared_transactions.limited_by,
};
let num_filtered_transactions = prepared_transactions.transactions.len();
let (tx_root, _) = merklize(&prepared_transactions.transactions);
let outgoing_receipts = self.chain.get_outgoing_receipts_for_shard(
prev_block_hash,
shard_id,
Expand All @@ -888,7 +891,7 @@ impl Client {
chunk_extra.gas_limit(),
chunk_extra.balance_burnt(),
chunk_extra.validator_proposals().collect(),
transactions,
prepared_transactions.transactions,
&outgoing_receipts,
outgoing_receipts_root,
tx_root,
Expand All @@ -913,6 +916,12 @@ impl Client {
chunk_production_duration_millis: Some(timer.elapsed().as_millis() as u64),
},
);
if let Some(limit) = prepared_transactions.limited_by {
// When some transactions from the pool didn't fit into the chunk due to a limit, it's reported in a metric.
metrics::PRODUCED_CHUNKS_SOME_POOL_TRANSACTIONS_DIDNT_FIT
.with_label_values(&[&shard_id.to_string(), limit.as_ref()])
.inc();
}

Ok(Some((encoded_chunk, merkle_paths, outgoing_receipts)))
}
Expand Down Expand Up @@ -967,49 +976,51 @@ impl Client {
gas_limit: Gas,
state_root: StateRoot,
prev_block_header: &BlockHeader,
) -> Result<Vec<SignedTransaction>, Error> {
) -> Result<PreparedTransactions, Error> {
let Self { chain, sharded_tx_pool, epoch_manager, runtime_adapter: runtime, .. } = self;

let shard_id = shard_uid.shard_id as ShardId;
let next_epoch_id = epoch_manager.get_epoch_id_from_prev_block(prev_block_header.hash())?;
let protocol_version = epoch_manager.get_epoch_protocol_version(&next_epoch_id)?;

let transactions = if let Some(mut iter) = sharded_tx_pool.get_pool_iterator(shard_uid) {
let transaction_validity_period = chain.transaction_validity_period;
runtime.prepare_transactions(
prev_block_header.next_gas_price(),
gas_limit,
&next_epoch_id,
shard_id,
state_root,
// while the height of the next block that includes the chunk might not be prev_height + 1,
// passing it will result in a more conservative check and will not accidentally allow
// invalid transactions to be included.
prev_block_header.height() + 1,
&mut iter,
&mut |tx: &SignedTransaction| -> bool {
chain
.chain_store()
.check_transaction_validity_period(
prev_block_header,
&tx.transaction.block_hash,
transaction_validity_period,
)
.is_ok()
},
protocol_version,
self.config.produce_chunk_add_transactions_time_limit.get(),
)?
} else {
vec![]
};
let prepared_transactions =
if let Some(mut iter) = sharded_tx_pool.get_pool_iterator(shard_uid) {
let transaction_validity_period = chain.transaction_validity_period;
runtime.prepare_transactions(
prev_block_header.next_gas_price(),
gas_limit,
&next_epoch_id,
shard_id,
state_root,
// while the height of the next block that includes the chunk might not be prev_height + 1,
// passing it will result in a more conservative check and will not accidentally allow
// invalid transactions to be included.
prev_block_header.height() + 1,
&mut iter,
&mut |tx: &SignedTransaction| -> bool {
chain
.chain_store()
.check_transaction_validity_period(
prev_block_header,
&tx.transaction.block_hash,
transaction_validity_period,
)
.is_ok()
},
protocol_version,
self.config.produce_chunk_add_transactions_time_limit.get(),
)?
} else {
PreparedTransactions { transactions: Vec::new(), limited_by: None }
};
// Reintroduce valid transactions back to the pool. They will be removed when the chunk is
// included into the block.
let reintroduced_count = sharded_tx_pool.reintroduce_transactions(shard_uid, &transactions);
if reintroduced_count < transactions.len() {
debug!(target: "client", reintroduced_count, num_tx = transactions.len(), "Reintroduced transactions");
let reintroduced_count = sharded_tx_pool
.reintroduce_transactions(shard_uid, &prepared_transactions.transactions);
if reintroduced_count < prepared_transactions.transactions.len() {
debug!(target: "client", reintroduced_count, num_tx = prepared_transactions.transactions.len(), "Reintroduced transactions");
}
Ok(transactions)
Ok(prepared_transactions)
}

pub fn send_challenges(&mut self, challenges: Vec<ChallengeBody>) {
Expand Down
12 changes: 12 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ pub(crate) static CHUNK_PRODUCED_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});

pub(crate) static PRODUCED_CHUNKS_SOME_POOL_TRANSACTIONS_DIDNT_FIT: Lazy<IntCounterVec> = Lazy::new(
|| {
try_create_int_counter_vec(
"near_produced_chunks_some_pool_transactions_didnt_fit",
"Total number of produced chunks where some transactions from the pool didn't fit in the chunk \
(since starting this node). The limited_by label specifies which limit was hit.",
&["shard_id", "limited_by"],
)
.unwrap()
},
);

pub(crate) static IS_VALIDATOR: Lazy<IntGauge> = Lazy::new(|| {
try_create_int_gauge(
"near_is_validator",
Expand Down
43 changes: 28 additions & 15 deletions nearcore/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use borsh::BorshDeserialize;
use errors::FromStateViewerErrors;
use near_chain::types::{
ApplyChunkBlockContext, ApplyChunkResult, ApplyChunkShardContext, ApplyResultForResharding,
RuntimeAdapter, RuntimeStorageConfig, StorageDataSource, Tip,
PrepareTransactionsLimit, PreparedTransactions, RuntimeAdapter, RuntimeStorageConfig,
StorageDataSource, Tip,
};
use near_chain::Error;
use near_chain_configs::{
Expand Down Expand Up @@ -716,12 +717,8 @@ impl RuntimeAdapter for NightshadeRuntime {
chain_validate: &mut dyn FnMut(&SignedTransaction) -> bool,
current_protocol_version: ProtocolVersion,
time_limit: Option<Duration>,
) -> Result<Vec<SignedTransaction>, Error> {
) -> Result<PreparedTransactions, Error> {
let start_time = std::time::Instant::now();
let time_limit_reached = || match time_limit {
Some(limit_duration) => start_time.elapsed() >= limit_duration,
None => false,
};
let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, epoch_id)?;
let mut state_update = self.tries.new_trie_update(shard_uid, state_root);

Expand All @@ -730,7 +727,7 @@ impl RuntimeAdapter for NightshadeRuntime {
let mut total_size = 0u64;
// TODO: Update gas limit for transactions
let transactions_gas_limit = gas_limit / 2;
let mut transactions = vec![];
let mut result = PreparedTransactions { transactions: Vec::new(), limited_by: None };
wacban marked this conversation as resolved.
Show resolved Hide resolved
let mut num_checked_transactions = 0;

let runtime_config = self.runtime_config_store.get_config(current_protocol_version);
Expand Down Expand Up @@ -770,11 +767,27 @@ impl RuntimeAdapter for NightshadeRuntime {
/ (runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_write_value_byte)
+ runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_read_value_byte));

while total_gas_burnt < transactions_gas_limit
&& total_size < size_limit
&& transactions.len() < new_receipt_count_limit
&& !time_limit_reached()
{
// Add new transactions to the result until some limit is hit or the transactions run out.
loop {
if total_gas_burnt >= transactions_gas_limit {
result.limited_by = Some(PrepareTransactionsLimit::Gas);
break;
}
if total_size >= size_limit {
result.limited_by = Some(PrepareTransactionsLimit::Size);
break;
}
if result.transactions.len() >= new_receipt_count_limit {
result.limited_by = Some(PrepareTransactionsLimit::ReceiptCount);
break;
}
if let Some(time_limit) = &time_limit {
if start_time.elapsed() >= *time_limit {
result.limited_by = Some(PrepareTransactionsLimit::Time);
break;
}
}

if let Some(iter) = pool_iterator.next() {
while let Some(tx) = iter.next() {
num_checked_transactions += 1;
Expand All @@ -799,7 +812,7 @@ impl RuntimeAdapter for NightshadeRuntime {
state_update.commit(StateChangeCause::NotWritableToDisk);
total_gas_burnt += verification_result.gas_burnt;
total_size += tx.get_size();
transactions.push(tx);
result.transactions.push(tx);
break;
}
Err(RuntimeError::InvalidTxError(err)) => {
Expand All @@ -817,11 +830,11 @@ impl RuntimeAdapter for NightshadeRuntime {
break;
}
}
debug!(target: "runtime", "Transaction filtering results {} valid out of {} pulled from the pool", transactions.len(), num_checked_transactions);
debug!(target: "runtime", "Transaction filtering results {} valid out of {} pulled from the pool", result.transactions.len(), num_checked_transactions);
metrics::PREPARE_TX_SIZE
.with_label_values(&[&shard_id.to_string()])
.observe(total_size as f64);
Ok(transactions)
Ok(result)
}

fn get_gc_stop_height(&self, block_hash: &CryptoHash) -> BlockHeight {
Expand Down
Loading