Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert NS timestamps to MS #1491

Merged
merged 12 commits into from
Sep 5, 2024
8 changes: 0 additions & 8 deletions api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ type (
Reason string
}

PerformanceMetric struct {
Action string `json:"action"`
HostKey types.PublicKey `json:"hostKey"`
Origin string `json:"origin"`
Duration time.Duration `json:"duration"`
Timestamp TimeRFC3339 `json:"timestamp"`
}

PerformanceMetricsQueryOpts struct {
Action string
HostKey types.PublicKey
Expand Down
6 changes: 6 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00016_account_owner", log)
},
},
{
ID: "00017_unix_ms",
Migrate: func(tx Tx) error {
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00016_unix_ms", log)
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
},
},
}
}
MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
Expand Down
2 changes: 1 addition & 1 deletion internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2696,7 +2696,7 @@ func TestHostScan(t *testing.T) {
// fetch hosts again with the unix epoch timestamp which should only return
// 1 host since that one hasn't been scanned yet
toScan, err := b.HostsForScanning(context.Background(), api.HostsForScanningOptions{
MaxLastScan: api.TimeRFC3339(time.Unix(0, 1)),
MaxLastScan: api.TimeRFC3339(time.UnixMilli(1)),
})
tt.OK(err)
if len(toScan) != 1 {
Expand Down
8 changes: 4 additions & 4 deletions stores/hostdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func TestRecordScan(t *testing.T) {
t.Fatal(err)
}
if host.Interactions != (api.HostInteractions{}) {
t.Fatal("mismatch")
t.Fatal("mismatch", cmp.Diff(host.Interactions, api.HostInteractions{}))
}
if host.Settings != (rhpv2.HostSettings{}) {
t.Fatal("mismatch")
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestRecordScan(t *testing.T) {
// We expect no uptime or downtime from only a single scan.
uptime := time.Duration(0)
downtime := time.Duration(0)
if host.Interactions.LastScan.UnixNano() != firstScanTime.UnixNano() {
if host.Interactions.LastScan.UnixMilli() != firstScanTime.UnixMilli() {
t.Fatal("wrong time")
}
host.Interactions.LastScan = time.Time{}
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestRecordScan(t *testing.T) {
host, err = ss.Host(ctx, hk)
if err != nil {
t.Fatal(err)
} else if host.Interactions.LastScan.UnixNano() != secondScanTime.UnixNano() {
} else if host.Interactions.LastScan.UnixMilli() != secondScanTime.UnixMilli() {
t.Fatal("wrong time")
} else if time.Now().After(host.PriceTable.Expiry) {
t.Fatal("invalid expiry")
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestRecordScan(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if host.Interactions.LastScan.UnixNano() != thirdScanTime.UnixNano() {
if host.Interactions.LastScan.UnixMilli() != thirdScanTime.UnixMilli() {
t.Fatal("wrong time")
}
host.Interactions.LastScan = time.Time{}
Expand Down
14 changes: 0 additions & 14 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ func (s *SQLStore) ContractSetMetrics(ctx context.Context, start time.Time, n ui
return
}

func (s *SQLStore) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) (metrics []api.PerformanceMetric, err error) {
err = s.dbMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) (txErr error) {
metrics, txErr = tx.PerformanceMetrics(ctx, start, n, interval, opts)
return
})
return
}

func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.ContractMetric) error {
return s.dbMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordContractMetric(ctx, metrics...)
Expand All @@ -72,12 +64,6 @@ func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.C
})
}

func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
return s.dbMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordPerformanceMetric(ctx, metrics...)
})
}

func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error {
return s.dbMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordWalletMetric(ctx, metrics...)
Expand Down
85 changes: 0 additions & 85 deletions stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,91 +431,6 @@ func TestNormaliseTimestamp(t *testing.T) {
}
}

