Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate ContractSetMetrcis to raw SQL #1306

Merged
merged 6 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 8 additions & 38 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,15 @@ func (s *SQLStore) ContractSetChurnMetrics(ctx context.Context, start time.Time,
return resp, nil
}

func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error) {
metrics, err := s.contractSetMetrics(ctx, start, n, interval, opts)
func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
err := s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) (err error) {
metrics, err = tx.ContractSetMetrics(ctx, start, n, interval, opts)
return
})
if err != nil {
return nil, err
}
resp := make([]api.ContractSetMetric, len(metrics))
for i := range resp {
resp[i] = api.ContractSetMetric{
Contracts: metrics[i].Contracts,
Name: metrics[i].Name,
Timestamp: api.TimeRFC3339(time.Time(metrics[i].Timestamp).UTC()),
}
}
return resp, nil
return
}

func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
Expand Down Expand Up @@ -281,16 +276,8 @@ func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ...
}

func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
dbMetrics := make([]dbContractSetMetric, len(metrics))
for i, metric := range metrics {
dbMetrics[i] = dbContractSetMetric{
Contracts: metric.Contracts,
Name: metric.Name,
Timestamp: unixTimeMS(metric.Timestamp),
}
}
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
return s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordContractSetMetric(ctx, metrics...)
})
}

Expand Down Expand Up @@ -462,23 +449,6 @@ func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time,
return metrics, nil
}

func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]dbContractSetMetric, error) {
whereExpr := gorm.Expr("TRUE")
if opts.Name != "" {
whereExpr = gorm.Expr("name = ?", opts.Name)
}

var metrics []dbContractSetMetric
err := s.findPeriods(ctx, dbContractSetMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err)
}
for i, m := range metrics {
metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp)
}
return metrics, nil
}

func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) unixTimeMS {
startMS := start.UnixMilli()
toNormaliseMS := time.Time(t).UnixMilli()
Expand Down
2 changes: 1 addition & 1 deletion stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestContractSetMetrics(t *testing.T) {
} else if m := metrics[0]; m.Contracts != 0 {
t.Fatalf("expected 0 contracts, got %v", m.Contracts)
} else if ti := time.Time(m.Timestamp); !ti.Equal(testStart) {
t.Fatal("expected time to match start time")
t.Fatalf("expected time to match start time, %v != %v", ti, testStart)
} else if m.Name != testContractSet {
t.Fatalf("expected name to be %v, got %v", testContractSet, m.Name)
}
Expand Down
7 changes: 7 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,15 @@ type (
// time range and options.
ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error)

// ContractSetMetrics returns the contract set metrics for the given
// time range and options.
ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error)

// RecordContractPruneMetric records a contract prune metric.
RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error

// RecordContractSetMetric records a contract set metric.
RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error
}

UsedContract struct {
Expand Down
68 changes: 67 additions & 1 deletion stores/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func ContractPruneMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) (metrics []api.ContractPruneMetric, _ error) {
rows, err := queryPeriods(ctx, tx, start, n, interval, opts)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract metrics: %w", err)
return nil, fmt.Errorf("failed to fetch contract prune metrics: %w", err)
}
defer rows.Close()

Expand All @@ -32,6 +32,29 @@ func ContractPruneMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uin
return metrics, nil
}

func ContractSetMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
rows, err := queryPeriods(ctx, tx, start, n, interval, opts)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err)
}
defer rows.Close()

for rows.Next() {
var csm api.ContractSetMetric
var placeHolder int64
var placeHolderTime time.Time
var timestamp UnixTimeMS
if err := rows.Scan(&placeHolder, &placeHolderTime, &timestamp, &csm.Name, &csm.Contracts); err != nil {
return nil, fmt.Errorf("failed to scan contract set metric: %w", err)
}

csm.Timestamp = api.TimeRFC3339(normaliseTimestamp(start, interval, timestamp))
metrics = append(metrics, csm)
}

return metrics, nil
}

