Skip to content

Commit

Permalink
[forge] indexer SuccessCriteria and new test
Browse files Browse the repository at this point in the history
  • Loading branch information
rustielin committed Oct 3, 2024
1 parent e8a6faf commit 1408da3
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 0 deletions.
66 changes: 66 additions & 0 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ fn get_test_suite(
return Ok(test_suite);
} else if let Some(test_suite) = get_dag_test(test_name, duration, test_cmd) {
return Ok(test_suite);
} else if let Some(test_suite) = get_indexer_test(test_name) {
return Ok(test_suite);
}

// Otherwise, check the test name against the ungrouped test suites
Expand Down Expand Up @@ -691,6 +693,15 @@ fn get_land_blocking_test(
Some(test)
}

/// Attempts to match the test name to an indexer test.
///
/// Returns `None` when the name does not correspond to any indexer test,
/// letting the caller fall through to the next test-suite matcher.
fn get_indexer_test(test_name: &str) -> Option<ForgeConfig> {
    if test_name == "indexer_test" {
        Some(indexer_test())
    } else {
        // The test name does not match an indexer test
        None
    }
}

/// Attempts to match the test name to a network benchmark test
fn get_netbench_test(test_name: &str) -> Option<ForgeConfig> {
let test = match test_name {
Expand Down Expand Up @@ -2367,6 +2378,61 @@ fn multiregion_benchmark_test() -> ForgeConfig {
)
}

/// Workload sweep with multiple stressful workloads for indexer
///
/// Runs a `LoadVsPerfBenchmark` sweep over five transaction workloads
/// (coin transfers, no-ops across many modules, global-resource contention,
/// token minting, and package publishing), each paired with its own
/// `SuccessCriteria` that adds indexer-specific latency-breakdown thresholds.
fn indexer_test() -> ForgeConfig {
    // 4 validators / 4 fullnodes, wrapped in the realistic-environment sweep helper.
    realistic_env_sweep_wrap(4, 4, LoadVsPerfBenchmark {
        test: Box::new(PerformanceBenchmark),
        workloads: Workloads::TRANSACTIONS(vec![
            TransactionWorkload::new(TransactionTypeArg::CoinTransfer, 20000),
            TransactionWorkload::new(TransactionTypeArg::NoOp, 20000).with_num_modules(100),
            TransactionWorkload::new(TransactionTypeArg::ModifyGlobalResource, 6000)
                .with_transactions_per_account(1),
            TransactionWorkload::new(TransactionTypeArg::TokenV2AmbassadorMint, 20000)
                .with_unique_senders(),
            TransactionWorkload::new(TransactionTypeArg::PublishPackage, 200)
                .with_transactions_per_account(1),
        ]),
        // NOTE: tune these indexer latencies
        // One criteria tuple per workload above, in the same order:
        // (min_tps, max_expired, indexer_fullnode_processed_batch,
        //  indexer_cache_worker_processed_batch, indexer_data_service_all_chunks_sent).
        // The three trailing values are latency-breakdown thresholds for the
        // indexer gRPC pipeline stages — presumably seconds; TODO confirm units
        // against LatencyBreakdownThreshold.
        criteria: [
            (7000, 100, 1.0, 1.0, 1.0),
            (8500, 100, 1.0, 1.0, 1.0),
            (2000, 300, 1.0, 1.0, 1.0),
            (3200, 500, 1.0, 1.0, 1.0),
            (28, 5, 1.0, 1.0, 1.0),
        ]
        .into_iter()
        .map(
            |(
                min_tps,
                max_expired,
                indexer_fullnode_processed_batch,
                indexer_cache_worker_processed_batch,
                indexer_data_service_all_chunks_sent,
            )| {
                SuccessCriteria::new(min_tps)
                    .add_max_expired_tps(max_expired as f64)
                    .add_max_failed_submission_tps(200.0)
                    // Strict thresholds: all three indexer pipeline stages must
                    // stay within the per-workload latency budgets above.
                    .add_latency_breakdown_threshold(LatencyBreakdownThreshold::new_strict(vec![
                        (
                            LatencyBreakdownSlice::IndexerFullnodeProcessedBatch,
                            indexer_fullnode_processed_batch,
                        ),
                        (
                            LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch,
                            indexer_cache_worker_processed_batch,
                        ),
                        (
                            LatencyBreakdownSlice::IndexerDataServiceAllChunksSent,
                            indexer_data_service_all_chunks_sent,
                        ),
                    ]))
            },
        )
        .collect(),
        background_traffic: background_traffic_for_sweep(5),
    })
}

/// This test runs a constant-TPS benchmark where the network includes
/// PFNs, and the transactions are submitted to the PFNs. This is useful
/// for measuring latencies when the system is not saturated.
Expand Down
1 change: 1 addition & 0 deletions testsuite/forge/src/backend/k8s/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl Factory for K8sFactory {
self.keep,
new_era,
self.use_port_forward,
self.enable_indexer,
)
.await
.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions testsuite/forge/src/backend/k8s/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct K8sSwarm {
era: Option<String>,
use_port_forward: bool,
chaos_experiment_ops: Box<dyn ChaosExperimentOps + Send + Sync>,
has_indexer: bool,
}

impl K8sSwarm {
Expand All @@ -75,6 +76,7 @@ impl K8sSwarm {
keep: bool,
era: Option<String>,
use_port_forward: bool,
has_indexer: bool,
) -> Result<Self> {
let kube_client = create_k8s_client().await?;

Expand Down Expand Up @@ -123,6 +125,7 @@ impl K8sSwarm {
kube_client: kube_client.clone(),
kube_namespace: kube_namespace.to_string(),
}),
has_indexer,
};

// test hitting the configured prometheus endpoint
Expand Down Expand Up @@ -446,6 +449,10 @@ impl Swarm for K8sSwarm {
fn get_default_pfn_node_config(&self) -> NodeConfig {
get_default_pfn_node_config()
}

    /// Whether this k8s swarm was deployed with an indexer stack,
    /// as recorded at swarm construction time.
    fn has_indexer(&self) -> bool {
        self.has_indexer
    }
}

/// Amount of time to wait for genesis to complete
Expand Down
4 changes: 4 additions & 0 deletions testsuite/forge/src/backend/local/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,10 @@ impl Swarm for LocalSwarm {
fn get_default_pfn_node_config(&self) -> NodeConfig {
todo!()
}

    /// Local swarms never run an indexer, so this is always `false`.
    fn has_indexer(&self) -> bool {
        false
    }
}

#[derive(Debug)]
Expand Down
62 changes: 62 additions & 0 deletions testsuite/forge/src/interface/prometheus_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ pub enum LatencyBreakdownSlice {
ConsensusProposalToOrdered,
ConsensusOrderedToCommit,
ConsensusProposalToCommit,
// each of the indexer grpc steps in order
IndexerFullnodeProcessedBatch,
IndexerCacheWorkerProcessedBatch,
IndexerDataServiceAllChunksSent,
// TODO: add processor insertion into DB latency
}

#[derive(Clone, Debug)]
Expand All @@ -124,6 +129,14 @@ impl LatencyBreakdown {
.get(slice)
.unwrap_or_else(|| panic!("Missing latency breakdown for {:?}", slice))
}

pub fn join(&self, other: &LatencyBreakdown) -> LatencyBreakdown {
let mut ret_latency = self.0.clone();
for (slice, samples) in other.0.iter() {
ret_latency.insert(slice.clone(), samples.clone());
}
LatencyBreakdown::new(ret_latency)
}
}

