diff --git a/Cargo.lock b/Cargo.lock index ca3e47878f1d8..80a48ae3ef7c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3566,6 +3566,27 @@ dependencies = [ "tokio-retry", ] +[[package]] +name = "aptos-replay-benchmark" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-block-executor", + "aptos-logger", + "aptos-move-debugger", + "aptos-push-metrics", + "aptos-rest-client", + "aptos-types", + "aptos-vm", + "bcs 0.1.4", + "claims", + "clap 4.5.21", + "parking_lot 0.12.1", + "serde", + "tokio", + "url", +] + [[package]] name = "aptos-resource-viewer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cc429d5c1435d..7bb1b9b0c15e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ members = [ "aptos-move/move-examples", "aptos-move/mvhashmap", "aptos-move/package-builder", + "aptos-move/replay-benchmark", "aptos-move/script-composer", "aptos-move/vm-genesis", "aptos-node", @@ -419,6 +420,7 @@ aptos-push-metrics = { path = "crates/aptos-push-metrics" } aptos-rate-limiter = { path = "crates/aptos-rate-limiter" } aptos-release-builder = { path = "aptos-move/aptos-release-builder" } aptos-reliable-broadcast = { path = "crates/reliable-broadcast" } +aptos-replay-benchmark = { path = "aptos-move/replay-benchmark" } aptos-resource-viewer = { path = "aptos-move/aptos-resource-viewer" } aptos-rest-client = { path = "crates/aptos-rest-client" } aptos-retrier = { path = "crates/aptos-retrier" } diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 2dc191812cc1a..a27281eceb672 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -1,12 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::{bail, format_err, Result}; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, 
TxnProvider}, -}; +use anyhow::{bail, format_err}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_gas_profiling::{GasProfiler, TransactionGasLog}; use aptos_rest_client::Client; use aptos_types::{ @@ -28,9 +24,7 @@ use aptos_validator_interface::{ AptosValidatorInterface, DBDebuggerInterface, DebuggerStateView, RestDebuggerInterface, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, - data_cache::AsMoveResolver, - AptosVM, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, AptosVM, VMBlockExecutor, }; use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_logging::log_schema::AdapterLogSchema; @@ -47,23 +41,31 @@ impl AptosDebugger { Self { debugger } } - pub fn rest_client(rest_client: Client) -> Result { + pub fn rest_client(rest_client: Client) -> anyhow::Result { Ok(Self::new(Arc::new(RestDebuggerInterface::new(rest_client)))) } - pub fn db + Clone>(db_root_path: P) -> Result { + pub fn db + Clone>(db_root_path: P) -> anyhow::Result { Ok(Self::new(Arc::new(DBDebuggerInterface::open( db_root_path, )?))) } + pub async fn get_committed_transactions( + &self, + begin: Version, + limit: u64, + ) -> anyhow::Result<(Vec, Vec)> { + self.debugger.get_committed_transactions(begin, limit).await + } + pub fn execute_transactions_at_version( &self, version: Version, txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let sig_verified_txns: Vec = txns.into_iter().map(|x| x.into()).collect::>(); let txn_provider = DefaultTxnProvider::new(sig_verified_txns); @@ -114,7 +116,7 @@ impl AptosDebugger { &self, version: Version, txn: SignedTransaction, - ) -> Result<(VMStatus, VMOutput, TransactionGasLog)> { + ) -> anyhow::Result<(VMStatus, VMOutput, TransactionGasLog)> { let state_view = DebuggerStateView::new(self.debugger.clone(), version); let log_context = AdapterLogSchema::new(state_view.id(), 0); let 
txn = txn @@ -166,11 +168,8 @@ impl AptosDebugger { use_same_block_boundaries: bool, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { - let (txns, txn_infos) = self - .debugger - .get_committed_transactions(begin, limit) - .await?; + ) -> anyhow::Result> { + let (txns, txn_infos) = self.get_committed_transactions(begin, limit).await?; if use_same_block_boundaries { // when going block by block, no need to worry about epoch boundaries @@ -238,7 +237,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let results = self.execute_transactions_at_version( begin, txns, @@ -268,7 +267,7 @@ impl AptosDebugger { repeat_execution_times: u64, concurrency_levels: &[usize], mut txn_infos: Vec, - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; while limit != 0 { println!( @@ -301,7 +300,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; let mut cur = vec![]; let mut cur_version = begin; @@ -336,7 +335,7 @@ impl AptosDebugger { &self, account: AccountAddress, seq: u64, - ) -> Result> { + ) -> anyhow::Result> { self.debugger .get_version_by_account_sequence(account, seq) .await @@ -345,7 +344,7 @@ impl AptosDebugger { pub async fn get_committed_transaction_at_version( &self, version: Version, - ) -> Result<(Transaction, TransactionInfo)> { + ) -> anyhow::Result<(Transaction, TransactionInfo)> { let (mut txns, mut info) = self.debugger.get_committed_transactions(version, 1).await?; let txn = txns.pop().expect("there must be exactly 1 txn in the vec"); @@ -434,20 +433,16 @@ fn execute_block_no_limit( state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { - AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - state_view, - 
&AptosModuleCacheManager::new(), - BlockExecutorConfig { - local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), - onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), - }, - TransactionSliceMetadata::unknown(), - None, - ) - .map(BlockOutput::into_transaction_outputs_forced) + let executor = AptosVMBlockExecutor::new(); + executor + .execute_block_with_config( + txn_provider, + state_view, + BlockExecutorConfig { + local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + }, + TransactionSliceMetadata::unknown(), + ) + .map(BlockOutput::into_transaction_outputs_forced) } diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index f250de5c60758..81db4cbded0c2 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -3,11 +3,7 @@ use crate::transactions; use aptos_bitvec::BitVec; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, -}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_block_partitioner::{ v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig, }; @@ -32,15 +28,15 @@ use aptos_types::{ }, ExecutionStatus, Transaction, TransactionOutput, TransactionStatus, }, - vm_status::VMStatus, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, sharded_block_executor::{ local_executor_shard::{LocalExecutorClient, LocalExecutorService}, ShardedBlockExecutor, }, + VMBlockExecutor, }; use proptest::{collection::vec, prelude::Strategy, 
strategy::ValueTree, test_runner::TestRunner}; use std::{net::SocketAddr, sync::Arc, time::Instant}; @@ -217,20 +213,18 @@ where ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), + TransactionSliceMetadata::unknown(), + ) + .expect("Sequential block execution should succeed") + .into_transaction_outputs_forced(); + let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) @@ -263,28 +257,25 @@ where fn execute_benchmark_parallel( &self, txn_provider: &DefaultTxnProvider, - concurrency_level_per_shard: usize, + concurrency_level: usize, maybe_block_gas_limit: Option, ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit( - concurrency_level_per_shard, - maybe_block_gas_limit, - ), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit( + 
concurrency_level, + maybe_block_gas_limit, + ), + TransactionSliceMetadata::unknown(), + ) + .expect("Parallel block execution should succeed") + .into_transaction_outputs_forced(); let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 1b469bcd82c9a..9e3cc84baaa4a 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -2780,35 +2780,31 @@ pub struct AptosVMBlockExecutor { module_cache_manager: AptosModuleCacheManager, } -impl VMBlockExecutor for AptosVMBlockExecutor { - fn new() -> Self { - Self { - module_cache_manager: AptosModuleCacheManager::new(), - } - } - - fn execute_block( +impl AptosVMBlockExecutor { + /// Executes transactions with the specified [BlockExecutorConfig] and returns output for each + /// one of them. + pub fn execute_block_with_config( &self, txn_provider: &DefaultTxnProvider, state_view: &(impl StateView + Sync), - onchain_config: BlockExecutorConfigFromOnchain, + config: BlockExecutorConfig, transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { - fail_point!("move_adapter::execute_block", |_| { + fail_point!("aptos_vm_block_executor::execute_block_with_config", |_| { Err(VMStatus::error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, None, )) }); + let log_context = AdapterLogSchema::new(state_view.id(), 0); + let num_txns = txn_provider.num_txns(); info!( log_context, - "Executing block, transaction count: {}", - txn_provider.num_txns() + "Executing block, transaction count: {}", num_txns ); - let count = txn_provider.num_txns(); - let ret = AptosVMBlockExecutorWrapper::execute_block::< + let result = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, @@ -2816,23 +2812,42 @@ impl VMBlockExecutor for AptosVMBlockExecutor { txn_provider, state_view, &self.module_cache_manager, - BlockExecutorConfig { - 
local: BlockExecutorLocalConfig { - concurrency_level: AptosVM::get_concurrency_level(), - allow_fallback: true, - discard_failed_blocks: AptosVM::get_discard_failed_blocks(), - module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), - }, - onchain: onchain_config, - }, + config, transaction_slice_metadata, None, ); - if ret.is_ok() { + if result.is_ok() { // Record the histogram count for transactions per block. - BLOCK_TRANSACTION_COUNT.observe(count as f64); + BLOCK_TRANSACTION_COUNT.observe(num_txns as f64); } - ret + result + } +} + +impl VMBlockExecutor for AptosVMBlockExecutor { + fn new() -> Self { + Self { + module_cache_manager: AptosModuleCacheManager::new(), + } + } + + fn execute_block( + &self, + txn_provider: &DefaultTxnProvider, + state_view: &(impl StateView + Sync), + onchain_config: BlockExecutorConfigFromOnchain, + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + let config = BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level: AptosVM::get_concurrency_level(), + allow_fallback: true, + discard_failed_blocks: AptosVM::get_discard_failed_blocks(), + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + onchain: onchain_config, + }; + self.execute_block_with_config(txn_provider, state_view, config, transaction_slice_metadata) } fn execute_block_sharded>( diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index d5c55eeb957c4..168b1bb4a3719 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -481,7 +481,7 @@ impl< } /// Uses shared thread pool to execute blocks. 
- pub fn execute_block< + pub(crate) fn execute_block< S: StateView + Sync, L: TransactionCommitHook, TP: TxnProvider + Sync, diff --git a/aptos-move/replay-benchmark/Cargo.toml b/aptos-move/replay-benchmark/Cargo.toml new file mode 100644 index 0000000000000..e531876d494de --- /dev/null +++ b/aptos-move/replay-benchmark/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "aptos-replay-benchmark" +version = "0.1.0" +description = "A tool to replay and locally benchmark on-chain transactions." + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +aptos-block-executor = { workspace = true } +aptos-logger = { workspace = true } +aptos-move-debugger = { workspace = true } +aptos-push-metrics = { workspace = true } +aptos-rest-client = { workspace = true } +aptos-types = { workspace = true } +aptos-vm = { workspace = true } +bcs = { workspace = true } +claims = { workspace = true } +clap = { workspace = true } +parking_lot = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +url = { workspace = true } diff --git a/aptos-move/replay-benchmark/README.md b/aptos-move/replay-benchmark/README.md new file mode 100644 index 0000000000000..15563eb4212fb --- /dev/null +++ b/aptos-move/replay-benchmark/README.md @@ -0,0 +1,84 @@ +## A tool to replay and benchmark past Aptos transactions + + +### Benchmarking and measurements + +This tool allows to benchmark an ordered sequence of past transactions, specifying the first (`--begin-version B`) and the last (`--end-version E`) versions. +Transactions are split into blocks, to mimic on-chain behaviour, and blocks are executed one-by-one using an executor. +During the execution, the time is measured. 
+Each block runs based on the pre-computed state, so there is no "commit" of block execution outputs. +Similarly, signature verification is also left out. +Hence, the benchmark reports the runtime only. + +The tool supports two ways to measure the time: + + 1. Measuring total execution time for all transactions (default). + 2. Measuring execution time for each of the executed blocks, and reporting all. + To enable this, use `--measure-block-times` flag. + +In both cases, the measurement is repeated at least 3 times (this can be configured by specifying the number of repeats, `N`, using `--num-repeats N`), and the minimum, maximum, average and median times are reported (in microseconds). + +When benchmarking, a list of concurrency levels (`--concurrency-levels L1 L2 ...`) has to be provided. +Concurrency level specifies the number of threads Block-STM will use to execute a block of transactions. +Typically, you want to have the concurrency level to match the number of cores. +If multiple concurrency levels are provided, the benchmark is run for all, reporting the measurements. +This way it is possible to see how concurrency affects the runtime. + +Finally, in order to differentiate between cold and warm states, there is an option to skip measurement for the first few blocks. +By specifying `--num-blocks-to-skip N`, the tool will ignore measurements for the first `N` blocks when reporting. + +### State overriding + +The benchmark runs every block on top of the corresponding on-chain state. +However, it is possible to override the state. +Currently, the only supported overrides are feature flags: + + 1. Feature flags can be forcefully enabled (`--enable-features F1 F2 ...`). + 2. Feature flags can be forcefully disabled (`--disable-features F1 F2 ...`). + +Feature flags should be spelled in capital letters, e.g., `ENABLE_LOADER_V2`. +For the full list of available features, see [here](../../types/src/on_chain_config/aptos_features.rs).
+ +Overriding the feature flags makes it possible to see how having some feature on or off affects the runtime. +For example, if there is a new feature that improves the performance of MoveVM, with overrides it is possible to evaluate it on past transactions. + +### Comparison to on-chain behavior + +Overriding the state can change the execution behavior. +Hence, if any overrides are provided, the tool compares the on-chain outputs to new outputs obtained when executing on top of a modified state. +The diff of comparison is logged, and the users of the tool can evaluate if the differences are significant or not. +If the differences are not significant (e.g., only the gas usage has changed), the execution behaviour still stays the same. +Hence, the time measurements are still representative of the on-chain behavior. + +### HTTP request rate limit quotas + +Transactions are fetched from the fullnode via REST API. +Users should provide fullnode's REST API query endpoint using `--rest-endpoint E` flag. +For example, to fetch mainnet transactions, specify `--rest-endpoint https://mainnet.aptoslabs.com/v1`. + +If too many transactions are fetched and executed (preparing the benchmark pre-executes the specified transactions and reads the state from the remote), it is possible to run into HTTP request rate limits. +To learn more about the API quotas, see https://developers.aptoslabs.com/docs/api-access/quotas. + +It is possible to increase your quota by creating an API key in Aptos Build. +For that, follow instructions here: https://developers.aptoslabs.com/docs/api-access/api-keys. +Then, when using the tool the key can be specified using `--api-key K` flag.
+ +### Examples + +An end-to-end example for using the tool: + +```commandline +aptos-replay-benchmark --begin-version 1944524532 \ + --end-version 1944524714 \ + --rest-endpoint https://mainnet.aptoslabs.com/v1 \ + --concurrency-levels 2 4 \ + --num-repeats 10 \ + --num-blocks-to-skip 1 \ + --enable-features ENABLE_LOADER_V2 +``` + +Here, mainnet transactions from versions 1944524532 to 1944524714 are benchmarked. +There are two measurements: when Block-STM uses 2 threads, or 4 threads per block. +Each measurement is repeated 10 times, and the overall execution time is reported for each level. +Note that the reported time excludes the first block. +Additionally, `ENABLE_LOADER_V2` feature flag is forcefully enabled to see how it impacts the runtime for past transactions. diff --git a/aptos-move/replay-benchmark/src/block.rs b/aptos-move/replay-benchmark/src/block.rs new file mode 100644 index 0000000000000..f545a2b71116b --- /dev/null +++ b/aptos-move/replay-benchmark/src/block.rs @@ -0,0 +1,154 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + diff::TransactionDiff, + state_view::{ReadSet, ReadSetCapturingStateView}, + workload::Workload, +}; +use aptos_logger::error; +use aptos_types::{ + block_executor::config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, + transaction::{TransactionOutput, Version}, +}; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::collections::HashMap; + +/// Block execution config used for replay benchmarking. 
+fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { + BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level, + allow_fallback: true, + discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + // For replay, there is no block limit. + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + } +} + +/// Represents a single benchmarking unit: a block of transactions with the input pre-block state. +/// Also stores the comparison of outputs based on the input state to on-chain outputs (recall that +/// input state may contain an override and differ from on-chain pre-block state). +pub struct Block { + /// Stores all data needed to execute this block. + inputs: ReadSet, + /// Stores transactions to execute and benchmark. + workload: Workload, + /// Stores diff results for each transaction output. The number of diffs is always equal to the + /// number of transactions, but they may or may not be empty. + diffs: Vec, +} + +impl Block { + /// Creates a new block for benchmarking by executing transactions on top of an overridden + /// state. If there are any state overrides, transactions are first executed based on the + /// on-chain state for later comparison (otherwise, if there are no overrides diffs are empty). + /// + /// Note: transaction execution is sequential, so that multiple blocks can be constructed in + /// parallel. + pub(crate) fn new( + workload: Workload, + state_view: &(impl StateView + Sync), + state_override: HashMap, + ) -> Self { + // Execute transactions without overrides. + let state_view_without_override = + ReadSetCapturingStateView::new(state_view, HashMap::new()); + let onchain_outputs = execute_workload( + &AptosVMBlockExecutor::new(), + &workload, + &state_view_without_override, + 1, + ); + let _onchain_inputs = state_view_without_override.into_read_set(); + + // Check on-chain outputs do not modify the state we override. 
If so, benchmarking results + // may not be correct. + let begin = workload + .transaction_slice_metadata() + .begin_version() + .expect("Transaction metadata must be a chunk"); + for (idx, on_chain_output) in onchain_outputs.iter().enumerate() { + for (state_key, _) in on_chain_output.write_set() { + if state_override.contains_key(state_key) { + error!( + "Transaction {} writes to overridden state value for {:?}", + begin + idx as Version, + state_key + ); + } + } + } + + // Execute transactions with an override. + let state_view_with_override = ReadSetCapturingStateView::new(state_view, state_override); + let outputs = execute_workload( + &AptosVMBlockExecutor::new(), + &workload, + &state_view_with_override, + 1, + ); + let inputs = state_view_with_override.into_read_set(); + + // Compute the differences between outputs. + // TODO: We can also compute the differences between the read sets. Maybe we should add it? + let diffs = onchain_outputs + .into_iter() + .zip(outputs) + .map(|(onchain_output, output)| TransactionDiff::from_outputs(onchain_output, output)) + .collect(); + + Self { + inputs, + workload, + diffs, + } + } + + /// Prints the difference in transaction outputs when running with overrides. + pub fn print_diffs(&self) { + let begin = self + .workload + .transaction_slice_metadata() + .begin_version() + .expect("Transaction metadata is a chunk"); + for (idx, diff) in self.diffs.iter().enumerate() { + if !diff.is_empty() { + println!("Transaction {} diff:\n {}\n", begin + idx as Version, diff); + } + } + } + + /// Executes the workload for benchmarking. 
+ pub(crate) fn run(&self, executor: &AptosVMBlockExecutor, concurrency_level: usize) { + execute_workload(executor, &self.workload, &self.inputs, concurrency_level); + } +} + +fn execute_workload( + executor: &AptosVMBlockExecutor, + workload: &Workload, + state_view: &(impl StateView + Sync), + concurrency_level: usize, +) -> Vec { + executor + .execute_block_with_config( + workload.txn_provider(), + state_view, + block_execution_config(concurrency_level), + workload.transaction_slice_metadata(), + ) + .unwrap_or_else(|err| { + panic!( + "Block execution should not fail, but returned an error: {:?}", + err + ) + }) + .into_transaction_outputs_forced() +} diff --git a/aptos-move/replay-benchmark/src/diff.rs b/aptos-move/replay-benchmark/src/diff.rs new file mode 100644 index 0000000000000..ba10c8c1c9701 --- /dev/null +++ b/aptos-move/replay-benchmark/src/diff.rs @@ -0,0 +1,376 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::{ + contract_event::ContractEvent, + state_store::state_key::StateKey, + transaction::{ExecutionStatus, TransactionOutput}, + write_set::{WriteOp, WriteSet}, +}; +use claims::assert_ok; +use std::collections::BTreeMap; + +/// Different parts of [TransactionOutput] that can be different: +/// 1. gas used, +/// 2. status (must be kept since transactions are replayed), +/// 3. events, +/// 4. writes. +/// Note that fine-grained comparison allows for some differences to be okay, e.g., using more gas +/// implies that the fee statement event, the account balance of the fee payer, and the total token +/// supply are different. +#[derive(Eq, PartialEq)] +enum Diff { + GasUsed { + left: u64, + right: u64, + }, + ExecutionStatus { + left: ExecutionStatus, + right: ExecutionStatus, + }, + Event { + left: Option, + right: Option, + }, + WriteSet { + state_key: StateKey, + left: Option, + right: Option, + }, +} + +/// Holds all differences for a pair of transaction outputs. 
+pub(crate) struct TransactionDiff { + diffs: Vec, +} + +impl TransactionDiff { + /// Given a pair of transaction outputs, computes its [TransactionDiff] that includes the gas + /// used, execution status, events and write sets. + // TODO: Make comparison configurable, so we can skip gas differences, etc. + pub(crate) fn from_outputs(left: TransactionOutput, right: TransactionOutput) -> Self { + let (left_write_set, left_events, left_gas_used, left_transaction_status, _) = + left.unpack(); + let (right_write_set, right_events, right_gas_used, right_transaction_status, _) = + right.unpack(); + + let mut diffs = vec![]; + + // All statuses must be kept, since we are replaying transactions. + let left_execution_status = assert_ok!(left_transaction_status.as_kept_status()); + let right_execution_status = assert_ok!(right_transaction_status.as_kept_status()); + if left_execution_status != right_execution_status { + diffs.push(Diff::ExecutionStatus { + left: left_execution_status, + right: right_execution_status, + }); + } + + if left_gas_used != right_gas_used { + diffs.push(Diff::GasUsed { + left: left_gas_used, + right: right_gas_used, + }); + } + + Self::diff_events(&mut diffs, left_events, right_events); + Self::diff_write_sets(&mut diffs, left_write_set, right_write_set); + + Self { diffs } + } + + /// Returns true if the diff is empty, and transaction outputs match. + pub(crate) fn is_empty(&self) -> bool { + self.diffs.is_empty() + } + + /// Computes the differences between a pair of event vectors, and adds them to the diff. 
+ fn diff_events(diffs: &mut Vec, left: Vec, right: Vec) { + let event_vec_to_map = |events: Vec| { + events + .into_iter() + .map(|event| (event.type_tag().clone(), event)) + .collect::>() + }; + + let left = event_vec_to_map(left); + let mut right = event_vec_to_map(right); + + for (left_ty_tag, left_event) in left { + let maybe_right_event = right.remove(&left_ty_tag); + if maybe_right_event + .as_ref() + .is_some_and(|right_event| left_event.event_data() == right_event.event_data()) + { + continue; + } + + diffs.push(Diff::Event { + left: Some(left_event), + right: maybe_right_event, + }); + } + + for right_event in right.into_values() { + diffs.push(Diff::Event { + left: None, + right: Some(right_event), + }); + } + } + + /// Computes the differences between a pair of write sets, and adds them to the diff. + fn diff_write_sets(diffs: &mut Vec, left: WriteSet, right: WriteSet) { + let left = left.into_mut().into_inner(); + let mut right = right.into_mut().into_inner(); + + for (left_state_key, left_write_op) in left { + let maybe_right_write_op = right.remove(&left_state_key); + if maybe_right_write_op + .as_ref() + .is_some_and(|right_write_op| right_write_op == &left_write_op) + { + continue; + } + + diffs.push(Diff::WriteSet { + state_key: left_state_key, + left: Some(left_write_op), + right: maybe_right_write_op, + }); + } + + for (right_state_key, right_write_op) in right { + diffs.push(Diff::WriteSet { + state_key: right_state_key, + left: None, + right: Some(right_write_op), + }); + } + } +} + +impl std::fmt::Display for TransactionDiff { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, " >>>>> ")?; + for diff in &self.diffs { + match diff { + Diff::GasUsed { left, right } => { + writeln!(f, "[gas used] before: {}, after: {}", left, right)?; + }, + Diff::ExecutionStatus { left, right } => { + writeln!( + f, + "[execution status] before: {:?}, after: {:?}", + left, right + )?; + }, + Diff::Event { left, right } => { + 
let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!( + f, + "[event] {} was not emitted before", + right.unwrap().type_tag().to_canonical_string() + )?; + } else if right.is_none() { + writeln!( + f, + "[event] {} is not emitted anymore", + left.unwrap().type_tag().to_canonical_string() + )?; + } else { + writeln!( + f, + "[event] {} has changed its data", + left.unwrap().type_tag().to_canonical_string() + )?; + } + }, + Diff::WriteSet { + state_key, + left, + right, + } => { + let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!(f, "[write] {:?} was not written to before", state_key)?; + } else if right.is_none() { + writeln!(f, "[write] {:?} is not written to anymore", state_key)?; + } else { + writeln!(f, "[write] {:?} has changed its value", state_key)?; + } + }, + } + } + writeln!(f, " <<<<< ") + } +} + +#[cfg(test)] +mod test { + use super::*; + use aptos_types::{ + on_chain_config::CurrentTimeMicroseconds, state_store::state_value::StateValueMetadata, + write_set::WriteSetMut, + }; + + #[test] + fn test_diff_events() { + let mut diffs = vec![]; + + let events_1 = vec![ + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventA", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventB", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventD", vec![0, 1, 2]), + ]; + + let events_2 = vec![ + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventA", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventC", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventD", vec![0, 1, 3]), + ]; + + let expected_diffs = vec![ + Diff::Event { + left: Some(ContractEvent::new_v2_with_type_tag_str( + "0x1::event::EventB", + vec![0, 1, 2], + )), + right: None, + }, + Diff::Event { + left: None, + right: Some(ContractEvent::new_v2_with_type_tag_str( + "0x1::event::EventC", + vec![0, 1, 2], + )), + }, + 
#[cfg(test)]
mod test {
    use super::*;
    use aptos_types::{
        on_chain_config::CurrentTimeMicroseconds, state_store::state_value::StateValueMetadata,
        write_set::WriteSetMut,
    };

