Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate ContractPruneMetrics to raw SQL #1305

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 9 additions & 55 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
sql "go.sia.tech/renterd/stores/sql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
Expand Down Expand Up @@ -150,27 +151,15 @@ func (s *SQLStore) ContractMetrics(ctx context.Context, start time.Time, n uint6
return resp, nil
}

func (s *SQLStore) ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error) {
metrics, err := s.contractPruneMetrics(ctx, start, n, interval, opts)
func (s *SQLStore) ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) (metrics []api.ContractPruneMetric, _ error) {
err := s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) (err error) {
metrics, err = tx.ContractPruneMetrics(ctx, start, n, interval, opts)
return
})
if err != nil {
return nil, err
}

resp := make([]api.ContractPruneMetric, len(metrics))
for i := range resp {
resp[i] = api.ContractPruneMetric{
Timestamp: api.TimeRFC3339(metrics[i].Timestamp),

ContractID: types.FileContractID(metrics[i].FCID),
HostKey: types.PublicKey(metrics[i].Host),
HostVersion: metrics[i].HostVersion,

Pruned: uint64(metrics[i].Pruned),
Remaining: uint64(metrics[i].Remaining),
Duration: metrics[i].Duration,
}
}
return resp, nil
return
}

func (s *SQLStore) ContractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]api.ContractSetChurnMetric, error) {
Expand Down Expand Up @@ -270,22 +259,8 @@ func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.Cont
}

func (s *SQLStore) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error {
dbMetrics := make([]dbContractPruneMetric, len(metrics))
for i, metric := range metrics {
dbMetrics[i] = dbContractPruneMetric{
Timestamp: unixTimeMS(metric.Timestamp),

FCID: fileContractID(metric.ContractID),
Host: publicKey(metric.HostKey),
HostVersion: metric.HostVersion,

Pruned: unsigned64(metric.Pruned),
Remaining: unsigned64(metric.Remaining),
Duration: metric.Duration,
}
}
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
return s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordContractPruneMetric(ctx, metrics...)
})
}

Expand Down Expand Up @@ -465,27 +440,6 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6
return metrics, nil
}

func (s *SQLStore) contractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]dbContractPruneMetric, error) {
whereExpr := gorm.Expr("TRUE")
if opts.ContractID != (types.FileContractID{}) {
whereExpr = gorm.Expr("? AND fcid = ?", whereExpr, fileContractID(opts.ContractID))
}
if opts.HostKey != (types.PublicKey{}) {
whereExpr = gorm.Expr("? AND host = ?", whereExpr, publicKey(opts.HostKey))
}
if opts.HostVersion != "" {
whereExpr = gorm.Expr("? AND host_version = ?", whereExpr, opts.HostVersion)
}

var metrics []dbContractPruneMetric
err := s.findPeriods(ctx, dbContractPruneMetric{}.TableName(), &metrics, start, n, interval, whereExpr)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract metrics: %w", err)
}

return metrics, nil
}

