Expose host errors in migration alerts #1526

Merged · 4 commits · Sep 17, 2024
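This PR moves detailed migration-failure alerting from the autopilot into the worker, which is the component that actually sees the per-host errors. When a slab migration fails, the worker now walks the wrapped error chain and, if the root cause is a HostErrorSet, attaches a host-key-to-error map to the alert data under a `hosts` key. Along the way, `Worker.MigrateSlab` is simplified to return a plain error instead of `api.MigrateSlabResponse`, failed migrations report the affected objects as typed metadata under an `objects` key rather than an `objectIDs` map, and the object-listing queries group by bucket so equally named objects in different buckets stay distinct.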
7 changes: 0 additions & 7 deletions api/worker.go
@@ -70,13 +70,6 @@ type (
Total uint64 `json:"total"`
}

// MigrateSlabResponse is the response type for the /slab/migrate endpoint.
MigrateSlabResponse struct {
NumShardsMigrated int `json:"numShardsMigrated"`
SurchargeApplied bool `json:"surchargeApplied,omitempty"`
Error string `json:"error,omitempty"`
}

// RHPFormResponse is the response type for the /rhp/form endpoint.
RHPFormResponse struct {
ContractID types.FileContractID `json:"contractID"`
71 changes: 5 additions & 66 deletions autopilot/alerts.go
@@ -7,14 +7,13 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/object"
)

var (
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertMigrationID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertOngoingMigrationsID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
)

func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) {
@@ -74,74 +73,14 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {
}

return alerts.Alert{
ID: alertMigrationID,
ID: alertOngoingMigrationsID,
Severity: alerts.SeverityInfo,
Message: fmt.Sprintf("Migrating %d slabs", n),
Timestamp: time.Now(),
Data: data,
}
}

func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
return alerts.Alert{
ID: alerts.IDForSlab(alertMigrationID, slabKey),
Severity: alerts.SeverityInfo,
Message: "Critical migration succeeded",
Data: map[string]interface{}{
"slabKey": slabKey.String(),
"hint": "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads",
},
Timestamp: time.Now(),
}
}

func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
data := map[string]interface{}{
"error": err.Error(),
"health": health,
"slabKey": slabKey.String(),
"hint": "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
}
if objectIds != nil {
data["objectIDs"] = objectIds
}

return alerts.Alert{
ID: alerts.IDForSlab(alertMigrationID, slabKey),
Severity: alerts.SeverityCritical,
Message: "Critical migration failed",
Data: data,
Timestamp: time.Now(),
}
}

func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
data := map[string]interface{}{
"error": err.Error(),
"health": health,
"slabKey": slabKey.String(),
"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.",
}
if objectIds != nil {
data["objectIDs"] = objectIds
}

severity := alerts.SeverityError
if health < 0.25 {
severity = alerts.SeverityCritical
} else if health < 0.5 {
severity = alerts.SeverityWarning
}

return alerts.Alert{
ID: alerts.IDForSlab(alertMigrationID, slabKey),
Severity: severity,
Message: "Slab migration failed",
Data: data,
Timestamp: time.Now(),
}
}

func newRefreshHealthFailedAlert(err error) alerts.Alert {
return alerts.Alert{
ID: alertHealthRefreshID,
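Both the helpers removed above and their replacement in worker/alerts.go derive per-slab alert IDs with alerts.IDForSlab, so a retry for the same slab overwrites the existing alert instead of accumulating duplicates, and a later success can dismiss exactly that alert. A hypothetical sketch of the pattern (the real implementation lives in go.sia.tech/renterd/alerts and may differ):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// idForSlab is a stand-in, not the real alerts.IDForSlab: it derives a
// stable alert ID from a namespace ID plus the slab's encryption key, so
// the same (namespace, slab) pair always maps to the same alert.
func idForSlab(namespace [32]byte, slabKey string) [32]byte {
	return sha256.Sum256(append(namespace[:], slabKey...))
}

func main() {
	ns := [32]byte{1} // e.g. the alertMigrationID namespace
	a := idForSlab(ns, "key:00ff")
	b := idForSlab(ns, "key:00ff")
	fmt.Println(a == b) // true: registering twice targets one alert
}
```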
102 changes: 20 additions & 82 deletions autopilot/migrator.go
@@ -2,14 +2,12 @@ package autopilot

import (
"context"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"

"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
@@ -49,20 +47,15 @@
}
)

func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) {
func (j *job) execute(ctx context.Context, w Worker) (time.Duration, error) {
start := time.Now()
slab, err := j.b.Slab(ctx, j.EncryptionKey)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err)
return 0, fmt.Errorf("failed to fetch slab; %w", err)
}

res, err := w.MigrateSlab(ctx, slab, j.set)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err)
} else if res.Error != "" {
return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error))
}

return res, nil
err = w.MigrateSlab(ctx, slab, j.set)
return time.Since(start), err
}

func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator {
@@ -157,44 +150,20 @@ func (m *migrator) performMigrations(p *workerPool) {

// process jobs
for j := range jobs {
start := time.Now()
res, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds()))
if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, err)
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if !utils.IsErr(err, api.ErrSlabNotFound) {
// fetch all object IDs for the slab we failed to migrate
var objectIds map[string][]string
if res, err := m.objectIDsForSlabKey(ctx, j.EncryptionKey); err != nil {
m.logger.Errorf("failed to fetch object ids for slab key; %w", err)
} else {
objectIds = res
}

// register the alert
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err))
} else {
m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err))
}
}
} else {
m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alerts.IDForSlab(alertMigrationID, j.EncryptionKey))
if res.SurchargeApplied {
// this alert confirms the user his gouging
// settings are working, it will be dismissed
// automatically the next time this slab is
// successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.EncryptionKey))
duration, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(duration.Milliseconds()))
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if err != nil {
m.logger.Errorw("migration failed",
zap.Float64("health", j.Health),
zap.Stringer("slab", j.EncryptionKey),
zap.String("worker", id))
}
}
}(w)
@@ -263,8 +232,8 @@
})
}

// unregister the migration alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertMigrationID)
// unregister the ongoing migrations alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertOngoingMigrationsID)

OUTER:
for {
@@ -312,34 +281,3 @@
return
}
}

func (m *migrator) objectIDsForSlabKey(ctx context.Context, key object.EncryptionKey) (map[string][]string, error) {
// fetch all buckets
//
// NOTE:at the time of writing the bus does not support fetching objects by
// slab key across all buckets at once, therefor we have to list all buckets
// and loop over them, revisit on the next major release
buckets, err := m.ap.bus.ListBuckets(ctx)
if err != nil {
return nil, fmt.Errorf("%w; failed to list buckets", err)
}

// fetch all objects per bucket
idsPerBucket := make(map[string][]string)
for _, bucket := range buckets {
res, err := m.ap.bus.ListObjects(ctx, "", api.ListObjectOptions{Bucket: bucket.Name, SlabEncryptionKey: key})
if err != nil {
m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %w", bucket, err)
continue
} else if len(res.Objects) == 0 {
continue
}

idsPerBucket[bucket.Name] = make([]string, len(res.Objects))
for i, object := range res.Objects {
idsPerBucket[bucket.Name][i] = object.Key
}
}

return idsPerBucket, nil
}
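With the alert bookkeeping gone from performMigrations, the autopilot only tracks migration speed and aborts the whole run when consensus is not synced. The abort goes through a select with a default case, so the send on signalConsensusNotSynced never blocks a migration goroutine. A minimal standalone sketch of that pattern (the channel capacity here is an assumption for the demo):

```go
package main

import "fmt"

func main() {
	signalConsensusNotSynced := make(chan struct{}, 1)

	// non-blocking send: if the signal is already pending or nobody is
	// receiving, drop it rather than stalling the sender
	select {
	case signalConsensusNotSynced <- struct{}{}:
		fmt.Println("interrupt signalled")
	default:
		fmt.Println("interrupt already pending; dropped")
	}
}
```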
2 changes: 1 addition & 1 deletion autopilot/workerpool.go
@@ -16,7 +16,7 @@ type Worker interface {
Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
ID(ctx context.Context) (string, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) error

RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error)
RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
14 changes: 7 additions & 7 deletions internal/test/e2e/migrations_test.go
@@ -3,6 +3,7 @@ package e2e
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -138,18 +139,17 @@ func TestMigrations(t *testing.T) {
tt.OK(err)
for _, alert := range ress.Alerts {
// skip if not a migration alert
data, ok := alert.Data["objectIDs"].(map[string]interface{})
_, ok := alert.Data["objects"]
if !ok {
continue
}

// collect all object ids per bucket
for bucket, ids := range data {
if objectIDs, ok := ids.([]interface{}); ok {
for _, id := range objectIDs {
got[bucket] = append(got[bucket], id.(string))
}
}
var objects []api.ObjectMetadata
b, _ := json.Marshal(alert.Data["objects"])
_ = json.Unmarshal(b, &objects)
for _, object := range objects {
got[object.Bucket] = append(got[object.Bucket], object.Key)
}
}
if len(got) != 2 {
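The updated test recovers typed object metadata from the alert payload by round-tripping through JSON: once an alert has crossed the API, alert.Data is a generic map[string]interface{}, so re-marshalling the `objects` value and unmarshalling it into a slice avoids a tower of manual type assertions. A self-contained sketch of the same trick (ObjectMetadata is a stand-in for api.ObjectMetadata with only the fields used here, and its json tags are assumptions):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stand-in for api.ObjectMetadata, reduced to the fields the test reads
type ObjectMetadata struct {
	Bucket string `json:"bucket"`
	Key    string `json:"key"`
}

func main() {
	// what alert.Data["objects"] looks like after generic JSON decoding
	data := map[string]interface{}{
		"objects": []interface{}{
			map[string]interface{}{"bucket": "b1", "key": "/foo"},
			map[string]interface{}{"bucket": "b2", "key": "/foo"},
		},
	}

	// marshal the generic value and unmarshal into the typed slice
	var objects []ObjectMetadata
	b, _ := json.Marshal(data["objects"])
	_ = json.Unmarshal(b, &objects)

	// group keys per bucket, as the test does
	got := make(map[string][]string)
	for _, o := range objects {
		got[o.Bucket] = append(got[o.Bucket], o.Key)
	}
	fmt.Println(got) // map[b1:[/foo] b2:[/foo]]
}
```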
4 changes: 2 additions & 2 deletions stores/sql/main.go
@@ -2835,7 +2835,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s
INNER JOIN buckets b ON o.db_bucket_id = b.id
INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name
WHERE %s
GROUP BY d.id
GROUP BY d.id, o.db_bucket_id
`, col, strings.Join(markerExprsObj, " AND "), groupFn, col, tx.CharLengthExpr(), strings.Join(markerExprsDir, " AND ")), append(markerArgsObj, markerArgsDir...)...).Scan(dst)
if errors.Is(err, dsql.ErrNoRows) {
return api.ErrMarkerNotFound
@@ -2886,7 +2886,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s
FROM objects o
INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name %s
WHERE o.object_id LIKE ? AND SUBSTR(o.object_id, 1, ?) = ? AND d.db_parent_id = ?
GROUP BY d.id
GROUP BY d.id, o.db_bucket_id
) AS o
INNER JOIN buckets b ON b.id = o.db_bucket_id
%s
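The GROUP BY change above is what makes per-bucket attribution possible in the new alerts: the same object key can exist in several buckets, and grouping by directory alone collapses those rows into one. Grouping by d.id, o.db_bucket_id keeps one row per bucket, so listing objects by slab encryption key reports every affected bucket, which is exactly what the e2e test above asserts.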
46 changes: 46 additions & 0 deletions worker/alerts.go
@@ -6,9 +6,15 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"lukechampine.com/frand"
)

var (
alertMigrationID = alerts.RandomAlertID() // constant until restarted
)

func randomAlertID() types.Hash256 {
return frand.Entropy256()
}
@@ -30,6 +36,46 @@ func newDownloadFailedAlert(bucket, key string, offset, length, contracts int64,
}
}

func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objects []api.ObjectMetadata, err error) alerts.Alert {
data := map[string]interface{}{
"error": err.Error(),
"health": health,
"slabKey": slabKey.String(),
"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously. It might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
}

if len(objects) > 0 {
data["objects"] = objects
}

hostErr := err
for errors.Unwrap(hostErr) != nil {
hostErr = errors.Unwrap(hostErr)
}
if set, ok := hostErr.(HostErrorSet); ok {
hostErrors := make(map[string]string, len(set))
for hk, err := range set {
hostErrors[hk.String()] = err.Error()
}
data["hosts"] = hostErrors
}

severity := alerts.SeverityError
if health < 0.25 {
severity = alerts.SeverityCritical
} else if health < 0.5 {
severity = alerts.SeverityWarning
}

return alerts.Alert{
ID: alerts.IDForSlab(alertMigrationID, slabKey),
Severity: severity,
Message: "Slab migration failed",
Data: data,
Timestamp: time.Now(),
}
}

func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards, totalShards, contracts int, packing, multipart bool, err error) alerts.Alert {
data := map[string]any{
"bucket": bucket,
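The core of the new helper is the unwrap loop: by the time a migration failure reaches the alert code, the HostErrorSet is usually buried under several layers of fmt.Errorf("...; %w", err), so the loop walks to the innermost error before type-asserting. A self-contained sketch with stand-in types (the real HostErrorSet and types.PublicKey live in the worker and core packages):

```go
package main

import (
	"errors"
	"fmt"
)

// stand-ins for types.PublicKey and the worker's HostErrorSet
type PublicKey [32]byte

func (pk PublicKey) String() string { return fmt.Sprintf("ed25519:%x", pk[:4]) }

type HostErrorSet map[PublicKey]error

func (hes HostErrorSet) Error() string { return fmt.Sprintf("%d hosts failed", len(hes)) }

func main() {
	// a migration error with the HostErrorSet wrapped two layers deep
	root := HostErrorSet{{1}: errors.New("connection refused")}
	err := fmt.Errorf("failed to migrate slab; %w", fmt.Errorf("upload failed; %w", root))

	// walk to the innermost error, as newMigrationFailedAlert does
	hostErr := err
	for errors.Unwrap(hostErr) != nil {
		hostErr = errors.Unwrap(hostErr)
	}

	// flatten the per-host errors into alert data
	if set, ok := hostErr.(HostErrorSet); ok {
		hostErrors := make(map[string]string, len(set))
		for hk, err := range set {
			hostErrors[hk.String()] = err.Error()
		}
		fmt.Println(hostErrors)
	}
}
```

Note that this keys off the deepest error in the chain, so it relies on the HostErrorSet itself not wrapping a further cause.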
4 changes: 3 additions & 1 deletion worker/alerts_test.go
@@ -11,7 +11,9 @@ import (
"go.sia.tech/renterd/alerts"
)

// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with a HostErrorSet error registers an alert with all the individual errors of any host in the payload.
// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with
// a HostErrorSet error registers an alert with all the individual errors of any
// host in the payload.
func TestUploadFailedAlertErrorSet(t *testing.T) {
hostErrSet := HostErrorSet{
types.PublicKey{1, 1, 1}: errors.New("test"),
Expand Down
Loading
Loading