    // Checks the three kinds of event differences: an event only on the left (EventB), an event
    // only on the right (EventC), and an event on both sides with different data (EventD).
    // EventA is identical on both sides and must not be reported.
    #[test]
    fn test_diff_events() {
        let mut diffs = vec![];

        let events_1 = vec![
            ContractEvent::new_v2_with_type_tag_str("0x1::event::EventA", vec![0, 1, 2]),
            ContractEvent::new_v2_with_type_tag_str("0x1::event::EventB", vec![0, 1, 2]),
            ContractEvent::new_v2_with_type_tag_str("0x1::event::EventD", vec![0, 1, 2]),
        ];

        let events_2 = vec![
            ContractEvent::new_v2_with_type_tag_str("0x1::event::EventA", vec![0, 1, 2]),
            ContractEvent::new_v2_with_type_tag_str("0x1::event::EventC", vec![0, 1, 2]),
            ContractEvent::new_v2_with_type_tag_str("0x1::event::EventD", vec![0, 1, 3]),
        ];

        let expected_diffs = vec![
            Diff::Event {
                left: Some(ContractEvent::new_v2_with_type_tag_str(
                    "0x1::event::EventB",
                    vec![0, 1, 2],
                )),
                right: None,
            },
            Diff::Event {
                left: None,
                right: Some(ContractEvent::new_v2_with_type_tag_str(
                    "0x1::event::EventC",
                    vec![0, 1, 2],
                )),
            },
            Diff::Event {
                left: Some(ContractEvent::new_v2_with_type_tag_str(
                    "0x1::event::EventD",
                    vec![0, 1, 2],
                )),
                right: Some(ContractEvent::new_v2_with_type_tag_str(
                    "0x1::event::EventD",
                    vec![0, 1, 3],
                )),
            },
        ];

        TransactionDiff::diff_events(&mut diffs, events_1, events_2);

        // The order of reported diffs is not asserted: only the count and membership are.
        assert_eq!(diffs.len(), 3);
        assert!(diffs.iter().all(|diff| expected_diffs.contains(diff)));
    }

