diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index 3deb5ebfa71c4..59c88f4101416 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -603,6 +603,8 @@ fn get_test_suite( return Ok(test_suite); } else if let Some(test_suite) = get_dag_test(test_name, duration, test_cmd) { return Ok(test_suite); + } else if let Some(test_suite) = get_indexer_test(test_name) { + return Ok(test_suite); } // Otherwise, check the test name against the ungrouped test suites @@ -691,6 +693,15 @@ fn get_land_blocking_test( Some(test) } +/// Attempts to match the test name to an indexer test +fn get_indexer_test(test_name: &str) -> Option<ForgeConfig> { + let test = match test_name { + "indexer_test" => indexer_test(), + _ => return None, // The test name does not match an indexer test + }; + Some(test) +} + /// Attempts to match the test name to a network benchmark test fn get_netbench_test(test_name: &str) -> Option<ForgeConfig> { + let test = match test_name { @@ -2367,6 +2378,99 @@ fn multiregion_benchmark_test() -> ForgeConfig { ) } +/// Workload sweep with multiple stressful workloads for indexer +fn indexer_test() -> ForgeConfig { + // Define all the workloads and their corresponding success criteria upfront + // The TransactionTypeArg is the workload per phase + // The structure of the success criteria is generally (min_tps, latencies...). See below for the exact definition.
+ let workloads_and_criteria = vec![ + ( + TransactionWorkload::new(TransactionTypeArg::CoinTransfer, 20000), + (7000, 0.5, 0.5, 0.5), + ), + ( + TransactionWorkload::new(TransactionTypeArg::NoOp, 20000).with_num_modules(100), + (8500, 0.5, 0.5, 0.5), + ), + ( + TransactionWorkload::new(TransactionTypeArg::ModifyGlobalResource, 6000) + .with_transactions_per_account(1), + (2000, 0.5, 0.5, 0.5), + ), + ( + TransactionWorkload::new(TransactionTypeArg::TokenV2AmbassadorMint, 20000) + .with_unique_senders(), + (3200, 0.5, 0.5, 0.5), + ), + ( + TransactionWorkload::new(TransactionTypeArg::PublishPackage, 200) + .with_transactions_per_account(1), + (28, 0.5, 0.5, 0.5), + ), + ( + TransactionWorkload::new(TransactionTypeArg::VectorPicture30k, 100), + (100, 1.0, 1.0, 1.0), + ), + ( + TransactionWorkload::new(TransactionTypeArg::SmartTablePicture30KWith200Change, 100), + (100, 1.0, 1.0, 1.0), + ), + ( + TransactionWorkload::new( + TransactionTypeArg::TokenV1NFTMintAndTransferSequential, + 1000, + ), + (500, 0.5, 0.5, 0.5), + ), + ( + TransactionWorkload::new(TransactionTypeArg::TokenV1FTMintAndTransfer, 1000), + (500, 0.5, 0.5, 0.5), + ), + ]; + let num_sweep = workloads_and_criteria.len(); + + let workloads = Workloads::TRANSACTIONS( + workloads_and_criteria + .iter() + .map(|(w, _)| w.clone()) + .collect(), + ); + let criteria = workloads_and_criteria + .iter() + .map(|(_, c)| { + let ( + min_tps, + indexer_fullnode_processed_batch, + indexer_cache_worker_processed_batch, + indexer_data_service_all_chunks_sent, + ) = c.to_owned(); + SuccessCriteria::new(min_tps).add_latency_breakdown_threshold( + LatencyBreakdownThreshold::new_strict(vec![ + ( + LatencyBreakdownSlice::IndexerFullnodeProcessedBatch, + indexer_fullnode_processed_batch, + ), + ( + LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch, + indexer_cache_worker_processed_batch, + ), + ( + LatencyBreakdownSlice::IndexerDataServiceAllChunksSent, + indexer_data_service_all_chunks_sent, + ), + ]), + ) + }) + 
.collect::<Vec<_>>(); + + realistic_env_sweep_wrap(4, 4, LoadVsPerfBenchmark { + test: Box::new(PerformanceBenchmark), + workloads, + criteria, + background_traffic: background_traffic_for_sweep(num_sweep), + }) +} + /// This test runs a constant-TPS benchmark where the network includes /// PFNs, and the transactions are submitted to the PFNs. This is useful /// for measuring latencies when the system is not saturated. diff --git a/testsuite/forge/src/backend/k8s/mod.rs b/testsuite/forge/src/backend/k8s/mod.rs index 260887b9145b3..782d9d7aa8749 100644 --- a/testsuite/forge/src/backend/k8s/mod.rs +++ b/testsuite/forge/src/backend/k8s/mod.rs @@ -245,6 +245,7 @@ impl Factory for K8sFactory { self.keep, new_era, self.use_port_forward, + self.enable_indexer, ) .await .unwrap(); diff --git a/testsuite/forge/src/backend/k8s/swarm.rs b/testsuite/forge/src/backend/k8s/swarm.rs index 9c249dc1c6aea..9211bed8a1381 100644 --- a/testsuite/forge/src/backend/k8s/swarm.rs +++ b/testsuite/forge/src/backend/k8s/swarm.rs @@ -62,6 +62,7 @@ pub struct K8sSwarm { era: Option<String>, use_port_forward: bool, chaos_experiment_ops: Box<dyn ChaosExperimentOps + Send + Sync>, + has_indexer: bool, } impl K8sSwarm { @@ -75,6 +76,7 @@ impl K8sSwarm { keep: bool, era: Option<String>, use_port_forward: bool, + has_indexer: bool, ) -> Result<Self> { let kube_client = create_k8s_client().await?; @@ -123,6 +125,7 @@ impl K8sSwarm { kube_client: kube_client.clone(), kube_namespace: kube_namespace.to_string(), }), + has_indexer, }; // test hitting the configured prometheus endpoint @@ -446,6 +449,10 @@ impl Swarm for K8sSwarm { fn get_default_pfn_node_config(&self) -> NodeConfig { get_default_pfn_node_config() } + + fn has_indexer(&self) -> bool { + self.has_indexer + } } /// Amount of time to wait for genesis to complete diff --git a/testsuite/forge/src/backend/local/swarm.rs b/testsuite/forge/src/backend/local/swarm.rs index cb1f8ba2989f7..eba54ae3c5bf3 100644 --- a/testsuite/forge/src/backend/local/swarm.rs +++ b/testsuite/forge/src/backend/local/swarm.rs @@ -650,6
+650,10 @@ impl Swarm for LocalSwarm { fn get_default_pfn_node_config(&self) -> NodeConfig { todo!() } + + fn has_indexer(&self) -> bool { + false + } } #[derive(Debug)] diff --git a/testsuite/forge/src/interface/prometheus_metrics.rs b/testsuite/forge/src/interface/prometheus_metrics.rs index cce9096aaac75..20283bbdb4e2c 100644 --- a/testsuite/forge/src/interface/prometheus_metrics.rs +++ b/testsuite/forge/src/interface/prometheus_metrics.rs @@ -43,6 +43,12 @@ impl fmt::Debug for MetricSamples { } } +impl Default for MetricSamples { + fn default() -> Self { + Self::new(vec![]) + } +} + #[derive(Clone, Debug)] pub struct SystemMetrics { pub cpu_core_metrics: MetricSamples, @@ -105,6 +111,11 @@ pub enum LatencyBreakdownSlice { ConsensusProposalToOrdered, ConsensusOrderedToCommit, ConsensusProposalToCommit, + // each of the indexer grpc steps in order + IndexerFullnodeProcessedBatch, + IndexerCacheWorkerProcessedBatch, + IndexerDataServiceAllChunksSent, + // TODO: add processor insertion into DB latency } #[derive(Clone, Debug)] @@ -119,10 +130,16 @@ impl LatencyBreakdown { self.0.keys().cloned().collect() } - pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> &MetricSamples { - self.0 - .get(slice) - .unwrap_or_else(|| panic!("Missing latency breakdown for {:?}", slice)) + pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> Option<&MetricSamples> { + self.0.get(slice) + } + + pub fn join(&self, other: &LatencyBreakdown) -> LatencyBreakdown { + let mut ret_latency = self.0.clone(); + for (slice, samples) in other.0.iter() { + ret_latency.insert(slice.clone(), samples.clone()); + } + LatencyBreakdown::new(ret_latency) } } @@ -210,5 +227,54 @@ pub async fn fetch_latency_breakdown( MetricSamples::new(consensus_proposal_to_commit_samples), ); + if swarm.has_indexer() { + // These counters are defined in ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs + let indexer_fullnode_processed_batch_query = + r#"max(indexer_grpc_duration_in_secs{step="4", 
service_type="indexer_fullnode"})"#; + let indexer_cache_worker_processed_batch_query = + r#"max(indexer_grpc_duration_in_secs{step="4", service_type="cache_worker"})"#; + let indexer_data_service_all_chunks_sent_query = + r#"max(indexer_grpc_duration_in_secs{step="4", service_type="data_service"})"#; + + let indexer_fullnode_processed_batch_samples = swarm + .query_range_metrics( + indexer_fullnode_processed_batch_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + let indexer_cache_worker_processed_batch_samples = swarm + .query_range_metrics( + indexer_cache_worker_processed_batch_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + let indexer_data_service_all_chunks_sent_samples = swarm + .query_range_metrics( + indexer_data_service_all_chunks_sent_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + samples.insert( + LatencyBreakdownSlice::IndexerFullnodeProcessedBatch, + MetricSamples::new(indexer_fullnode_processed_batch_samples), + ); + samples.insert( + LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch, + MetricSamples::new(indexer_cache_worker_processed_batch_samples), + ); + samples.insert( + LatencyBreakdownSlice::IndexerDataServiceAllChunksSent, + MetricSamples::new(indexer_data_service_all_chunks_sent_samples), + ); + } Ok(LatencyBreakdown::new(samples)) } diff --git a/testsuite/forge/src/interface/swarm.rs b/testsuite/forge/src/interface/swarm.rs index b2c3f501ba5ec..4f378f37bc9d8 100644 --- a/testsuite/forge/src/interface/swarm.rs +++ b/testsuite/forge/src/interface/swarm.rs @@ -105,6 +105,10 @@ pub trait Swarm: Sync + Send { } fn get_default_pfn_node_config(&self) -> NodeConfig; + + /// Check if the swarm has an indexer. NOTE: in the future we should make this more rich, and include + /// indexer endpoints, similar to how we collect validator and fullnode endpoints. 
+ fn has_indexer(&self) -> bool; } impl<T> SwarmExt for T where T: Swarm {} diff --git a/testsuite/forge/src/success_criteria.rs b/testsuite/forge/src/success_criteria.rs index e1df551b4cfa9..e7383a87d12fe 100644 --- a/testsuite/forge/src/success_criteria.rs +++ b/testsuite/forge/src/success_criteria.rs @@ -150,7 +150,9 @@ impl LatencyBreakdownThreshold { traffic_name_addition: &String, ) -> anyhow::Result<()> { for (slice, threshold) in &self.thresholds { - let samples = metrics.get_samples(slice); + let samples = metrics + .get_samples(slice) + .expect("Could not get metric samples"); threshold.ensure_metrics_threshold( &format!("{:?}{}", slice, traffic_name_addition), samples.get(), diff --git a/testsuite/testcases/src/lib.rs b/testsuite/testcases/src/lib.rs index 1857815437254..92320a3136405 100644 --- a/testsuite/testcases/src/lib.rs +++ b/testsuite/testcases/src/lib.rs @@ -283,7 +283,10 @@ impl NetworkTest for dyn NetworkLoadTest { .keys() .into_iter() .map(|slice| { - let slice_samples = phase_stats.latency_breakdown.get_samples(&slice); + let slice_samples = phase_stats + .latency_breakdown + .get_samples(&slice) + .expect("Could not get samples"); format!( "{:?}: max: {:.3}, avg: {:.3}", slice, diff --git a/testsuite/testcases/src/load_vs_perf_benchmark.rs b/testsuite/testcases/src/load_vs_perf_benchmark.rs index e63ff8bc0ec56..634fda24f2ccb 100644 --- a/testsuite/testcases/src/load_vs_perf_benchmark.rs +++ b/testsuite/testcases/src/load_vs_perf_benchmark.rs @@ -6,7 +6,7 @@ use anyhow::Context; use aptos_forge::{ args::TransactionTypeArg, emitter::NumAccountsMode, - prometheus_metrics::{LatencyBreakdown, LatencyBreakdownSlice}, + prometheus_metrics::{LatencyBreakdown, LatencyBreakdownSlice, MetricSamples}, success_criteria::{SuccessCriteria, SuccessCriteriaChecker}, EmitJob, EmitJobMode, EmitJobRequest, NetworkContext, NetworkContextSynchronizer, NetworkTest, Result, Test, TxnStats, WorkflowProgress, @@ -471,7 +471,7 @@ fn to_table(type_name: String, results:
&[Vec<SingleRunStats>]) -> Vec<String> { let mut table = Vec::new(); table.push(format!( - "{: ]) -> Vec { "pos->prop", "prop->order", "order->commit", - "actual dur" + "actual dur", + // optional indexer metrics + "idx_fn", + "idx_cache", + "idx_data", )); for run_results in results { for result in run_results { let rate = result.stats.rate(); table.push(format!( - "{: ]) -> Vec { rate.p50_latency as f64 / 1000.0, rate.p90_latency as f64 / 1000.0, rate.p99_latency as f64 / 1000.0, - result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(), - result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(), - result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(), - result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(), - result.actual_duration.as_secs() + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).unwrap_or(&MetricSamples::default()).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).unwrap_or(&MetricSamples::default()).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).unwrap_or(&MetricSamples::default()).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).unwrap_or(&MetricSamples::default()).max_sample(), + result.actual_duration.as_secs(), + // optional indexer metrics + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerFullnodeProcessedBatch).unwrap_or(&MetricSamples::default()).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch).unwrap_or(&MetricSamples::default()).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerDataServiceAllChunksSent).unwrap_or(&MetricSamples::default()).max_sample(), )); } }