func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetChurnMetricsQueryOpts) ([]dbContractSetChurnMetric, error) {
whereExpr := gorm.Expr("TRUE")
if opts.Name != "" {
Expand Down
5 changes: 3 additions & 2 deletions stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"lukechampine.com/frand"
Expand Down Expand Up @@ -92,8 +93,8 @@ func TestContractPruneMetrics(t *testing.T) {
t.Fatal("expected metrics to be sorted by time")
}
for _, m := range metrics {
if !cmp.Equal(m, fcid2Metric[m.ContractID], cmp.Comparer(api.CompareTimeRFC3339)) {
t.Fatal("unexpected metric", cmp.Diff(m, fcid2Metric[m.ContractID]))
if !cmp.Equal(m, fcid2Metric[m.ContractID], cmpopts.IgnoreUnexported(api.ContractPruneMetric{}), cmp.Comparer(api.CompareTimeRFC3339)) {
t.Fatal("unexpected metric", m, fcid2Metric[m.ContractID])
}
cmpFn(m)
}
Expand Down
22 changes: 19 additions & 3 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ type (
Database interface {
io.Closer

// Transaction starts a new transaction.
Transaction(ctx context.Context, fn func(DatabaseTx) error) error

// Migrate runs all missing migrations on the database.
Migrate(ctx context.Context) error

// Transaction starts a new transaction.
Transaction(ctx context.Context, fn func(DatabaseTx) error) error

// Version returns the database version and name.
Version(ctx context.Context) (string, string, error)
}
Expand Down Expand Up @@ -212,10 +212,26 @@ type (

MetricsDatabase interface {
io.Closer

// Migrate runs all missing migrations on the database.
Migrate(ctx context.Context) error

// Transaction starts a new transaction.
Transaction(ctx context.Context, fn func(MetricsDatabaseTx) error) error

// Version returns the database version and name.
Version(ctx context.Context) (string, string, error)
}

MetricsDatabaseTx interface {
// ContractPruneMetrics returns the contract prune metrics for the given
// time range and options.
ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error)

// RecordContractPruneMetric records a contract prune metric.
RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error
}

UsedContract struct {
ID int64
FCID FileContractID
Expand Down
138 changes: 138 additions & 0 deletions stores/sql/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package sql

import (
"context"
"fmt"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/sql"
)

func ContractPruneMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) (metrics []api.ContractPruneMetric, _ error) {
rows, err := queryPeriods(ctx, tx, start, n, interval, opts)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract metrics: %w", err)
}
defer rows.Close()

for rows.Next() {
var cpm api.ContractPruneMetric
var placeHolder int64
var placeHolderTime time.Time
var timestamp UnixTimeMS
if err := rows.Scan(&placeHolder, &placeHolderTime, &timestamp, (*FileContractID)(&cpm.ContractID), (*PublicKey)(&cpm.HostKey), &cpm.HostVersion, (*Unsigned64)(&cpm.Pruned), (*Unsigned64)(&cpm.Remaining), &cpm.Duration); err != nil {
return nil, fmt.Errorf("failed to scan contract prune metric: %w", err)
}
cpm.Timestamp = api.TimeRFC3339(timestamp)
metrics = append(metrics, cpm)
}

return metrics, nil
}

func RecordContractPruneMetric(ctx context.Context, tx sql.Tx, metrics ...api.ContractPruneMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO contract_prunes (created_at, timestamp, fcid, host, host_version, pruned, remaining, duration) VALUES (?, ?,?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert contract prune metric: %w", err)
}
defer insertStmt.Close()

for _, metric := range metrics {
res, err := insertStmt.Exec(ctx,
time.Now().UTC(),
UnixTimeMS(metric.Timestamp),
FileContractID(metric.ContractID),
PublicKey(metric.HostKey),
metric.HostVersion,
Unsigned64(metric.Pruned),
Unsigned64(metric.Remaining),
metric.Duration,
)
if err != nil {
return fmt.Errorf("failed to insert contract prune metric: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n == 0 {
return fmt.Errorf("failed to insert contract prune metric: no rows affected")
}
}

return nil
}

func queryPeriods(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts interface{}) (*sql.LoggedRows, error) {
if n > api.MetricMaxIntervals {
return nil, api.ErrMaxIntervalsExceeded
}

params := []interface{}{
UnixTimeMS(start),
interval.Milliseconds(),
UnixTimeMS(start.Add(time.Duration(n) * interval)),
interval.Milliseconds(),
interval.Milliseconds(),
}

where, err := whereClauseFromQueryOpts(opts)
if err != nil {
return nil, fmt.Errorf("failed to build where clause: %w", err)
} else if len(where.params) > 0 {
params = append(params, where.params...)
}

return tx.Query(ctx, fmt.Sprintf(`
WITH RECURSIVE periods AS (
SELECT ? AS period_start
UNION ALL
SELECT period_start + ?
FROM periods
WHERE period_start < ? - ?
)
SELECT %s.* FROM %s
INNER JOIN (
SELECT
p.period_start as Period,
MIN(obj.id) AS id
FROM
periods p
INNER JOIN
%s obj ON obj.timestamp >= p.period_start AND obj.timestamp < p.period_start + ?
WHERE %s
GROUP BY
p.period_start
) i ON %s.id = i.id ORDER BY Period ASC
`, where.table, where.table, where.table, where.query, where.table), params...)
}

type whereClause struct {
table string
query string
params []interface{}
}

func whereClauseFromQueryOpts(opts interface{}) (where whereClause, _ error) {
where.query = "1=1"

switch opts := opts.(type) {
case api.ContractPruneMetricsQueryOpts:
where.table = "contract_prunes"
if opts.ContractID != (types.FileContractID{}) {
where.query += " AND fcid = ?"
where.params = append(where.params, FileContractID(opts.ContractID))
}
if opts.HostKey != (types.PublicKey{}) {
where.query += " AND host = ?"
where.params = append(where.params, PublicKey(opts.HostKey))
}
if opts.HostVersion != "" {
where.query += " AND host_version = ?"
where.params = append(where.params, opts.HostVersion)
}
default:
return whereClause{}, fmt.Errorf("unknown query opts type: %T", opts)
}

return
}
37 changes: 33 additions & 4 deletions stores/sql/mysql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,30 @@ package mysql

import (
"context"
"encoding/hex"
"time"

dsql "database/sql"

"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/sql"
ssql "go.sia.tech/renterd/stores/sql"
"lukechampine.com/frand"

"go.uber.org/zap"
)

type MetricsDatabase struct {
log *zap.SugaredLogger
db *sql.DB
}
type (
MetricsDatabase struct {
log *zap.SugaredLogger
db *sql.DB
}

MetricsDatabaseTx struct {
sql.Tx
log *zap.SugaredLogger
}
)

// NewMetricsDatabase creates a new MySQL backend.
func NewMetricsDatabase(db *dsql.DB, log *zap.SugaredLogger, lqd, ltd time.Duration) (*MetricsDatabase, error) {
Expand Down Expand Up @@ -45,6 +56,24 @@ func (b *MetricsDatabase) Migrate(ctx context.Context) error {
return sql.PerformMigrations(ctx, b, migrationsFs, "metrics", sql.MetricsMigrations(ctx, migrationsFs, b.log))
}

func (b *MetricsDatabase) Transaction(ctx context.Context, fn func(tx ssql.MetricsDatabaseTx) error) error {
return b.db.Transaction(ctx, func(tx sql.Tx) error {
return fn(b.wrapTxn(tx))
})
}

func (b *MetricsDatabase) Version(ctx context.Context) (string, string, error) {
return version(ctx, b.db)
}

func (b *MetricsDatabase) wrapTxn(tx sql.Tx) *MetricsDatabaseTx {
return &MetricsDatabaseTx{tx, b.log.Named(hex.EncodeToString(frand.Bytes(16)))}
}

func (tx *MetricsDatabaseTx) ContractPruneMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractPruneMetricsQueryOpts) ([]api.ContractPruneMetric, error) {
return ssql.ContractPruneMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error {
return ssql.RecordContractPruneMetric(ctx, tx, metrics...)
}
Loading
Loading