Skip to content

Commit

Permalink
Update metrics to include inner transactions. (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder committed Jun 16, 2023
1 parent 55c2d00 commit 8328991
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 4 deletions.
19 changes: 16 additions & 3 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,17 +366,30 @@ func (p *pipelineImpl) Stop() {
}
}

func (p *pipelineImpl) addMetrics(block data.BlockData, importTime time.Duration) {
func numInnerTxn(txn sdk.SignedTxnWithAD) int {
result := 0
for _, itxn := range txn.ApplyData.EvalDelta.InnerTxns {
result += 1 + numInnerTxn(itxn)
}
return result
}

func addMetrics(block data.BlockData, importTime time.Duration) {
metrics.BlockImportTimeSeconds.Observe(importTime.Seconds())
metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)))
metrics.ImportedRoundGauge.Set(float64(block.Round()))
txnCountByType := make(map[string]int)
innerTxn := 0
for _, txn := range block.Payset {
txnCountByType[string(txn.Txn.Type)]++
innerTxn += numInnerTxn(txn.SignedTxnWithAD)
}
if innerTxn != 0 {
txnCountByType["inner"] = innerTxn
}
for k, v := range txnCountByType {
metrics.ImportedTxns.WithLabelValues(k).Set(float64(v))
}
metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)) + float64(innerTxn))
}

// Start pushes block data through the pipeline
Expand Down Expand Up @@ -465,7 +478,7 @@ func (p *pipelineImpl) Start() {
metrics.ExporterTimeSeconds.Observe(time.Since(exporterStart).Seconds())
// Ignore round 0 (which is empty).
if p.pipelineMetadata.NextRound > 1 {
p.addMetrics(blkData, time.Since(start))
addMetrics(blkData, time.Since(start))
}
p.setError(nil)
retry = 0
Expand Down
92 changes: 91 additions & 1 deletion conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/algorand/conduit/conduit"
"github.com/algorand/conduit/conduit/data"
_ "github.com/algorand/conduit/conduit/metrics"
"github.com/algorand/conduit/conduit/metrics"
"github.com/algorand/conduit/conduit/plugins"
"github.com/algorand/conduit/conduit/plugins/exporters"
"github.com/algorand/conduit/conduit/plugins/importers"
Expand Down Expand Up @@ -815,3 +815,93 @@ func TestMetricPrefixApplied(t *testing.T) {
pImpl.registerPluginMetricsCallbacks()
assert.Equal(t, prefix, mImporter.subsystem)
}

func TestMetrics(t *testing.T) {
// This test cannot run in parallel because the metrics are global.
basicTxn := func(t sdk.TxType) sdk.SignedTxnWithAD {
return sdk.SignedTxnWithAD{
SignedTxn: sdk.SignedTxn{
Txn: sdk.Transaction{
Type: t,
},
},
}
}
txnWithInner := func(t sdk.TxType, inner ...sdk.SignedTxnWithAD) sdk.SignedTxnWithAD {
result := basicTxn(t)
result.EvalDelta.InnerTxns = inner
return result
}
const round = sdk.Round(1234)

block := data.BlockData{
BlockHeader: sdk.BlockHeader{Round: round},
Payset: []sdk.SignedTxnInBlock{
{
SignedTxnWithAD: basicTxn(sdk.PaymentTx),
}, {
SignedTxnWithAD: basicTxn(sdk.KeyRegistrationTx),
}, {
SignedTxnWithAD: basicTxn(sdk.AssetConfigTx),
}, {
SignedTxnWithAD: basicTxn(sdk.AssetTransferTx),
}, {
SignedTxnWithAD: basicTxn(sdk.AssetFreezeTx),
}, {
SignedTxnWithAD: basicTxn(sdk.ApplicationCallTx),
}, {
SignedTxnWithAD: basicTxn(sdk.StateProofTx),
}, {
// counted as 1 app call and 6 inner txns
SignedTxnWithAD: txnWithInner(sdk.ApplicationCallTx,
basicTxn(sdk.PaymentTx),
txnWithInner(sdk.ApplicationCallTx,
basicTxn(sdk.PaymentTx),
basicTxn(sdk.PaymentTx)),
basicTxn(sdk.PaymentTx),
basicTxn(sdk.PaymentTx)),
},
},
}

assert.Equal(t, 6, numInnerTxn(block.Payset[7].SignedTxnWithAD))

metrics.RegisterPrometheusMetrics("add_metrics_test")
addMetrics(block, time.Hour)
stats, err := prometheus.DefaultGatherer.Gather()
require.NoError(t, err)
found := 0
for _, stat := range stats {
if strings.HasSuffix(*stat.Name, metrics.BlockImportTimeName) {
found++
// 1 hour in seconds
assert.Contains(t, stat.String(), "sample_count:1 sample_sum:3600")
}
if strings.HasSuffix(*stat.Name, metrics.ImportedRoundGaugeName) {
found++
assert.Contains(t, stat.String(), "value:1234")
}
if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsPerBlockName) {
found++
assert.Contains(t, stat.String(), "sample_count:1 sample_sum:14")
}
if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsName) {
found++
str := stat.String()
// the 6 single txns
assert.Contains(t, str, `label:<name:"txn_type" value:"acfg" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"afrz" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"axfer" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"keyreg" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"pay" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"stpf" > gauge:<value:1 >`)

// 2 app call txns
assert.Contains(t, str, `label:<name:"txn_type" value:"appl" > gauge:<value:2 >`)

// 1 app had 6 inner txns
assert.Contains(t, str, `label:<name:"txn_type" value:"inner" > gauge:<value:6 >`)
}
}
assert.Equal(t, 4, found)
}

0 comments on commit 8328991

Please sign in to comment.