pub async fn fetch_latency_breakdown(
Expand Down Expand Up @@ -210,5 +223,54 @@ pub async fn fetch_latency_breakdown(
MetricSamples::new(consensus_proposal_to_commit_samples),
);

if swarm.has_indexer() {
// These counters are defined in ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs
let indexer_fullnode_processed_batch_query =
r#"indexer_grpc_duration_in_secs{step="4", service_type="indexer_fullnode"}"#;
let indexer_cache_worker_processed_batch_query =
r#"indexer_grpc_duration_in_secs{step="4", service_type="cache_worker"}"#;
let indexer_data_service_all_chunks_sent_query =
r#"indexer_grpc_duration_in_secs{step="4", service_type="data_service"}"#;

let indexer_fullnode_processed_batch_samples = swarm
.query_range_metrics(
indexer_fullnode_processed_batch_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

let indexer_cache_worker_processed_batch_samples = swarm
.query_range_metrics(
indexer_cache_worker_processed_batch_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

let indexer_data_service_all_chunks_sent_samples = swarm
.query_range_metrics(
indexer_data_service_all_chunks_sent_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

samples.insert(
LatencyBreakdownSlice::IndexerFullnodeProcessedBatch,
MetricSamples::new(indexer_fullnode_processed_batch_samples),
);
samples.insert(
LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch,
MetricSamples::new(indexer_cache_worker_processed_batch_samples),
);
samples.insert(
LatencyBreakdownSlice::IndexerDataServiceAllChunksSent,
MetricSamples::new(indexer_data_service_all_chunks_sent_samples),
);
}
Ok(LatencyBreakdown::new(samples))
}
4 changes: 4 additions & 0 deletions testsuite/forge/src/interface/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ pub trait Swarm: Sync + Send {
}

fn get_default_pfn_node_config(&self) -> NodeConfig;

/// Check if the swarm has an indexer. NOTE: in the future we should make this more rich, and include
/// indexer endpoints, similar to how we collect validator and fullnode endpoints.
fn has_indexer(&self) -> bool;
}

impl<T: ?Sized> SwarmExt for T where T: Swarm {}
Expand Down

0 comments on commit 1408da3

Please sign in to comment.