Skip to content

Commit

Permalink
Migrate ContractSetMetrcis to raw SQL (#1306)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Jun 17, 2024
1 parent b90c8e8 commit d1510f1
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 40 deletions.
46 changes: 8 additions & 38 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,15 @@ func (s *SQLStore) ContractSetChurnMetrics(ctx context.Context, start time.Time,
return resp, nil
}

func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error) {
metrics, err := s.contractSetMetrics(ctx, start, n, interval, opts)
func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
err := s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) (err error) {
metrics, err = tx.ContractSetMetrics(ctx, start, n, interval, opts)
return
})
if err != nil {
return nil, err
}
resp := make([]api.ContractSetMetric, len(metrics))
for i := range resp {
resp[i] = api.ContractSetMetric{
Contracts: metrics[i].Contracts,
Name: metrics[i].Name,
Timestamp: api.TimeRFC3339(time.Time(metrics[i].Timestamp).UTC()),
}
}
return resp, nil
return
}

func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
Expand Down Expand Up @@ -281,16 +276,8 @@ func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ...
}

func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
dbMetrics := make([]dbContractSetMetric, len(metrics))
for i, metric := range metrics {
dbMetrics[i] = dbContractSetMetric{
Contracts: metric.Contracts,
Name: metric.Name,
Timestamp: unixTimeMS(metric.Timestamp),
}
}
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
return s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordContractSetMetric(ctx, metrics...)
})
}

Expand Down Expand Up @@ -462,23 +449,6 @@ func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time,
return metrics, nil
}

func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) {
whereExpr := gorm.Expr("TRUE")
if opts.Name != "" {
whereExpr = gorm.Expr("name = ?", opts.Name)
}

var metrics []dbContractSetMetric
err := s.findPeriods(ctx, dbContractSetMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err)
}
for i, m := range metrics {
metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp)
}
return metrics, nil
}

func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) unixTimeMS {
startMS := start.UnixMilli()
toNormaliseMS := time.Time(t).UnixMilli()
Expand Down
2 changes: 1 addition & 1 deletion stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestContractSetMetrics(t *testing.T) {
} else if m := metrics[0]; m.Contracts != 0 {
t.Fatalf("expected 0 contracts, got %v", m.Contracts)
} else if ti := time.Time(m.Timestamp); !ti.Equal(testStart) {
t.Fatal("expected time to match start time")
t.Fatalf("expected time to match start time, %v != %v", ti, testStart)
} else if m.Name != testContractSet {
t.Fatalf("expected name to be %v, got %v", testContractSet, m.Name)
}
Expand Down
7 changes: 7 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,15 @@ type (
// time range and options.
ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error)

// ContractSetMetrics returns the contract set metrics for the given
// time range and options.
ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error)

// RecordContractPruneMetric records a contract prune metric.
RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error

// RecordContractSetMetric records a contract set metric.
RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error
}

UsedContract struct {
Expand Down
68 changes: 67 additions & 1 deletion stores/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func ContractPruneMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) (metrics []api.ContractPruneMetric, _ error) {
rows, err := queryPeriods(ctx, tx, start, n, interval, opts)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract metrics: %w", err)
return nil, fmt.Errorf("failed to fetch contract prune metrics: %w", err)
}
defer rows.Close()

Expand All @@ -32,6 +32,29 @@ func ContractPruneMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uin
return metrics, nil
}

func ContractSetMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
rows, err := queryPeriods(ctx, tx, start, n, interval, opts)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err)
}
defer rows.Close()

for rows.Next() {
var csm api.ContractSetMetric
var placeHolder int64
var placeHolderTime time.Time
var timestamp UnixTimeMS
if err := rows.Scan(&placeHolder, &placeHolderTime, &timestamp, &csm.Name, &csm.Contracts); err != nil {
return nil, fmt.Errorf("failed to scan contract set metric: %w", err)
}

csm.Timestamp = api.TimeRFC3339(normaliseTimestamp(start, interval, timestamp))
metrics = append(metrics, csm)
}

return metrics, nil
}

