[forge] indexer SuccessCriteria and new test #14851

Merged · 2 commits · Oct 8, 2024
104 changes: 104 additions & 0 deletions testsuite/forge-cli/src/main.rs
@@ -603,6 +603,8 @@ fn get_test_suite(
return Ok(test_suite);
} else if let Some(test_suite) = get_dag_test(test_name, duration, test_cmd) {
return Ok(test_suite);
} else if let Some(test_suite) = get_indexer_test(test_name) {
return Ok(test_suite);
}

// Otherwise, check the test name against the ungrouped test suites
@@ -691,6 +693,15 @@ fn get_land_blocking_test(
Some(test)
}

/// Attempts to match the test name to an indexer test
fn get_indexer_test(test_name: &str) -> Option<ForgeConfig> {
let test = match test_name {
"indexer_test" => indexer_test(),
_ => return None, // The test name does not match an indexer test
};
Some(test)
}
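
For illustration, a minimal usage sketch (not part of the diff) of the new matcher, assuming the surrounding forge-cli context:

// `get_test_suite` now falls through to `get_indexer_test`, so the new
// suite can be requested by name.
let suite = get_indexer_test("indexer_test").expect("known indexer test name");
// Any other name yields None and matching continues with the other groups.
assert!(get_indexer_test("some_other_test").is_none());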

/// Attempts to match the test name to a network benchmark test
fn get_netbench_test(test_name: &str) -> Option<ForgeConfig> {
let test = match test_name {
@@ -2367,6 +2378,99 @@ fn multiregion_benchmark_test() -> ForgeConfig {
)
}

/// Workload sweep with multiple stressful workloads for the indexer
fn indexer_test() -> ForgeConfig {
// Define all the workloads and their corresponding success criteria upfront.
// Each TransactionTypeArg is the workload for one phase of the sweep.
// Each success criteria entry is (min_tps, latency thresholds...); see the exact destructuring below.
let workloads_and_criteria = vec![
(
TransactionWorkload::new(TransactionTypeArg::CoinTransfer, 20000),
(7000, 0.5, 0.5, 0.5),
),
(
TransactionWorkload::new(TransactionTypeArg::NoOp, 20000).with_num_modules(100),
(8500, 0.5, 0.5, 0.5),
),
(
TransactionWorkload::new(TransactionTypeArg::ModifyGlobalResource, 6000)
.with_transactions_per_account(1),
(2000, 0.5, 0.5, 0.5),
),
(
TransactionWorkload::new(TransactionTypeArg::TokenV2AmbassadorMint, 20000)
.with_unique_senders(),
(3200, 0.5, 0.5, 0.5),
),
(
TransactionWorkload::new(TransactionTypeArg::PublishPackage, 200)
.with_transactions_per_account(1),
(28, 0.5, 0.5, 0.5),
),
(
TransactionWorkload::new(TransactionTypeArg::VectorPicture30k, 100),
(100, 1.0, 1.0, 1.0),
),
(
TransactionWorkload::new(TransactionTypeArg::SmartTablePicture30KWith200Change, 100),
(100, 1.0, 1.0, 1.0),
),
(
TransactionWorkload::new(
TransactionTypeArg::TokenV1NFTMintAndTransferSequential,
1000,
),
(500, 0.5, 0.5, 0.5),
),
(
TransactionWorkload::new(TransactionTypeArg::TokenV1FTMintAndTransfer, 1000),
(500, 0.5, 0.5, 0.5),
),
];
let num_sweep = workloads_and_criteria.len();

let workloads = Workloads::TRANSACTIONS(
workloads_and_criteria
.iter()
.map(|(w, _)| w.clone())
.collect(),
);
let criteria = workloads_and_criteria
.iter()
.map(|(_, c)| {
let (
min_tps,
indexer_fullnode_processed_batch,
indexer_cache_worker_processed_batch,
indexer_data_service_all_chunks_sent,
) = c.to_owned();
SuccessCriteria::new(min_tps).add_latency_breakdown_threshold(
LatencyBreakdownThreshold::new_strict(vec![
(
LatencyBreakdownSlice::IndexerFullnodeProcessedBatch,
indexer_fullnode_processed_batch,
),
(
LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch,
indexer_cache_worker_processed_batch,
),
(
LatencyBreakdownSlice::IndexerDataServiceAllChunksSent,
indexer_data_service_all_chunks_sent,
),
]),
)
})
.collect::<Vec<_>>();

realistic_env_sweep_wrap(4, 4, LoadVsPerfBenchmark {
test: Box::new(PerformanceBenchmark),
workloads,
criteria,
background_traffic: background_traffic_for_sweep(num_sweep),
})
}

/// This test runs a constant-TPS benchmark where the network includes
/// PFNs, and the transactions are submitted to the PFNs. This is useful
/// for measuring latencies when the system is not saturated.
1 change: 1 addition & 0 deletions testsuite/forge/src/backend/k8s/mod.rs
@@ -245,6 +245,7 @@ impl Factory for K8sFactory {
self.keep,
new_era,
self.use_port_forward,
self.enable_indexer,
)
.await
.unwrap();
7 changes: 7 additions & 0 deletions testsuite/forge/src/backend/k8s/swarm.rs
@@ -62,6 +62,7 @@ pub struct K8sSwarm {
era: Option<String>,
use_port_forward: bool,
chaos_experiment_ops: Box<dyn ChaosExperimentOps + Send + Sync>,
has_indexer: bool,
}

impl K8sSwarm {
@@ -75,6 +76,7 @@ impl K8sSwarm {
keep: bool,
era: Option<String>,
use_port_forward: bool,
has_indexer: bool,
) -> Result<Self> {
let kube_client = create_k8s_client().await?;

@@ -123,6 +125,7 @@ impl K8sSwarm {
kube_client: kube_client.clone(),
kube_namespace: kube_namespace.to_string(),
}),
has_indexer,
};

// test hitting the configured prometheus endpoint
@@ -446,6 +449,10 @@ impl Swarm for K8sSwarm {
fn get_default_pfn_node_config(&self) -> NodeConfig {
get_default_pfn_node_config()
}

fn has_indexer(&self) -> bool {
self.has_indexer
}
}

/// Amount of time to wait for genesis to complete
4 changes: 4 additions & 0 deletions testsuite/forge/src/backend/local/swarm.rs
@@ -650,6 +650,10 @@ impl Swarm for LocalSwarm {
fn get_default_pfn_node_config(&self) -> NodeConfig {
todo!()
}

fn has_indexer(&self) -> bool {
false
}
}

#[derive(Debug)]
74 changes: 70 additions & 4 deletions testsuite/forge/src/interface/prometheus_metrics.rs
@@ -43,6 +43,12 @@ impl fmt::Debug for MetricSamples {
}
}

impl Default for MetricSamples {
fn default() -> Self {
Self::new(vec![])
}
}

#[derive(Clone, Debug)]
pub struct SystemMetrics {
pub cpu_core_metrics: MetricSamples,
@@ -105,6 +111,11 @@ pub enum LatencyBreakdownSlice {
ConsensusProposalToOrdered,
ConsensusOrderedToCommit,
ConsensusProposalToCommit,
// Each of the indexer gRPC steps, in order
IndexerFullnodeProcessedBatch,
IndexerCacheWorkerProcessedBatch,
IndexerDataServiceAllChunksSent,
// TODO: add processor insertion into DB latency
}

#[derive(Clone, Debug)]
@@ -119,10 +130,16 @@ impl LatencyBreakdown {
self.0.keys().cloned().collect()
}

pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> &MetricSamples {
self.0
.get(slice)
.unwrap_or_else(|| panic!("Missing latency breakdown for {:?}", slice))
pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> Option<&MetricSamples> {
self.0.get(slice)
}

pub fn join(&self, other: &LatencyBreakdown) -> LatencyBreakdown {
let mut ret_latency = self.0.clone();
for (slice, samples) in other.0.iter() {
ret_latency.insert(slice.clone(), samples.clone());
}
LatencyBreakdown::new(ret_latency)
}
}
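
As a usage note, a hedged sketch (not code from this PR) of how `join` and the now-fallible `get_samples` compose; `consensus` and `indexer` are assumed to be `LatencyBreakdown` values, e.g. from separate `fetch_latency_breakdown` calls:

// Merge the indexer samples into the consensus breakdown; entries from
// `indexer` overwrite any slice present in both maps.
let combined = consensus.join(&indexer);
// A missing slice is now an explicit None rather than a panic.
if let Some(samples) = combined.get_samples(&LatencyBreakdownSlice::IndexerFullnodeProcessedBatch) {
    println!("indexer fullnode max latency: {:.3}s", samples.max_sample());
}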

@@ -210,5 +227,54 @@ pub async fn fetch_latency_breakdown(
MetricSamples::new(consensus_proposal_to_commit_samples),
);

if swarm.has_indexer() {
// These counters are defined in ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs
let indexer_fullnode_processed_batch_query =
r#"max(indexer_grpc_duration_in_secs{step="4", service_type="indexer_fullnode"})"#;
let indexer_cache_worker_processed_batch_query =
r#"max(indexer_grpc_duration_in_secs{step="4", service_type="cache_worker"})"#;
let indexer_data_service_all_chunks_sent_query =
r#"max(indexer_grpc_duration_in_secs{step="4", service_type="data_service"})"#;

let indexer_fullnode_processed_batch_samples = swarm
.query_range_metrics(
indexer_fullnode_processed_batch_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

let indexer_cache_worker_processed_batch_samples = swarm
.query_range_metrics(
indexer_cache_worker_processed_batch_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

let indexer_data_service_all_chunks_sent_samples = swarm
.query_range_metrics(
indexer_data_service_all_chunks_sent_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

samples.insert(
LatencyBreakdownSlice::IndexerFullnodeProcessedBatch,
MetricSamples::new(indexer_fullnode_processed_batch_samples),
);
samples.insert(
LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch,
MetricSamples::new(indexer_cache_worker_processed_batch_samples),
);
samples.insert(
LatencyBreakdownSlice::IndexerDataServiceAllChunksSent,
MetricSamples::new(indexer_data_service_all_chunks_sent_samples),
);
}
Ok(LatencyBreakdown::new(samples))
}
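
Following the TODO on the enum above, a hedged sketch of how a processor-insertion slice might later be wired in, mirroring the pattern above (the `IndexerProcessorInsertedToDb` variant and the query labels are hypothetical, not part of this PR):

// Hypothetical follow-up, reusing the same query/insert pattern inside
// the `swarm.has_indexer()` block:
let indexer_processor_inserted_query =
    r#"max(indexer_grpc_duration_in_secs{step="5", service_type="processor"})"#; // assumed labels
let indexer_processor_inserted_samples = swarm
    .query_range_metrics(
        indexer_processor_inserted_query,
        start_time as i64,
        end_time as i64,
        None,
    )
    .await?;
samples.insert(
    LatencyBreakdownSlice::IndexerProcessorInsertedToDb, // hypothetical new variant
    MetricSamples::new(indexer_processor_inserted_samples),
);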
4 changes: 4 additions & 0 deletions testsuite/forge/src/interface/swarm.rs
@@ -105,6 +105,10 @@ pub trait Swarm: Sync + Send {
}

fn get_default_pfn_node_config(&self) -> NodeConfig;

/// Check if the swarm has an indexer. NOTE: in the future we should make this richer, and include
/// indexer endpoints, similar to how we collect validator and fullnode endpoints.
fn has_indexer(&self) -> bool;
}

impl<T: ?Sized> SwarmExt for T where T: Swarm {}
4 changes: 3 additions & 1 deletion testsuite/forge/src/success_criteria.rs
@@ -150,7 +150,9 @@ impl LatencyBreakdownThreshold {
traffic_name_addition: &String,
) -> anyhow::Result<()> {
for (slice, threshold) in &self.thresholds {
let samples = metrics.get_samples(slice);
let samples = metrics
.get_samples(slice)
.expect("Could not get metric samples");
threshold.ensure_metrics_threshold(
&format!("{:?}{}", slice, traffic_name_addition),
samples.get(),
5 changes: 4 additions & 1 deletion testsuite/testcases/src/lib.rs
@@ -283,7 +283,10 @@ impl NetworkTest for dyn NetworkLoadTest {
.keys()
.into_iter()
.map(|slice| {
let slice_samples = phase_stats.latency_breakdown.get_samples(&slice);
let slice_samples = phase_stats
.latency_breakdown
.get_samples(&slice)
.expect("Could not get samples");
format!(
"{:?}: max: {:.3}, avg: {:.3}",
slice,
26 changes: 17 additions & 9 deletions testsuite/testcases/src/load_vs_perf_benchmark.rs
@@ -6,7 +6,7 @@ use anyhow::Context;
use aptos_forge::{
args::TransactionTypeArg,
emitter::NumAccountsMode,
prometheus_metrics::{LatencyBreakdown, LatencyBreakdownSlice},
prometheus_metrics::{LatencyBreakdown, LatencyBreakdownSlice, MetricSamples},
success_criteria::{SuccessCriteria, SuccessCriteriaChecker},
EmitJob, EmitJobMode, EmitJobRequest, NetworkContext, NetworkContextSynchronizer, NetworkTest,
Result, Test, TxnStats, WorkflowProgress,
@@ -471,7 +471,7 @@ fn to_table(type_name: String, results: &[Vec<SingleRunStats>]) -> Vec<String> {

let mut table = Vec::new();
table.push(format!(
"{: <name_width$} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
"{: <name_width$} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
type_name,
"submitted/s",
"committed/s",
Expand All @@ -486,14 +486,18 @@ fn to_table(type_name: String, results: &[Vec<SingleRunStats>]) -> Vec<String> {
"pos->prop",
"prop->order",
"order->commit",
"actual dur"
"actual dur",
// optional indexer metrics
"idx_fn",
"idx_cache",
"idx_data",
));

for run_results in results {
for result in run_results {
let rate = result.stats.rate();
table.push(format!(
"{: <name_width$} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12}",
"{: <name_width$} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12} | {: <12.3} | {: <12.3} | {: <12.3}",
result.name,
rate.submitted,
rate.committed,
@@ -504,11 +508,15 @@ fn to_table(type_name: String, results: &[Vec<SingleRunStats>]) -> Vec<String> {
rate.p50_latency as f64 / 1000.0,
rate.p90_latency as f64 / 1000.0,
rate.p99_latency as f64 / 1000.0,
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(),
result.actual_duration.as_secs()
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).unwrap_or(&MetricSamples::default()).max_sample(),
result.actual_duration.as_secs(),
// optional indexer metrics
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerFullnodeProcessedBatch).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerDataServiceAllChunksSent).unwrap_or(&MetricSamples::default()).max_sample(),
));
}
}