func RecordContractPruneMetric(ctx context.Context, tx sql.Tx, metrics ...api.ContractPruneMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO contract_prunes (created_at, timestamp, fcid, host, host_version, pruned, remaining, duration) VALUES (?, ?,?, ?, ?, ?, ?, ?)")
if err != nil {
Expand Down Expand Up @@ -62,6 +85,32 @@ func RecordContractPruneMetric(ctx context.Context, tx sql.Tx, metrics ...api.Co
return nil
}

func RecordContractSetMetric(ctx context.Context, tx sql.Tx, metrics ...api.ContractSetMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO contract_sets (created_at, timestamp, name, contracts) VALUES (?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert contract set metric: %w", err)
}
defer insertStmt.Close()

for _, metric := range metrics {
res, err := insertStmt.Exec(ctx,
time.Now().UTC(),
UnixTimeMS(metric.Timestamp),
metric.Name,
metric.Contracts,
)
if err != nil {
return fmt.Errorf("failed to insert contract set metric: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n == 0 {
return fmt.Errorf("failed to insert contract set metric: no rows affected")
}
}

return nil
}

func queryPeriods(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts interface{}) (*sql.LoggedRows, error) {
if n > api.MetricMaxIntervals {
return nil, api.ErrMaxIntervalsExceeded
Expand Down Expand Up @@ -130,9 +179,26 @@ func whereClauseFromQueryOpts(opts interface{}) (where whereClause, _ error) {
where.query += " AND host_version = ?"
where.params = append(where.params, opts.HostVersion)
}
case api.ContractSetMetricsQueryOpts:
where.table = "contract_sets"
if opts.Name != "" {
where.query += " AND name = ?"
where.params = append(where.params, opts.Name)
}
default:
return whereClause{}, fmt.Errorf("unknown query opts type: %T", opts)
}

return
}

func normaliseTimestamp(start time.Time, interval time.Duration, t UnixTimeMS) UnixTimeMS {
startMS := start.UnixMilli()
toNormaliseMS := time.Time(t).UnixMilli()
intervalMS := interval.Milliseconds()
if startMS > toNormaliseMS {
return UnixTimeMS(start)
}
normalizedMS := (toNormaliseMS-startMS)/intervalMS*intervalMS + start.UnixMilli()
return UnixTimeMS(time.UnixMilli(normalizedMS))
}
10 changes: 10 additions & 0 deletions stores/sql/mysql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type (
}
)

var _ ssql.MetricsDatabaseTx = (*MetricsDatabaseTx)(nil)

// NewMetricsDatabase creates a new MySQL backend.
func NewMetricsDatabase(db *dsql.DB, log *zap.SugaredLogger, lqd, ltd time.Duration) (*MetricsDatabase, error) {
store, err := sql.NewDB(db, log.Desugar(), deadlockMsgs, lqd, ltd)
Expand Down Expand Up @@ -74,6 +76,14 @@ func (tx *MetricsDatabaseTx) ContractPruneMetrics(ctx context.Context, start tim
return ssql.ContractPruneMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
return ssql.ContractSetMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error {
return ssql.RecordContractPruneMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
return ssql.RecordContractSetMetric(ctx, tx, metrics...)
}
10 changes: 10 additions & 0 deletions stores/sql/sqlite/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type (
}
)

var _ ssql.MetricsDatabaseTx = (*MetricsDatabaseTx)(nil)

// NewSQLiteDatabase creates a new SQLite backend.
func NewMetricsDatabase(db *dsql.DB, log *zap.SugaredLogger, lqd, ltd time.Duration) (*MetricsDatabase, error) {
store, err := sql.NewDB(db, log.Desugar(), deadlockMsgs, lqd, ltd)
Expand Down Expand Up @@ -73,6 +75,14 @@ func (tx *MetricsDatabaseTx) ContractPruneMetrics(ctx context.Context, start tim
return ssql.ContractPruneMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) (metrics []api.ContractSetMetric, _ error) {
return ssql.ContractSetMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error {
return ssql.RecordContractPruneMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
return ssql.RecordContractSetMetric(ctx, tx, metrics...)
}
Loading