    // Checks write-set differences: keys present on one side only (key-2, key-3), and keys
    // present on both sides that differ in bytes (key-4), in operation kind (key-5) or in
    // metadata (key-6). key-1 is identical on both sides and must not be reported.
    #[test]
    fn test_diff_write_sets() {
        let mut diffs = vec![];

        let write_set_1 = WriteSetMut::new(vec![
            // Same in 2nd write set.
            (
                StateKey::raw(b"key-1"),
                WriteOp::legacy_creation(vec![0, 1, 2].into()),
            ),
            // Does not exist in 2nd write set.
            (
                StateKey::raw(b"key-2"),
                WriteOp::legacy_creation(vec![0, 1, 2].into()),
            ),
            // Different from 2nd write-set.
            (
                StateKey::raw(b"key-4"),
                WriteOp::legacy_creation(vec![0, 1, 2].into()),
            ),
            (
                StateKey::raw(b"key-5"),
                WriteOp::legacy_creation(vec![0, 1, 2].into()),
            ),
            (
                StateKey::raw(b"key-6"),
                WriteOp::creation(
                    vec![0, 1, 2].into(),
                    StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 100 }),
                ),
            ),
        ])
        .freeze()
        .unwrap();

        let write_set_2 = WriteSetMut::new(vec![
            // Same in 1st write set.
            (
                StateKey::raw(b"key-1"),
                WriteOp::legacy_creation(vec![0, 1, 2].into()),
            ),
            // Does not exist in 1st write set.
            (
                StateKey::raw(b"key-3"),
                WriteOp::legacy_creation(vec![0, 1, 2].into()),
            ),
            // Different from 1st write-set.
            (
                StateKey::raw(b"key-4"),
                WriteOp::legacy_creation(vec![0, 1, 3].into()),
            ),
            (
                StateKey::raw(b"key-5"),
                WriteOp::legacy_modification(vec![0, 1, 2].into()),
            ),
            (
                StateKey::raw(b"key-6"),
                WriteOp::creation(
                    vec![0, 1, 2].into(),
                    StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 200 }),
                ),
            ),
        ])
        .freeze()
        .unwrap();

        let expected_diffs = vec![
            Diff::WriteSet {
                state_key: StateKey::raw(b"key-2"),
                left: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())),
                right: None,
            },
            Diff::WriteSet {
                state_key: StateKey::raw(b"key-3"),
                left: None,
                right: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())),
            },
            Diff::WriteSet {
                state_key: StateKey::raw(b"key-4"),
                left: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())),
                right: Some(WriteOp::legacy_creation(vec![0, 1, 3].into())),
            },
            Diff::WriteSet {
                state_key: StateKey::raw(b"key-5"),
                left: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())),
                right: Some(WriteOp::legacy_modification(vec![0, 1, 2].into())),
            },
            Diff::WriteSet {
                state_key: StateKey::raw(b"key-6"),
                left: Some(WriteOp::creation(
                    vec![0, 1, 2].into(),
                    StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 100 }),
                )),
                right: Some(WriteOp::creation(
                    vec![0, 1, 2].into(),
                    StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 200 }),
                )),
            },
        ];

        TransactionDiff::diff_write_sets(&mut diffs, write_set_1, write_set_2);

        // The order of reported diffs is not asserted: only the count and membership are.
        assert_eq!(diffs.len(), 5);
        assert!(diffs.iter().all(|diff| expected_diffs.contains(diff)));
    }
}
aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_types::transaction::{Transaction, Version}; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Instant, +}; + +pub struct BenchmarkGenerator { + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, +} + +impl BenchmarkGenerator { + /// Generates a sequence of [Block] for benchmarking. + pub async fn generate_blocks( + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, + ) -> anyhow::Result> { + let generator = Arc::new(Self { + debugger, + begin_version, + end_version, + override_config, + }); + + let limit = generator.end_version - generator.begin_version + 1; + let (txns, _) = generator + .debugger + .get_committed_transactions(generator.begin_version, limit) + .await?; + let txn_blocks = generator.partition(txns); + + let num_generated = Arc::new(AtomicU64::new(0)); + let num_blocks = txn_blocks.len(); + + let mut tasks = Vec::with_capacity(num_blocks); + for (begin, txn_block) in txn_blocks { + let task = tokio::task::spawn_blocking({ + let generator = generator.clone(); + let num_generated = num_generated.clone(); + move || { + let start_time = Instant::now(); + let block = generator.generate_block(begin, txn_block); + let time = start_time.elapsed().as_secs(); + println!( + "Generated block {}/{} in {}s", + num_generated.fetch_add(1, Ordering::SeqCst) + 1, + num_blocks, + time + ); + block + } + }); + tasks.push(task); + } + + let mut blocks = Vec::with_capacity(tasks.len()); + for task in tasks { + blocks.push(task.await?); + } + + Ok(blocks) + } + + /// Generates a single [Block] for benchmarking. 
+ fn generate_block(&self, begin: Version, txns: Vec) -> Block { + let workload = Workload::new(begin, txns); + let state_view = self.debugger.state_view_at_version(begin); + let state_override = self.override_config.get_state_override(&state_view); + Block::new(workload, &state_view, state_override) + } + + /// Partitions a sequence of transactions into blocks. + fn partition(&self, txns: Vec) -> Vec<(Version, Vec)> { + let mut begin_versions_and_blocks = Vec::with_capacity(txns.len()); + + let mut curr_begin = self.begin_version; + let mut curr_block = Vec::with_capacity(txns.len()); + + for txn in txns { + if txn.is_block_start() && !curr_block.is_empty() { + let block_size = curr_block.len(); + begin_versions_and_blocks.push((curr_begin, std::mem::take(&mut curr_block))); + curr_begin += block_size as Version; + } + curr_block.push(txn); + } + if !curr_block.is_empty() { + begin_versions_and_blocks.push((curr_begin, curr_block)); + } + + begin_versions_and_blocks + } +} diff --git a/aptos-move/replay-benchmark/src/lib.rs b/aptos-move/replay-benchmark/src/lib.rs new file mode 100644 index 0000000000000..0792bc55e5f4f --- /dev/null +++ b/aptos-move/replay-benchmark/src/lib.rs @@ -0,0 +1,10 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod block; +mod diff; +pub mod generator; +pub mod overrides; +pub mod runner; +mod state_view; +mod workload; diff --git a/aptos-move/replay-benchmark/src/main.rs b/aptos-move/replay-benchmark/src/main.rs new file mode 100644 index 0000000000000..051e0edc0e950 --- /dev/null +++ b/aptos-move/replay-benchmark/src/main.rs @@ -0,0 +1,187 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_logger::{Level, Logger}; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_push_metrics::MetricsPusher; +use aptos_replay_benchmark::{ + generator::BenchmarkGenerator, overrides::OverrideConfig, runner::BenchmarkRunner, +}; +use 
aptos_rest_client::{AptosBaseUrl, Client}; +use aptos_types::{on_chain_config::FeatureFlag, transaction::Version}; +use clap::Parser; +use url::Url; + +/// Minimum number of times to execute blocks of transactions and measure the time taken. +const MIN_NUM_REPEATS: usize = 3; + +#[derive(Parser)] +#[command(about)] +pub struct Command { + #[clap(long, default_value_t = Level::Error)] + log_level: Level, + + #[clap( + long, + help = "Fullnode's REST API query endpoint, e.g., https://mainnet.aptoslabs.com/v1 for \ + mainnet." + )] + rest_endpoint: String, + + #[clap( + long, + help = "Optional API key to increase HTTP request rate limit quota." + )] + api_key: Option, + + #[clap(long, help = "First transaction to include for benchmarking.")] + begin_version: Version, + + #[clap(long, help = "Last transaction to include for benchmarking.")] + end_version: Version, + + #[clap( + long, + default_value_t = 0, + help = "Number of blocks to skip for time measurement. Allows to warm-up caches." + )] + num_blocks_to_skip: usize, + + #[clap( + long, + num_args = 1.., + help = "List of space-separated concurrency levels that define how many threads Block-STM \ + is using to execute a block of transactions. For each level, the time taken to \ + execute blocks of transactions is measured and reported." + )] + concurrency_levels: Vec, + + #[clap( + long, + default_value_t = MIN_NUM_REPEATS, + help = "Number of times to execute blocks of transactions and measure the timr taken for \ + each concurrency level." + )] + num_repeats: usize, + + #[clap( + long, + help = "If true, measure time taken to execute each block separately. If false, measure \ + the overall time to execute all blocks." + )] + measure_block_times: bool, + + #[clap( + long, + num_args = 1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to enable, in capital letters. For example, \ + GAS_PAYER_ENABLED or EMIT_FEE_STATEMENT. 
For the full list of feature flags, see \ + aptos-core/types/src/on_chain_config/aptos_features.rs." + )] + enable_features: Vec, + + #[clap( + long, + num_args = 1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to disable, in capital letters. For \ + example, GAS_PAYER_ENABLED or EMIT_FEE_STATEMENT. For the full list of feature \ + flags, see aptos-core/types/src/on_chain_config/aptos_features.rs." + )] + disable_features: Vec, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let command = Command::parse(); + + let mut logger = Logger::new(); + logger.level(command.log_level); + logger.init(); + + let _mp = MetricsPusher::start(vec![]); + + // Sanity checks for provided commands. + assert!( + command.begin_version <= command.end_version, + "Transaction versions should be a valid closed interval. Instead got begin: {}, end: {}", + command.begin_version, + command.end_version, + ); + assert!( + !command.concurrency_levels.is_empty(), + "At least one concurrency level must be provided", + ); + assert!( + command.num_repeats >= MIN_NUM_REPEATS, + "Number of repeats must be at least {}", + MIN_NUM_REPEATS, + ); + assert!( + command + .enable_features + .iter() + .all(|f| !command.disable_features.contains(f)), + "Enable and disable feature flags cannot overlap", + ); + + // TODO: + // Right now we fetch transactions from debugger, but ideally we need a way to save them + // locally (with corresponding read-sets) so we can use this for CI. + let builder = Client::builder(AptosBaseUrl::Custom(Url::parse(&command.rest_endpoint)?)); + let client = if let Some(api_key) = command.api_key { + builder.api_key(&api_key)?.build() + } else { + builder.build() + }; + let debugger = AptosDebugger::rest_client(client)?; + + // TODO: + // Right now, only features can be overridden. In general, this can be allowed for anything: + // 1. Framework code, e.g., to test performance of new natives or compiler, + // 2. 
Gas schedule, to track the costs of charging gas or tracking limits. + // We probably should support at least these. + let override_config = OverrideConfig::new(command.enable_features, command.disable_features); + + let blocks = BenchmarkGenerator::generate_blocks( + debugger, + command.begin_version, + command.end_version, + override_config, + ) + .await?; + + // Ensure we have at least one block to benchmark. + assert!( + command.num_blocks_to_skip < blocks.len(), + "There are only {} blocks, but skipping {}", + blocks.len(), + command.num_blocks_to_skip + ); + + for block in &blocks { + block.print_diffs(); + } + + BenchmarkRunner::new( + command.concurrency_levels, + command.num_repeats, + command.measure_block_times, + command.num_blocks_to_skip, + ) + .measure_execution_time(&blocks); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn verify_tool() { + use clap::CommandFactory; + Command::command().debug_assert(); + } +} diff --git a/aptos-move/replay-benchmark/src/overrides.rs b/aptos-move/replay-benchmark/src/overrides.rs new file mode 100644 index 0000000000000..8e0270bbb898b --- /dev/null +++ b/aptos-move/replay-benchmark/src/overrides.rs @@ -0,0 +1,90 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Defines different overrides for on-chain state used for benchmarking. With overrides, past +//! transactions can be replayed on top of a modified state, and we can evaluate how it impacts +//! performance or other things. + +use aptos_logger::error; +use aptos_types::{ + on_chain_config::{FeatureFlag, Features, OnChainConfig}, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, +}; +use serde::Serialize; +use std::collections::HashMap; + +/// Stores feature flags to enable/disable, essentially overriding on-chain state. 
+pub struct OverrideConfig { + additional_enabled_features: Vec, + additional_disabled_features: Vec, +} + +impl OverrideConfig { + pub fn new( + additional_enabled_features: Vec, + additional_disabled_features: Vec, + ) -> Self { + Self { + additional_enabled_features, + additional_disabled_features, + } + } + + pub(crate) fn get_state_override( + &self, + state_view: &impl StateView, + ) -> HashMap { + let mut state_override = HashMap::new(); + + // Enable/disable features. + let (features_state_key, features_state_value) = + config_override::(state_view, |features| { + for feature in &self.additional_enabled_features { + if features.is_enabled(*feature) { + error!("Feature {:?} is already enabled", feature); + } + features.enable(*feature); + } + for feature in &self.additional_disabled_features { + if !features.is_enabled(*feature) { + error!("Feature {:?} is already disabled", feature); + } + features.disable(*feature); + } + }); + state_override.insert(features_state_key, features_state_value); + state_override + } +} + +/// Returns the state key for on-chain config type. +fn config_state_key() -> StateKey { + StateKey::resource(T::address(), &T::struct_tag()) + .expect("Constructing state key for on-chain config must succeed") +} + +/// Fetches the config from the storage, and modifies it based on the passed function. Panics if +/// there is a storage error, config does not exist or fails to (de-)serialize. 
+fn config_override( + state_view: &impl StateView, + override_func: F, +) -> (StateKey, StateValue) { + let state_key = config_state_key::(); + let state_value = state_view + .get_state_value(&state_key) + .unwrap_or_else(|err| { + panic!( + "Failed to fetch on-chain config for {:?}: {:?}", + state_key, err + ) + }) + .unwrap_or_else(|| panic!("On-chain config for {:?} must always exist", state_key)); + + let mut config = T::deserialize_into_config(state_value.bytes()) + .expect("On-chain config must be deserializable"); + override_func(&mut config); + let config_bytes = bcs::to_bytes(&config).expect("On-chain config must be serializable"); + + let new_state_value = state_value.map_bytes(|_| Ok(config_bytes.into())).unwrap(); + (state_key, new_state_value) +} diff --git a/aptos-move/replay-benchmark/src/runner.rs b/aptos-move/replay-benchmark/src/runner.rs new file mode 100644 index 0000000000000..ed546ec40ff7b --- /dev/null +++ b/aptos-move/replay-benchmark/src/runner.rs @@ -0,0 +1,127 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::block::Block; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::time::Instant; + +/// Holds configuration for running the benchmarks and measuring the time taken. +pub struct BenchmarkRunner { + concurrency_levels: Vec, + num_repeats: usize, + measure_per_block_instead_of_overall_time: bool, + num_blocks_to_skip: usize, +} + +impl BenchmarkRunner { + pub fn new( + concurrency_levels: Vec, + num_repeats: usize, + measure_per_block_instead_of_overall_time: bool, + num_blocks_to_skip: usize, + ) -> Self { + Self { + concurrency_levels, + num_repeats, + measure_per_block_instead_of_overall_time, + num_blocks_to_skip, + } + } + + // TODO: + // This measures execution time from a cold-start. Ideally, we want to warm-up with executing + // 1-2 blocks prior to selected range, but not timing them. 
+ pub fn measure_execution_time(&self, blocks: &[Block]) { + for concurrency_level in &self.concurrency_levels { + if self.measure_per_block_instead_of_overall_time { + self.measure_block_execution_times(blocks, *concurrency_level); + } else { + self.measure_overall_execution_time(blocks, *concurrency_level); + } + } + } + + /// Runs a sequence of blocks, measuring execution time for each block. The median is reported. + fn measure_block_execution_times(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = (0..blocks.len()) + .map(|_| Vec::with_capacity(self.num_repeats)) + .collect::>(); + + for i in 0..self.num_repeats { + let executor = AptosVMBlockExecutor::new(); + for (idx, block) in blocks.iter().enumerate() { + let start_time = Instant::now(); + block.run(&executor, concurrency_level); + let time = start_time.elapsed().as_micros(); + if idx >= self.num_blocks_to_skip { + println!( + "[{}/{}] Block {} execution time is {}us", + i + 1, + self.num_repeats, + idx + 1, + time, + ); + } + times[idx].push(time); + } + } + + for (idx, mut time) in times.into_iter().enumerate() { + if idx >= self.num_blocks_to_skip { + time.sort(); + let min_time = *time.first().unwrap(); + let average_time = time.iter().sum::() as f64 / self.num_repeats as f64; + let median_time = time[self.num_repeats / 2]; + let max_time = *time.last().unwrap(); + + println!( + "Block {} execution time: min {}us, average {:.2}us, median {}us, max {}us\n", + idx + 1, + min_time, + average_time, + median_time, + max_time, + ); + } + } + } + + /// Runs the sequence of blocks, measuring end-to-end execution time. + fn measure_overall_execution_time(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = Vec::with_capacity(self.num_repeats); + for i in 0..self.num_repeats { + let executor = AptosVMBlockExecutor::new(); + + // Warm-up. + for block in &blocks[..self.num_blocks_to_skip] { + block.run(&executor, concurrency_level); + } + + // Actual measurement. 
+ let start_time = Instant::now(); + for block in &blocks[self.num_blocks_to_skip..] { + block.run(&executor, concurrency_level); + } + let time = start_time.elapsed().as_micros(); + + println!( + "[{}/{}] Overall execution time is {}us", + i + 1, + self.num_repeats, + time, + ); + times.push(time); + } + + times.sort(); + let min_time = *times.first().unwrap(); + let average_time = times.iter().sum::() as f64 / self.num_repeats as f64; + let median_time = times[self.num_repeats / 2]; + let max_time = *times.last().unwrap(); + + println!( + "Overall execution time (blocks {}-{}): min {}us, average {:.2}us, median {}us, max {}us\n", + self.num_blocks_to_skip + 1, blocks.len(), min_time, average_time, median_time, max_time, + ); + } +} diff --git a/aptos-move/replay-benchmark/src/state_view.rs b/aptos-move/replay-benchmark/src/state_view.rs new file mode 100644 index 0000000000000..0de19abc37dfd --- /dev/null +++ b/aptos-move/replay-benchmark/src/state_view.rs @@ -0,0 +1,82 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::state_store::{ + state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, + StateView, StateViewResult, TStateView, +}; +use parking_lot::Mutex; +use std::collections::HashMap; + +/// Represents the read-set obtained when executing transactions. +pub(crate) struct ReadSet { + data: HashMap, +} + +impl TStateView for ReadSet { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> StateViewResult> { + Ok(self.data.get(state_key).cloned()) + } + + fn get_usage(&self) -> StateViewResult { + unreachable!("Should not be called when benchmarking") + } +} + +/// [StateView] implementation that records all execution reads. Captured reads can be converted +/// into a [ReadSet]. 
+pub(crate) struct ReadSetCapturingStateView<'s, S> { + captured_reads: Mutex>, + state_view: &'s S, +} + +impl<'s, S: StateView> ReadSetCapturingStateView<'s, S> { + pub(crate) fn new(state_view: &'s S, initial_read_set: HashMap) -> Self { + Self { + captured_reads: Mutex::new(initial_read_set), + state_view, + } + } + + pub(crate) fn into_read_set(self) -> ReadSet { + ReadSet { + data: self.captured_reads.into_inner(), + } + } +} + +impl<'s, S: StateView> TStateView for ReadSetCapturingStateView<'s, S> { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> StateViewResult> { + // Check the read-set first. + if let Some(state_value) = self.captured_reads.lock().get(state_key) { + return Ok(Some(state_value.clone())); + } + + // We do not allow failures because then benchmarking will not be correct (we miss a read). + // Plus, these failures should not happen when replaying past transactions. + let maybe_state_value = self + .state_view + .get_state_value(state_key) + .unwrap_or_else(|err| { + panic!("Failed to fetch state value for {:?}: {:?}", state_key, err) + }); + + // Capture the read on first access. 
+ if let Some(state_value) = &maybe_state_value { + let mut captured_reads = self.captured_reads.lock(); + if !captured_reads.contains_key(state_key) { + captured_reads.insert(state_key.clone(), state_value.clone()); + } + } + + Ok(maybe_state_value) + } + + fn get_usage(&self) -> StateViewResult { + unreachable!("Should not be called when benchmarking") + } +} diff --git a/aptos-move/replay-benchmark/src/workload.rs b/aptos-move/replay-benchmark/src/workload.rs new file mode 100644 index 0000000000000..ccc9ed1ea2f73 --- /dev/null +++ b/aptos-move/replay-benchmark/src/workload.rs @@ -0,0 +1,51 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_block_executor::txn_provider::default::DefaultTxnProvider; +use aptos_types::{ + block_executor::transaction_slice_metadata::TransactionSliceMetadata, + transaction::{ + signature_verified_transaction::{ + into_signature_verified_block, SignatureVerifiedTransaction, + }, + Transaction, Version, + }, +}; + +/// A workload to benchmark. Contains signature verified transactions, and metadata specifying the +/// start and end versions of these transactions. +pub(crate) struct Workload { + /// Stores a block of transactions for execution. Always has at least one transaction. + txn_provider: DefaultTxnProvider, + /// Stores metadata for the version range of a block. It is always set to + /// [TransactionSliceMetadata::Chunk]. + transaction_slice_metadata: TransactionSliceMetadata, +} + +impl Workload { + /// Returns a new workload to execute transactions at specified version. 
+ pub(crate) fn new(begin: Version, txns: Vec) -> Self { + assert!(!txns.is_empty()); + + let end = begin + txns.len() as Version; + let transaction_slice_metadata = TransactionSliceMetadata::chunk(begin, end); + + let signature_verified_txns = into_signature_verified_block(txns); + let txn_provider = DefaultTxnProvider::new(signature_verified_txns); + + Workload { + txn_provider, + transaction_slice_metadata, + } + } + + /// Returns the signature verified transactions in the workload. + pub(crate) fn txn_provider(&self) -> &DefaultTxnProvider { + &self.txn_provider + } + + /// Returns transaction metadata corresponding to [begin, end) versions of the workload. + pub(crate) fn transaction_slice_metadata(&self) -> TransactionSliceMetadata { + self.transaction_slice_metadata + } +} diff --git a/crates/aptos-logger/src/metadata.rs b/crates/aptos-logger/src/metadata.rs index aba5b7503ce61..6ef82530c0a06 100644 --- a/crates/aptos-logger/src/metadata.rs +++ b/crates/aptos-logger/src/metadata.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use serde::{Deserialize, Serialize}; -use std::{fmt, str::FromStr}; +use strum_macros::{Display, EnumString, FromRepr}; /// Associated metadata with every log to identify what kind of log and where it came from #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -60,68 +60,96 @@ impl Metadata { } } -static LOG_LEVEL_NAMES: &[&str] = &["ERROR", "WARN", "INFO", "DEBUG", "TRACE"]; - /// Logging levels, used for stratifying logs, and disabling less important ones for performance reasons #[repr(usize)] -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize, Deserialize)] +#[derive( + Copy, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Debug, + Hash, + Serialize, + Deserialize, + FromRepr, + EnumString, + Display, +)] #[serde(rename_all = "UPPERCASE")] pub enum Level { /// The "error" level. /// /// Designates very serious errors. 
+ #[strum(ascii_case_insensitive)] + #[strum(to_string = "ERROR")] Error = 0, /// The "warn" level. /// /// Designates hazardous situations. - Warn, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "WARN")] + Warn = 1, /// The "info" level. /// /// Designates useful information. - Info, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "INFO")] + Info = 2, /// The "debug" level. /// /// Designates lower priority information. - Debug, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "DEBUG")] + Debug = 3, /// The "trace" level. /// /// Designates very low priority, often extremely verbose, information. - Trace, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "TRACE")] + Trace = 4, } -impl Level { - fn from_usize(idx: usize) -> Option { - let lvl = match idx { - 0 => Level::Error, - 1 => Level::Warn, - 2 => Level::Info, - 3 => Level::Debug, - 4 => Level::Trace, - _ => return None, - }; - - Some(lvl) +#[cfg(test)] +mod test { + use super::*; + use std::str::FromStr; + + #[test] + fn test_log_level_from_string() { + assert_eq!(Level::Error, Level::from_str("ERROR").unwrap()); + assert_eq!(Level::Error, Level::from_str("Error").unwrap()); + assert_eq!(Level::Error, Level::from_str("error").unwrap()); + + assert_eq!(Level::Warn, Level::from_str("WARN").unwrap()); + assert_eq!(Level::Warn, Level::from_str("wArN").unwrap()); + assert_eq!(Level::Warn, Level::from_str("warn").unwrap()); + + assert_eq!(Level::Info, Level::from_str("INFO").unwrap()); + assert_eq!(Level::Info, Level::from_str("infO").unwrap()); + assert_eq!(Level::Info, Level::from_str("info").unwrap()); + + assert_eq!(Level::Debug, Level::from_str("DEBUG").unwrap()); + assert_eq!(Level::Debug, Level::from_str("Debug").unwrap()); + assert_eq!(Level::Debug, Level::from_str("debug").unwrap()); + + assert_eq!(Level::Trace, Level::from_str("TRACE").unwrap()); + assert_eq!(Level::Trace, Level::from_str("trAce").unwrap()); + assert_eq!(Level::Trace, Level::from_str("trace").unwrap()); 
+ + assert!(Level::from_str("ERR").is_err()); + assert!(Level::from_str("err_or").is_err()); + assert!(Level::from_str("INF").is_err()); + assert!(Level::from_str("tracey").is_err()); } -} - -/// An error given when no `Level` matches the inputted string -#[derive(Debug)] -pub struct LevelParseError; - -impl FromStr for Level { - type Err = LevelParseError; - - fn from_str(level: &str) -> Result { - LOG_LEVEL_NAMES - .iter() - .position(|name| name.eq_ignore_ascii_case(level)) - .map(|idx| Level::from_usize(idx).unwrap()) - .ok_or(LevelParseError) - } -} -impl fmt::Display for Level { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.pad(LOG_LEVEL_NAMES[*self as usize]) + #[test] + fn test_log_level_to_string() { + assert_eq!(String::from("ERROR"), Level::Error.to_string()); + assert_eq!(String::from("WARN"), Level::Warn.to_string()); + assert_eq!(String::from("INFO"), Level::Info.to_string()); + assert_eq!(String::from("DEBUG"), Level::Debug.to_string()); + assert_eq!(String::from("TRACE"), Level::Trace.to_string()); } } diff --git a/types/src/block_executor/transaction_slice_metadata.rs b/types/src/block_executor/transaction_slice_metadata.rs index 8df707552253f..381259b2da3f1 100644 --- a/types/src/block_executor/transaction_slice_metadata.rs +++ b/types/src/block_executor/transaction_slice_metadata.rs @@ -76,6 +76,15 @@ impl TransactionSliceMetadata { (Chunk { end, .. }, Chunk { begin, .. }) => begin == end, } } + + /// Returns the first transaction version for [TransactionSliceMetadata::Chunk], and [None] + /// otherwise. + pub fn begin_version(&self) -> Option { + if let TransactionSliceMetadata::Chunk { begin, .. 
} = self { + return Some(*begin); + } + None + } } #[cfg(test)] diff --git a/types/src/write_set.rs b/types/src/write_set.rs index e939b70d1346a..0da5b6b89dcd5 100644 --- a/types/src/write_set.rs +++ b/types/src/write_set.rs @@ -568,6 +568,10 @@ impl WriteSetMut { &mut self.write_set } + pub fn into_inner(self) -> BTreeMap { + self.write_set + } + pub fn squash(mut self, other: Self) -> Result { use btree_map::Entry::*;