func TestPerformanceMetrics(t *testing.T) {
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()

// Create metrics to query.
actions := []string{"download", "upload"}
hosts := []types.PublicKey{types.GeneratePrivateKey().PublicKey(), types.GeneratePrivateKey().PublicKey()}
origins := []string{"worker1", "worker2"}
durations := []time.Duration{time.Second, time.Hour}
times := []time.Time{time.UnixMilli(3), time.UnixMilli(1), time.UnixMilli(2)}
var i byte
for _, action := range actions {
for _, host := range hosts {
for _, origin := range origins {
for _, duration := range durations {
for _, recordedTime := range times {
if err := ss.RecordPerformanceMetric(context.Background(), api.PerformanceMetric{
Action: action,
Timestamp: api.TimeRFC3339(recordedTime),
Duration: duration,
HostKey: host,
Origin: origin,
}); err != nil {
t.Fatal(err)
}
i++
}
}
}
}
}

assertMetrics := func(start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts, expected int, cmp func(api.PerformanceMetric)) {
t.Helper()
metrics, err := ss.PerformanceMetrics(context.Background(), start, n, interval, opts)
if err != nil {
t.Fatal(err)
}
if len(metrics) != expected {
t.Fatalf("expected %v metrics, got %v", expected, len(metrics))
} else if !sort.SliceIsSorted(metrics, func(i, j int) bool {
return time.Time(metrics[i].Timestamp).Before(time.Time(metrics[j].Timestamp))
}) {
t.Fatal("expected metrics to be sorted by time")
}
for _, m := range metrics {
cmp(m)
}
}

// Query without any filters.
start := time.UnixMilli(1)
assertMetrics(start, 3, time.Millisecond, api.PerformanceMetricsQueryOpts{}, 3, func(m api.PerformanceMetric) {})

// Filter by actions.
assertMetrics(start, 3, time.Millisecond, api.PerformanceMetricsQueryOpts{Action: actions[0]}, 3, func(m api.PerformanceMetric) {
if m.Action != actions[0] {
t.Fatalf("expected action to be %v, got %v", actions[0], m.Action)
}
})

// Filter by hosts.
assertMetrics(start, 3, time.Millisecond, api.PerformanceMetricsQueryOpts{HostKey: hosts[0]}, 3, func(m api.PerformanceMetric) {
if m.HostKey != hosts[0] {
t.Fatalf("expected hosts to be %v, got %v", hosts[0], m.HostKey)
}
})

// Filter by reporters.
assertMetrics(start, 3, time.Millisecond, api.PerformanceMetricsQueryOpts{Origin: origins[0]}, 3, func(m api.PerformanceMetric) {
if m.Origin != origins[0] {
t.Fatalf("expected origin to be %v, got %v", origins[0], m.Origin)
}
})

// Prune metrics
if err := ss.PruneMetrics(context.Background(), api.MetricPerformance, time.UnixMilli(3)); err != nil {
t.Fatal(err)
} else if metrics, err := ss.PerformanceMetrics(context.Background(), time.UnixMilli(1), 3, time.Millisecond, api.PerformanceMetricsQueryOpts{}); err != nil {
t.Fatal(err)
} else if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %v", len(metrics))
}
}

func TestWalletMetrics(t *testing.T) {
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()
Expand Down
6 changes: 0 additions & 6 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,6 @@ type (
// time range and options.
ContractSetMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.ContractSetMetricsQueryOpts) ([]api.ContractSetMetric, error)

// PerformanceMetrics returns performance metrics for the given time range
PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error)

// PruneMetrics deletes metrics of a certain type older than the given
// cutoff time.
PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error
Expand All @@ -441,9 +438,6 @@ type (
// RecordContractSetMetric records contract set metrics.
RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error

// RecordPerformanceMetric records performance metrics.
RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error

// RecordWalletMetric records wallet metrics.
RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error

Expand Down
14 changes: 7 additions & 7 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func HostsForScanning(ctx context.Context, tx sql.Tx, maxLastScan time.Time, off
}

rows, err := tx.Query(ctx, "SELECT public_key, net_address FROM hosts WHERE last_scan < ? LIMIT ? OFFSET ?",
maxLastScan.UnixNano(), limit, offset)
UnixTimeMS(maxLastScan), limit, offset)
if err != nil {
return nil, fmt.Errorf("failed to fetch hosts for scanning: %w", err)
}
Expand Down Expand Up @@ -1751,7 +1751,7 @@ func RecordHostScans(ctx context.Context, tx sql.Tx, scans []api.HostScan) error

