Skip to content

Commit

Permalink
worker: list host errors in migration alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Sep 12, 2024
1 parent b8fe3c2 commit f997ad0
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 182 deletions.
7 changes: 0 additions & 7 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ type (
Total uint64 `json:"total"`
}

// MigrateSlabResponse is the response type for the /slab/migrate endpoint.
MigrateSlabResponse struct {
NumShardsMigrated int `json:"numShardsMigrated"`
SurchargeApplied bool `json:"surchargeApplied,omitempty"`
Error string `json:"error,omitempty"`
}

// RHPFormResponse is the response type for the /rhp/form endpoint.
RHPFormResponse struct {
ContractID types.FileContractID `json:"contractID"`
Expand Down
71 changes: 5 additions & 66 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/object"
)

var (
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertMigrationID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertOngoingMigrationsID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
)

func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) {
Expand Down Expand Up @@ -74,74 +73,14 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {
}

return alerts.Alert{
ID: alertMigrationID,
ID: alertOngoingMigrationsID,
Severity: alerts.SeverityInfo,
Message: fmt.Sprintf("Migrating %d slabs", n),
Timestamp: time.Now(),
Data: data,
}
}

// newCriticalMigrationSucceededAlert returns an info alert indicating that a
// critical migration only succeeded because the gouging settings allowed
// overpaying hosts for some sector downloads.
func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
	payload := map[string]interface{}{
		"slabKey": slabKey.String(),
		"hint":    "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads",
	}
	return alerts.Alert{
		ID:        alerts.IDForSlab(alertMigrationID, slabKey),
		Severity:  alerts.SeverityInfo,
		Message:   "Critical migration succeeded",
		Data:      payload,
		Timestamp: time.Now(),
	}
}

// newCriticalMigrationFailedAlert returns a critical alert for a failed
// migration of a low-health slab, optionally listing the objects that
// reference the slab.
func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
	// assemble the alert payload
	payload := map[string]interface{}{
		"health":  health,
		"slabKey": slabKey.String(),
		"error":   err.Error(),
		"hint":    "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
	}

	// only attach the object IDs when the caller provided them
	if objectIds != nil {
		payload["objectIDs"] = objectIds
	}

	return alerts.Alert{
		ID:        alerts.IDForSlab(alertMigrationID, slabKey),
		Severity:  alerts.SeverityCritical,
		Message:   "Critical migration failed",
		Data:      payload,
		Timestamp: time.Now(),
	}
}

// newMigrationFailedAlert returns an alert for a failed slab migration. The
// severity scales with the slab's health: critical below 0.25, warning below
// 0.5 and error otherwise.
func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
	data := map[string]interface{}{
		"error":   err.Error(),
		"health":  health,
		"slabKey": slabKey.String(),
		// fixed typo in user-facing hint: "therefor" -> "therefore"
		"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefore be taken very seriously.",
	}
	if objectIds != nil {
		data["objectIDs"] = objectIds
	}

	// escalate the severity as the slab's health deteriorates
	severity := alerts.SeverityError
	if health < 0.25 {
		severity = alerts.SeverityCritical
	} else if health < 0.5 {
		severity = alerts.SeverityWarning
	}

	return alerts.Alert{
		ID:        alerts.IDForSlab(alertMigrationID, slabKey),
		Severity:  severity,
		Message:   "Slab migration failed",
		Data:      data,
		Timestamp: time.Now(),
	}
}

func newRefreshHealthFailedAlert(err error) alerts.Alert {
return alerts.Alert{
ID: alertHealthRefreshID,
Expand Down
73 changes: 20 additions & 53 deletions autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package autopilot

import (
"context"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"

"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
Expand Down Expand Up @@ -49,20 +47,15 @@ type (
}
)

func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) {
func (j *job) execute(ctx context.Context, w Worker) (time.Duration, error) {
start := time.Now()
slab, err := j.b.Slab(ctx, j.Key)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err)
return 0, fmt.Errorf("failed to fetch slab; %w", err)
}

res, err := w.MigrateSlab(ctx, slab, j.set)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err)
} else if res.Error != "" {
return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error))
}

return res, nil
err = w.MigrateSlab(ctx, slab, j.set)
return time.Since(start), err
}