func RecordContractPruneMetric(ctx context.Context, tx sql.Tx, metrics ...api.ContractPruneMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO contract_prunes (created_at, timestamp, fcid, host, host_version, pruned, remaining, duration) VALUES (?, ?,?, ?, ?, ?, ?, ?)")
if err != nil {
Expand Down Expand Up @@ -62,6 +85,32 @@ func RecordContractPruneMetric(ctx context.Context, tx sql.Tx, metrics ...api.Co
return nil
}

func RecordContractSetMetric(ctx context.Context, tx sql.Tx, metrics ...api.ContractSetMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO contract_sets (created_at, timestamp, name, contracts) VALUES (?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert contract set metric: %w", err)
}
defer insertStmt.Close()

for _, metric := range metrics {
res, err := insertStmt.Exec(ctx,
time.Now().UTC(),
UnixTimeMS(metric.Timestamp),
metric.Name,
metric.Contracts,
)
if err != nil {
return fmt.Errorf("failed to insert contract set metric: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n == 0 {
return fmt.Errorf("failed to insert contract set metric: no rows affected")
}
}

return nil
}

func queryPeriods(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts interface{}) (*sql.LoggedRows, error) {
if n > api.MetricMaxIntervals {
return nil, api.ErrMaxIntervalsExceeded
Expand Down Expand Up @@ -130,9 +179,26 @@ func whereClauseFromQueryOpts(opts interface{}) (where whereClause, _ error) {
where.query += " AND host_version = ?"
where.params = append(where.params, opts.HostVersion)
}
case api.ContractSetMetricsQueryOpts:
where.table = "contract_sets"
if opts.Name != "" {
where.query += " AND name = ?"
where.params = append(where.params, opts.Name)
}
default:
return whereClause{}, fmt.Errorf("unknown query opts type: %T", opts)
}

return
}

func normaliseTimestamp(start time.Time, interval time.Duration, t UnixTimeMS) UnixTimeMS {
startMS := start.UnixMilli()
toNormaliseMS := time.Time(t).UnixMilli()
intervalMS := interval.Milliseconds()
if startMS > toNormaliseMS {
return UnixTimeMS(start)
}
normalizedMS := (toNormaliseMS-startMS)/intervalMS*intervalMS + start.UnixMilli()
return UnixTimeMS(time.UnixMilli(normalizedMS))
}
10 changes: 10 additions & 0 deletions stores/sql/mysql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type (
}
)

var _ ssql.MetricsDatabaseTx = (*MetricsDatabaseTx)(nil)

// NewMetricsDatabase creates a new MySQL backend.
func NewMetricsDatabase(db *dsql.DB, log *zap.SugaredLogger, lqd, ltd time.Duration) (*MetricsDatabase, error) {
store, err := sql.NewDB(db, log.Desugar(), deadlockMsgs, lqd, ltd)
Expand Down Expand Up @@ -74,6 +76,14 @@ func (tx *MetricsDatabaseTx) ContractPruneMetrics(ctx context.Context, start tim
return ssql.ContractPruneMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
return ssql.ContractSetMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error {
return ssql.RecordContractPruneMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
return ssql.RecordContractSetMetric(ctx, tx, metrics...)
}
10 changes: 10 additions & 0 deletions stores/sql/sqlite/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type (
}
)

var _ ssql.MetricsDatabaseTx = (*MetricsDatabaseTx)(nil)

// NewSQLiteDatabase creates a new SQLite backend.
func NewMetricsDatabase(db *dsql.DB, log *zap.SugaredLogger, lqd, ltd time.Duration) (*MetricsDatabase, error) {
store, err := sql.NewDB(db, log.Desugar(), deadlockMsgs, lqd, ltd)
Expand Down Expand Up @@ -73,6 +75,14 @@ func (tx *MetricsDatabaseTx) ContractPruneMetrics(ctx context.Context, start tim
return ssql.ContractPruneMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
return ssql.ContractSetMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error {
return ssql.RecordContractPruneMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
return ssql.RecordContractSetMetric(ctx, tx, metrics...)
}

0 comments on commit d1510f1

Please sign in to comment.