diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index 3deb5ebfa71c4..b2da9cdbb0e9c 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -603,6 +603,8 @@ fn get_test_suite( return Ok(test_suite); } else if let Some(test_suite) = get_dag_test(test_name, duration, test_cmd) { return Ok(test_suite); + } else if let Some(test_suite) = get_indexer_test(test_name) { + return Ok(test_suite); } // Otherwise, check the test name against the ungrouped test suites @@ -691,6 +693,15 @@ fn get_land_blocking_test( Some(test) } +/// Attempts to match the test name to an indexer test +fn get_indexer_test(test_name: &str) -> Option { + let test = match test_name { + "indexer_test" => indexer_test(), + _ => return None, // The test name does not match an indexer test + }; + Some(test) +} + /// Attempts to match the test name to a network benchmark test fn get_netbench_test(test_name: &str) -> Option { let test = match test_name { @@ -2367,6 +2378,61 @@ fn multiregion_benchmark_test() -> ForgeConfig { ) } +/// Workload sweep with multiple stressful workloads for indexer +fn indexer_test() -> ForgeConfig { + realistic_env_sweep_wrap(4, 4, LoadVsPerfBenchmark { + test: Box::new(PerformanceBenchmark), + workloads: Workloads::TRANSACTIONS(vec![ + TransactionWorkload::new(TransactionTypeArg::CoinTransfer, 20000), + TransactionWorkload::new(TransactionTypeArg::NoOp, 20000).with_num_modules(100), + TransactionWorkload::new(TransactionTypeArg::ModifyGlobalResource, 6000) + .with_transactions_per_account(1), + TransactionWorkload::new(TransactionTypeArg::TokenV2AmbassadorMint, 20000) + .with_unique_senders(), + TransactionWorkload::new(TransactionTypeArg::PublishPackage, 200) + .with_transactions_per_account(1), + ]), + // NOTE: tune these indexer latencies + criteria: [ + (7000, 100, 1.0, 1.0, 1.0), + (8500, 100, 1.0, 1.0, 1.0), + (2000, 300, 1.0, 1.0, 1.0), + (3200, 500, 1.0, 1.0, 1.0), + (28, 5, 1.0, 1.0, 1.0), + ] + .into_iter() + .map( + |( + min_tps, + max_expired, + indexer_fullnode_processed_batch, + indexer_cache_worker_processed_batch, + indexer_data_service_all_chunks_sent, + )| { + SuccessCriteria::new(min_tps) + .add_max_expired_tps(max_expired as f64) + .add_max_failed_submission_tps(200.0) + .add_latency_breakdown_threshold(LatencyBreakdownThreshold::new_strict(vec![ + ( + LatencyBreakdownSlice::IndexerFullnodeProcessedBatch, + indexer_fullnode_processed_batch, + ), + ( + LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch, + indexer_cache_worker_processed_batch, + ), + ( + LatencyBreakdownSlice::IndexerDataServiceAllChunksSent, + indexer_data_service_all_chunks_sent, + ), + ])) + }, + ) + .collect(), + background_traffic: background_traffic_for_sweep(5), + }) +} + /// This test runs a constant-TPS benchmark where the network includes /// PFNs, and the transactions are submitted to the PFNs. This is useful /// for measuring latencies when the system is not saturated. diff --git a/testsuite/forge/src/backend/k8s/mod.rs b/testsuite/forge/src/backend/k8s/mod.rs index 260887b9145b3..782d9d7aa8749 100644 --- a/testsuite/forge/src/backend/k8s/mod.rs +++ b/testsuite/forge/src/backend/k8s/mod.rs @@ -245,6 +245,7 @@ impl Factory for K8sFactory { self.keep, new_era, self.use_port_forward, + self.enable_indexer, ) .await .unwrap(); diff --git a/testsuite/forge/src/backend/k8s/swarm.rs b/testsuite/forge/src/backend/k8s/swarm.rs index 4d3c30e31801e..276fe1b10bbb6 100644 --- a/testsuite/forge/src/backend/k8s/swarm.rs +++ b/testsuite/forge/src/backend/k8s/swarm.rs @@ -62,6 +62,7 @@ pub struct K8sSwarm { era: Option, use_port_forward: bool, chaos_experiment_ops: Box, + has_indexer: bool, } impl K8sSwarm { @@ -75,6 +76,7 @@ impl K8sSwarm { keep: bool, era: Option, use_port_forward: bool, + has_indexer: bool, ) -> Result { let kube_client = create_k8s_client().await?; @@ -123,6 +125,7 @@ impl K8sSwarm { kube_client: kube_client.clone(), kube_namespace: kube_namespace.to_string(), }), + has_indexer, }; // test hitting the configured prometheus endpoint @@ -446,6 +449,10 @@ impl Swarm for K8sSwarm { fn get_default_pfn_node_config(&self) -> NodeConfig { get_default_pfn_node_config() } + + fn has_indexer(&self) -> bool { + self.has_indexer + } } /// Amount of time to wait for genesis to complete diff --git a/testsuite/forge/src/backend/local/swarm.rs b/testsuite/forge/src/backend/local/swarm.rs index cb1f8ba2989f7..eba54ae3c5bf3 100644 --- a/testsuite/forge/src/backend/local/swarm.rs +++ b/testsuite/forge/src/backend/local/swarm.rs @@ -650,6 +650,10 @@ impl Swarm for LocalSwarm { fn get_default_pfn_node_config(&self) -> NodeConfig { todo!() } + + fn has_indexer(&self) -> bool { + false + } } #[derive(Debug)] diff --git a/testsuite/forge/src/interface/prometheus_metrics.rs b/testsuite/forge/src/interface/prometheus_metrics.rs index cce9096aaac75..0d103c4a38827 100644 --- a/testsuite/forge/src/interface/prometheus_metrics.rs +++ b/testsuite/forge/src/interface/prometheus_metrics.rs @@ -105,6 +105,11 @@ pub enum LatencyBreakdownSlice { ConsensusProposalToOrdered, ConsensusOrderedToCommit, ConsensusProposalToCommit, + // each of the indexer grpc steps in order + IndexerFullnodeProcessedBatch, + IndexerCacheWorkerProcessedBatch, + IndexerDataServiceAllChunksSent, + // TODO: add processor insertion into DB latency } #[derive(Clone, Debug)] @@ -124,6 +129,14 @@ impl LatencyBreakdown { .get(slice) .unwrap_or_else(|| panic!("Missing latency breakdown for {:?}", slice)) } + + pub fn join(&self, other: &LatencyBreakdown) -> LatencyBreakdown { + let mut ret_latency = self.0.clone(); + for (slice, samples) in other.0.iter() { + ret_latency.insert(slice.clone(), samples.clone()); + } + LatencyBreakdown::new(ret_latency) + } } pub async fn fetch_latency_breakdown( @@ -210,5 +223,54 @@ pub async fn fetch_latency_breakdown( MetricSamples::new(consensus_proposal_to_commit_samples), ); + if swarm.has_indexer() { + // These counters are defined in ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs + let indexer_fullnode_processed_batch_query = + r#"indexer_grpc_duration_in_secs{step="4", service_type="indexer_fullnode"}"#; + let indexer_cache_worker_processed_batch_query = + r#"indexer_grpc_duration_in_secs{step="4", service_type="cache_worker"}"#; + let indexer_data_service_all_chunks_sent_query = + r#"indexer_grpc_duration_in_secs{step="4", service_type="data_service"}"#; + + let indexer_fullnode_processed_batch_samples = swarm + .query_range_metrics( + indexer_fullnode_processed_batch_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + let indexer_cache_worker_processed_batch_samples = swarm + .query_range_metrics( + indexer_cache_worker_processed_batch_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + let indexer_data_service_all_chunks_sent_samples = swarm + .query_range_metrics( + indexer_data_service_all_chunks_sent_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + samples.insert( + LatencyBreakdownSlice::IndexerFullnodeProcessedBatch, + MetricSamples::new(indexer_fullnode_processed_batch_samples), + ); + samples.insert( + LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch, + MetricSamples::new(indexer_cache_worker_processed_batch_samples), + ); + samples.insert( + LatencyBreakdownSlice::IndexerDataServiceAllChunksSent, + MetricSamples::new(indexer_data_service_all_chunks_sent_samples), + ); + } Ok(LatencyBreakdown::new(samples)) } diff --git a/testsuite/forge/src/interface/swarm.rs b/testsuite/forge/src/interface/swarm.rs index b2c3f501ba5ec..4f378f37bc9d8 100644 --- a/testsuite/forge/src/interface/swarm.rs +++ b/testsuite/forge/src/interface/swarm.rs @@ -105,6 +105,10 @@ pub trait Swarm: Sync + Send { } fn get_default_pfn_node_config(&self) -> NodeConfig; + + /// Check if the swarm has an indexer. NOTE: in the future we should make this more rich, and include + /// indexer endpoints, similar to how we collect validator and fullnode endpoints. + fn has_indexer(&self) -> bool; } impl SwarmExt for T where T: Swarm {}