func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator {
Expand Down Expand Up @@ -157,46 +150,20 @@ func (m *migrator) performMigrations(p *workerPool) {

// process jobs
for j := range jobs {
start := time.Now()
res, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds()))
if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err)
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if !utils.IsErr(err, api.ErrSlabNotFound) {
// list objects for this key
objectIds := make(map[string][]string)
if res, err := m.ap.bus.ListObjects(ctx, api.ListObjectOptions{SlabKey: &j.Key}); err != nil {
m.logger.Errorf("failed to list objects for slab key; %w", err)
} else {
for _, obj := range res.Objects {
objectIds[obj.Bucket] = append(objectIds[obj.Bucket], obj.Name)
}
}

// register the alert
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, objectIds, err))
} else {
m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, objectIds, err))
}
}
} else {
m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alerts.IDForSlab(alertMigrationID, j.Key))
if res.SurchargeApplied {
// this alert confirms the user his gouging
// settings are working, it will be dismissed
// automatically the next time this slab is
// successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key))
duration, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(duration.Milliseconds()))
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if err != nil {
m.logger.Errorw("migration failed",
zap.Float64("health", j.Health),
zap.Stringer("slab", j.Key),
zap.String("worker", id))
}
}
}(w)
Expand Down Expand Up @@ -265,8 +232,8 @@ func (m *migrator) performMigrations(p *workerPool) {
})
}

// unregister the migration alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertMigrationID)
// unregister the ongoing migrations alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertOngoingMigrationsID)

OUTER:
for {
Expand Down
2 changes: 1 addition & 1 deletion autopilot/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Worker interface {
Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
ID(ctx context.Context) (string, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) error

RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error)
RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
Expand Down
46 changes: 46 additions & 0 deletions worker/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"lukechampine.com/frand"
)

// alertMigrationID is a process-lifetime random ID used to derive per-slab
// migration alert IDs.
var alertMigrationID = alerts.RandomAlertID() // constant until restarted

// randomAlertID returns a random 256-bit identifier suitable for use as an
// alert ID.
func randomAlertID() types.Hash256 {
	// 256 bits of fresh entropy per call
	var id types.Hash256 = frand.Entropy256()
	return id
}
Expand All @@ -30,6 +36,46 @@ func newDownloadFailedAlert(bucket, path string, offset, length, contracts int64
}
}

// newMigrationFailedAlert returns an alert for a failed slab migration. The
// severity scales with the slab's health (critical below 0.25, warning below
// 0.5, error otherwise) and, if the error carries a HostErrorSet, the
// individual host errors are included in the alert's data.
func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objects []api.ObjectMetadata, err error) alerts.Alert {
	data := map[string]interface{}{
		"error":   err.Error(),
		"health":  health,
		"slabKey": slabKey.String(),
		// fixed typo in user-facing hint: "therefor" -> "therefore"
		"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefore be taken very seriously. It might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
	}

	if len(objects) > 0 {
		data["objects"] = objects
	}

	// Use errors.As instead of manually unwrapping to the innermost error:
	// the manual loop only inspected the root cause, so a HostErrorSet
	// wrapped mid-chain was silently missed. errors.As checks every error
	// in the chain.
	var set HostErrorSet
	if errors.As(err, &set) {
		hostErrors := make(map[string]string, len(set))
		for hk, err := range set {
			hostErrors[hk.String()] = err.Error()
		}
		data["hosts"] = hostErrors
	}

	// escalate the severity as the slab's health deteriorates
	severity := alerts.SeverityError
	if health < 0.25 {
		severity = alerts.SeverityCritical
	} else if health < 0.5 {
		severity = alerts.SeverityWarning
	}

	return alerts.Alert{
		ID:        alerts.IDForSlab(alertMigrationID, slabKey),
		Severity:  severity,
		Message:   "Slab migration failed",
		Data:      data,
		Timestamp: time.Now(),
	}
}

func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards, totalShards, contracts int, packing, multipart bool, err error) alerts.Alert {
data := map[string]any{
"bucket": bucket,
Expand Down
4 changes: 3 additions & 1 deletion worker/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"go.sia.tech/renterd/alerts"
)

// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with a HostErrorSet error registers an alert with all the individual errors of any host in the payload.
// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with
// a HostErrorSet error registers an alert with all the individual errors of any
// host in the payload.
func TestUploadFailedAlertErrorSet(t *testing.T) {
hostErrSet := HostErrorSet{
types.PublicKey{1, 1, 1}: errors.New("test"),
Expand Down
5 changes: 2 additions & 3 deletions worker/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,10 @@ func (c *Client) Memory(ctx context.Context) (resp api.MemoryResponse, err error
}

// MigrateSlab migrates the specified slab.
func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) (res api.MigrateSlabResponse, err error) {
func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) error {
values := make(url.Values)
values.Set("contractset", set)
err = c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, &res)
return
return c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, nil)
}

// ObjectEntries returns the entries at the given path, which must end in /.
Expand Down
Loading

0 comments on commit f997ad0

Please sign in to comment.