now := time.Now()
for _, scan := range scans {
scanTime := scan.Timestamp.UnixNano()
scanTime := scan.Timestamp.UnixMilli()
_, err = stmt.Exec(ctx,
scan.Success, // scanned
scan.Success, // last_scan_success
Expand Down Expand Up @@ -1827,7 +1827,7 @@ func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64
FROM contracts
INNER JOIN hosts h ON h.id = contracts.host_id
WHERE recent_downtime >= ? AND recent_scan_failures >= ?
`, maxDownTime, minRecentFailures)
`, DurationMS(maxDownTime), minRecentFailures)
if err != nil {
return 0, fmt.Errorf("failed to fetch contracts: %w", err)
}
Expand All @@ -1851,7 +1851,7 @@ func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64

// delete hosts
res, err := tx.Exec(ctx, "DELETE FROM hosts WHERE recent_downtime >= ? AND recent_scan_failures >= ?",
maxDownTime, minRecentFailures)
DurationMS(maxDownTime), minRecentFailures)
if err != nil {
return 0, fmt.Errorf("failed to delete hosts: %w", err)
}
Expand Down Expand Up @@ -2134,8 +2134,8 @@ func SearchHosts(ctx context.Context, tx sql.Tx, autopilot, filterMode, usabilit
var resolvedAddresses string
err := rows.Scan(&hostID, &h.KnownSince, &h.LastAnnouncement, (*PublicKey)(&h.PublicKey),
&h.NetAddress, (*PriceTable)(&h.PriceTable.HostPriceTable), &pte,
(*HostSettings)(&h.Settings), &h.Interactions.TotalScans, (*UnixTimeNS)(&h.Interactions.LastScan), &h.Interactions.LastScanSuccess,
&h.Interactions.SecondToLastScanSuccess, &h.Interactions.Uptime, &h.Interactions.Downtime,
(*HostSettings)(&h.Settings), &h.Interactions.TotalScans, (*UnixTimeMS)(&h.Interactions.LastScan), &h.Interactions.LastScanSuccess,
&h.Interactions.SecondToLastScanSuccess, (*DurationMS)(&h.Interactions.Uptime), (*DurationMS)(&h.Interactions.Downtime),
&h.Interactions.SuccessfulInteractions, &h.Interactions.FailedInteractions, &h.Interactions.LostSectors,
&h.Scanned, &resolvedAddresses, &h.Blocked,
)
Expand Down Expand Up @@ -2547,7 +2547,7 @@ func scanWalletEvent(s Scanner) (wallet.Event, error) {
var inflow, outflow Currency
var edata []byte
var etype string
var ts UnixTimeNS
var ts UnixTimeMS
if err := s.Scan(
&eventID,
&blockID,
Expand Down
55 changes: 2 additions & 53 deletions stores/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func ContractPruneMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uin
&m.HostVersion,
(*Unsigned64)(&m.Pruned),
(*Unsigned64)(&m.Remaining),
&m.Duration,
(*DurationMS)(&m.Duration),
)
if err != nil {
err = fmt.Errorf("failed to scan contract prune metric: %w", err)
Expand Down Expand Up @@ -144,29 +144,6 @@ func ContractSetMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint6
})
}

func PerformanceMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
return queryPeriods(ctx, tx, start, n, interval, opts, func(rows *sql.LoggedRows) (m api.PerformanceMetric, err error) {
var placeHolder int64
var placeHolderTime time.Time
var timestamp UnixTimeMS
err = rows.Scan(
&placeHolder,
&placeHolderTime,
&timestamp,
&m.Action,
(*PublicKey)(&m.HostKey),
&m.Origin,
&m.Duration,
)
if err != nil {
err = fmt.Errorf("failed to scan contract set metric: %w", err)
return
}
m.Timestamp = api.TimeRFC3339(normaliseTimestamp(start, interval, timestamp))
return
})
}

func PruneMetrics(ctx context.Context, tx sql.Tx, metric string, cutoff time.Time) error {
if metric == "" {
return errors.New("metric must be set")
Expand Down Expand Up @@ -269,7 +246,7 @@ func RecordContractPruneMetric(ctx context.Context, tx sql.Tx, metrics ...api.Co
metric.HostVersion,
Unsigned64(metric.Pruned),
Unsigned64(metric.Remaining),
metric.Duration,
(DurationMS)(metric.Duration),
)
if err != nil {
return fmt.Errorf("failed to insert contract prune metric: %w", err)
Expand Down Expand Up @@ -337,34 +314,6 @@ func RecordContractSetMetric(ctx context.Context, tx sql.Tx, metrics ...api.Cont
return nil
}

func RecordPerformanceMetric(ctx context.Context, tx sql.Tx, metrics ...api.PerformanceMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO performance (created_at, timestamp, action, host, origin, duration) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert performance metric: %w", err)
}
defer insertStmt.Close()

for _, metric := range metrics {
res, err := insertStmt.Exec(ctx,
time.Now().UTC(),
UnixTimeMS(metric.Timestamp),
metric.Action,
PublicKey(metric.HostKey),
metric.Origin,
metric.Duration,
)
if err != nil {
return fmt.Errorf("failed to insert performance metric: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n == 0 {
return fmt.Errorf("failed to insert performance metric: no rows affected")
}
}

return nil
}

func RecordWalletMetric(ctx context.Context, tx sql.Tx, metrics ...api.WalletMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO wallets (created_at, timestamp, confirmed_lo, confirmed_hi, spendable_lo, spendable_hi, unconfirmed_lo, unconfirmed_hi, immature_hi, immature_lo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion stores/sql/mysql/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c chainUpdateTx) WalletApplyIndex(index types.ChainIndex, created, spent [
e.Type,
data,
e.MaturityHeight,
ssql.UnixTimeNS(e.Timestamp),
ssql.UnixTimeMS(e.Timestamp),
); err != nil {
return fmt.Errorf("failed to insert new event: %w", err)
}
Expand Down
8 changes: 0 additions & 8 deletions stores/sql/mysql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ func (tx *MetricsDatabaseTx) ContractSetMetrics(ctx context.Context, start time.
return ssql.ContractSetMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) PerformanceMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.PerformanceMetricsQueryOpts) ([]api.PerformanceMetric, error) {
return ssql.PerformanceMetrics(ctx, tx, start, n, interval, opts)
}

func (tx *MetricsDatabaseTx) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error {
return ssql.PruneMetrics(ctx, tx, metric, cutoff)
}
Expand All @@ -113,10 +109,6 @@ func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metric
return ssql.RecordContractSetMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
return ssql.RecordPerformanceMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error {
return ssql.RecordWalletMetric(ctx, tx, metrics...)
}
Expand Down
4 changes: 4 additions & 0 deletions stores/sql/mysql/migrations/main/migration_00017_unix_ms.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
UPDATE hosts SET hosts.last_scan = hosts.last_scan / 1000000;
UPDATE hosts SET hosts.uptime = hosts.uptime / 1000000;
UPDATE hosts SET hosts.downtime = hosts.downtime / 1000000;
UPDATE wallet_events SET wallet_events.timestamp = wallet_events.timestamp / 1000000;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
UPDATE contract_prunes SET contract_prunes.timestamp = contract_prunes.timestamp / 1000000;
DROP TABLE IF EXISTS performance;
Loading
Loading