From 8328991841ce295a2300bd90bcdf93f3ab5cb366 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Fri, 16 Jun 2023 16:06:15 -0400 Subject: [PATCH] Update metrics to include inner transactions. (#105) --- conduit/pipeline/pipeline.go | 19 ++++++- conduit/pipeline/pipeline_test.go | 92 ++++++++++++++++++++++++++++++- 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 66a24b3b..892156a5 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -366,17 +366,30 @@ func (p *pipelineImpl) Stop() { } } -func (p *pipelineImpl) addMetrics(block data.BlockData, importTime time.Duration) { +func numInnerTxn(txn sdk.SignedTxnWithAD) int { + result := 0 + for _, itxn := range txn.ApplyData.EvalDelta.InnerTxns { + result += 1 + numInnerTxn(itxn) + } + return result +} + +func addMetrics(block data.BlockData, importTime time.Duration) { metrics.BlockImportTimeSeconds.Observe(importTime.Seconds()) - metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset))) metrics.ImportedRoundGauge.Set(float64(block.Round())) txnCountByType := make(map[string]int) + innerTxn := 0 for _, txn := range block.Payset { txnCountByType[string(txn.Txn.Type)]++ + innerTxn += numInnerTxn(txn.SignedTxnWithAD) + } + if innerTxn != 0 { + txnCountByType["inner"] = innerTxn } for k, v := range txnCountByType { metrics.ImportedTxns.WithLabelValues(k).Set(float64(v)) } + metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)) + float64(innerTxn)) } // Start pushes block data through the pipeline @@ -465,7 +478,7 @@ func (p *pipelineImpl) Start() { metrics.ExporterTimeSeconds.Observe(time.Since(exporterStart).Seconds()) // Ignore round 0 (which is empty). if p.pipelineMetadata.NextRound > 1 { - p.addMetrics(blkData, time.Since(start)) + addMetrics(blkData, time.Since(start)) } p.setError(nil) retry = 0 diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index bf20e67a..dc716537 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -25,7 +25,7 @@ import ( "github.com/algorand/conduit/conduit" "github.com/algorand/conduit/conduit/data" - _ "github.com/algorand/conduit/conduit/metrics" + "github.com/algorand/conduit/conduit/metrics" "github.com/algorand/conduit/conduit/plugins" "github.com/algorand/conduit/conduit/plugins/exporters" "github.com/algorand/conduit/conduit/plugins/importers" @@ -815,3 +815,93 @@ func TestMetricPrefixApplied(t *testing.T) { pImpl.registerPluginMetricsCallbacks() assert.Equal(t, prefix, mImporter.subsystem) } + +func TestMetrics(t *testing.T) { + // This test cannot run in parallel because the metrics are global. + basicTxn := func(t sdk.TxType) sdk.SignedTxnWithAD { + return sdk.SignedTxnWithAD{ + SignedTxn: sdk.SignedTxn{ + Txn: sdk.Transaction{ + Type: t, + }, + }, + } + } + txnWithInner := func(t sdk.TxType, inner ...sdk.SignedTxnWithAD) sdk.SignedTxnWithAD { + result := basicTxn(t) + result.EvalDelta.InnerTxns = inner + return result + } + const round = sdk.Round(1234) + + block := data.BlockData{ + BlockHeader: sdk.BlockHeader{Round: round}, + Payset: []sdk.SignedTxnInBlock{ + { + SignedTxnWithAD: basicTxn(sdk.PaymentTx), + }, { + SignedTxnWithAD: basicTxn(sdk.KeyRegistrationTx), + }, { + SignedTxnWithAD: basicTxn(sdk.AssetConfigTx), + }, { + SignedTxnWithAD: basicTxn(sdk.AssetTransferTx), + }, { + SignedTxnWithAD: basicTxn(sdk.AssetFreezeTx), + }, { + SignedTxnWithAD: basicTxn(sdk.ApplicationCallTx), + }, { + SignedTxnWithAD: basicTxn(sdk.StateProofTx), + }, { + // counted as 1 app call and 6 inner txns + SignedTxnWithAD: txnWithInner(sdk.ApplicationCallTx, + basicTxn(sdk.PaymentTx), + txnWithInner(sdk.ApplicationCallTx, + basicTxn(sdk.PaymentTx), + basicTxn(sdk.PaymentTx)), + basicTxn(sdk.PaymentTx), + basicTxn(sdk.PaymentTx)), + }, + }, + } + + assert.Equal(t, 6, numInnerTxn(block.Payset[7].SignedTxnWithAD)) + + metrics.RegisterPrometheusMetrics("add_metrics_test") + addMetrics(block, time.Hour) + stats, err := prometheus.DefaultGatherer.Gather() + require.NoError(t, err) + found := 0 + for _, stat := range stats { + if strings.HasSuffix(*stat.Name, metrics.BlockImportTimeName) { + found++ + // 1 hour in seconds + assert.Contains(t, stat.String(), "sample_count:1 sample_sum:3600") + } + if strings.HasSuffix(*stat.Name, metrics.ImportedRoundGaugeName) { + found++ + assert.Contains(t, stat.String(), "value:1234") + } + if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsPerBlockName) { + found++ + assert.Contains(t, stat.String(), "sample_count:1 sample_sum:14") + } + if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsName) { + found++ + str := stat.String() + // the 6 single txns + assert.Contains(t, str, `label: gauge:`) + assert.Contains(t, str, `label: gauge:`) + assert.Contains(t, str, `label: gauge:`) + assert.Contains(t, str, `label: gauge:`) + assert.Contains(t, str, `label: gauge:`) + assert.Contains(t, str, `label: gauge:`) + + // 2 app call txns + assert.Contains(t, str, `label: gauge:`) + + // 1 app had 6 inner txns + assert.Contains(t, str, `label: gauge:`) + } + } + assert.Equal(t, 4, found) +}