Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update list object options, add slab encryption key and make bucket optional #1517

Merged
merged 14 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (

// ObjectMetadata contains various metadata about an object.
ObjectMetadata struct {
Bucket string `json:"bucket"`
ETag string `json:"eTag,omitempty"`
Health float64 `json:"health"`
ModTime TimeRFC3339 `json:"modTime"`
Expand Down Expand Up @@ -206,12 +207,14 @@ type (
}

ListObjectOptions struct {
Delimiter string
Limit int
Marker string
SortBy string
SortDir string
Substring string
Bucket string
Delimiter string
Limit int
Marker string
SortBy string
SortDir string
Substring string
SlabEncryptionKey object.EncryptionKey
}

// UploadObjectOptions is the options type for the worker client.
Expand Down Expand Up @@ -304,6 +307,9 @@ func (opts GetObjectOptions) Apply(values url.Values) {
}

func (opts ListObjectOptions) Apply(values url.Values) {
if opts.Bucket != "" {
values.Set("bucket", opts.Bucket)
}
if opts.Delimiter != "" {
values.Set("delimiter", opts.Delimiter)
}
Expand All @@ -322,6 +328,9 @@ func (opts ListObjectOptions) Apply(values url.Values) {
if opts.Substring != "" {
values.Set("substring", opts.Substring)
}
if opts.SlabEncryptionKey != (object.EncryptionKey{}) {
values.Set("slabEncryptionKey", opts.SlabEncryptionKey.String())
}
}

func FormatETag(eTag string) string {
Expand Down
7 changes: 0 additions & 7 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,6 @@ type (
Total uint64 `json:"total"`
}

// MigrateSlabResponse is the response type for the /slab/migrate endpoint.
MigrateSlabResponse struct {
// NumShardsMigrated is reported by the autopilot's migrator as the number
// of shards migrated when a migration succeeds.
NumShardsMigrated int `json:"numShardsMigrated"`
// SurchargeApplied is logged by the migrator as "overpaid" and decides
// whether the critical variants of the migration alerts are registered.
// It is omitted from the JSON when false.
SurchargeApplied bool `json:"surchargeApplied,omitempty"`
// Error, when non-empty, is turned into a migration error by the
// autopilot even though the request itself succeeded. It is omitted from
// the JSON when empty.
Error string `json:"error,omitempty"`
}

// RHPFormResponse is the response type for the /rhp/form endpoint.
RHPFormResponse struct {
ContractID types.FileContractID `json:"contractID"`
Expand Down
71 changes: 5 additions & 66 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/object"
)

var (
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertMigrationID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertOngoingMigrationsID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
)

func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) {
Expand Down Expand Up @@ -74,74 +73,14 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {
}

return alerts.Alert{
ID: alertMigrationID,
ID: alertOngoingMigrationsID,
Severity: alerts.SeverityInfo,
Message: fmt.Sprintf("Migrating %d slabs", n),
Timestamp: time.Now(),
Data: data,
}
}

// newCriticalMigrationSucceededAlert builds the informational alert for a
// slab whose migration succeeded. The alert ID is derived from the shared
// migration alert ID and the slab key, and the data carries the slab key
// together with a hint about the MigrationSurchargeMultiplier setting.
func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
	var alert alerts.Alert
	alert.ID = alerts.IDForSlab(alertMigrationID, slabKey)
	alert.Severity = alerts.SeverityInfo
	alert.Message = "Critical migration succeeded"

	details := make(map[string]interface{}, 2)
	details["slabKey"] = slabKey.String()
	details["hint"] = "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads"
	alert.Data = details

	alert.Timestamp = time.Now()
	return alert
}

// newCriticalMigrationFailedAlert builds the critical alert for a slab whose
// migration failed. The alert data contains the error text, the slab health,
// the slab key and a hint; the object IDs are only attached when objectIds is
// non-nil. err must be non-nil because its message is read unconditionally.
func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
	details := make(map[string]interface{}, 5)
	details["error"] = err.Error()
	details["health"] = health
	details["slabKey"] = slabKey.String()
	details["hint"] = "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding."

	// only expose the affected objects when the caller managed to fetch them
	if objectIds != nil {
		details["objectIDs"] = objectIds
	}

	var alert alerts.Alert
	alert.ID = alerts.IDForSlab(alertMigrationID, slabKey)
	alert.Severity = alerts.SeverityCritical
	alert.Message = "Critical migration failed"
	alert.Data = details
	alert.Timestamp = time.Now()
	return alert
}

// newMigrationFailedAlert builds the alert for a failed slab migration. The
// alert data contains the error text, the slab health, the slab key and a
// hint; the object IDs are only attached when objectIds is non-nil. err must
// be non-nil because its message is read unconditionally.
//
// The severity depends on the slab health: below 0.25 it is critical, below
// 0.5 it is a warning and in every other case it is an error.
//
// NOTE(review): a health in [0.25, 0.5) yields a lower severity (warning)
// than a health of 0.5 and above (error) - confirm this ordering is intended.
func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
	details := make(map[string]interface{}, 5)
	details["error"] = err.Error()
	details["health"] = health
	details["slabKey"] = slabKey.String()
	details["hint"] = "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously."

	// only expose the affected objects when the caller managed to fetch them
	if objectIds != nil {
		details["objectIDs"] = objectIds
	}

	var level alerts.Severity
	switch {
	case health < 0.25:
		level = alerts.SeverityCritical
	case health < 0.5:
		level = alerts.SeverityWarning
	default:
		level = alerts.SeverityError
	}

	var alert alerts.Alert
	alert.ID = alerts.IDForSlab(alertMigrationID, slabKey)
	alert.Severity = level
	alert.Message = "Slab migration failed"
	alert.Data = details
	alert.Timestamp = time.Now()
	return alert
}

func newRefreshHealthFailedAlert(err error) alerts.Alert {
return alerts.Alert{
ID: alertHealthRefreshID,
Expand Down
2 changes: 1 addition & 1 deletion autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Bus interface {
ListBuckets(ctx context.Context) ([]api.Bucket, error)

// objects
ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error)
ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error)
RefreshHealth(ctx context.Context) error
Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error)
SlabsForMigration(ctx context.Context, healthCutoff float64, set string, limit int) ([]api.UnhealthySlab, error)
Expand Down
102 changes: 20 additions & 82 deletions autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package autopilot

import (
"context"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"

"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
Expand Down Expand Up @@ -49,20 +47,15 @@ type (
}
)

func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) {
func (j *job) execute(ctx context.Context, w Worker) (time.Duration, error) {
start := time.Now()
slab, err := j.b.Slab(ctx, j.EncryptionKey)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err)
return 0, fmt.Errorf("failed to fetch slab; %w", err)
}

res, err := w.MigrateSlab(ctx, slab, j.set)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err)
} else if res.Error != "" {
return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error))
}

return res, nil
err = w.MigrateSlab(ctx, slab, j.set)
return time.Since(start), err
}

func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator {
Expand Down Expand Up @@ -157,44 +150,20 @@ func (m *migrator) performMigrations(p *workerPool) {

// process jobs
for j := range jobs {
start := time.Now()
res, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds()))
if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, err)
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if !utils.IsErr(err, api.ErrSlabNotFound) {
// fetch all object IDs for the slab we failed to migrate
var objectIds map[string][]string
if res, err := m.objectIDsForSlabKey(ctx, j.EncryptionKey); err != nil {
m.logger.Errorf("failed to fetch object ids for slab key; %w", err)
} else {
objectIds = res
}

// register the alert
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err))
} else {
m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err))
}
}
} else {
m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alerts.IDForSlab(alertMigrationID, j.EncryptionKey))
if res.SurchargeApplied {
// this alert confirms the user his gouging
// settings are working, it will be dismissed
// automatically the next time this slab is
// successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.EncryptionKey))
duration, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(duration.Milliseconds()))
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if err != nil {
m.logger.Errorw("migration failed",
zap.Float64("health", j.Health),
zap.Stringer("slab", j.EncryptionKey),
zap.String("worker", id))
}
}
}(w)
Expand Down Expand Up @@ -263,8 +232,8 @@ func (m *migrator) performMigrations(p *workerPool) {
})
}

// unregister the migration alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertMigrationID)
// unregister the ongoing migrations alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertOngoingMigrationsID)

OUTER:
for {
Expand Down Expand Up @@ -312,34 +281,3 @@ OUTER:
return
}
}

// objectIDsForSlabKey returns the keys of all objects that reference the slab
// with the given encryption key, grouped by bucket name. Buckets that hold no
// matching objects are left out of the result.
//
// An error is only returned when the buckets cannot be listed. A failure to
// fetch the objects of an individual bucket is logged and that bucket is
// skipped, so the returned map may be incomplete.
func (m *migrator) objectIDsForSlabKey(ctx context.Context, key object.EncryptionKey) (map[string][]string, error) {
	// fetch all buckets
	//
	// NOTE: at the time of writing the bus does not support fetching objects
	// by slab key across all buckets at once, therefore we have to list all
	// buckets and loop over them, revisit on the next major release
	buckets, err := m.ap.bus.ListBuckets(ctx)
	if err != nil {
		return nil, fmt.Errorf("%w; failed to list buckets", err)
	}

	// fetch all objects per bucket
	idsPerBucket := make(map[string][]string)
	for _, bucket := range buckets {
		objects, err := m.ap.bus.ObjectsBySlabKey(ctx, bucket.Name, key)
		if err != nil {
			// %w is only understood by fmt.Errorf; a logger's Errorf would
			// render it as "%!w(...)", so the error is formatted with %v and
			// the bucket is identified by its name rather than the whole struct
			m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %v", bucket.Name, err)
			continue
		}
		if len(objects) == 0 {
			continue
		}

		// the loop variable is named obj to avoid shadowing the object package
		ids := make([]string, len(objects))
		for i, obj := range objects {
			ids[i] = obj.Key
		}
		idsPerBucket[bucket.Name] = ids
	}

	return idsPerBucket, nil
}
2 changes: 1 addition & 1 deletion autopilot/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Worker interface {
Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
ID(ctx context.Context) (string, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) error

RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error)
RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
Expand Down
4 changes: 1 addition & 3 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,9 @@ type (
UpdateBucketPolicy(ctx context.Context, bucketName string, policy api.BucketPolicy) error

CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error)
ListObjects(ctx context.Context, bucketName, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error)
ListObjects(ctx context.Context, bucketName, prefix, substring, delim, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (api.ObjectsListResponse, error)
Object(ctx context.Context, bucketName, key string) (api.Object, error)
ObjectMetadata(ctx context.Context, bucketName, key string) (api.Object, error)
ObjectsBySlabKey(ctx context.Context, bucketName string, slabKey object.EncryptionKey) ([]api.ObjectMetadata, error)
ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error)
RemoveObject(ctx context.Context, bucketName, key string) error
RemoveObjects(ctx context.Context, bucketName, prefix string) error
Expand Down Expand Up @@ -485,7 +484,6 @@ func (b *Bus) Handler() http.Handler {
"POST /slabs/partial": b.slabsPartialHandlerPOST,
"POST /slabs/refreshhealth": b.slabsRefreshHealthHandlerPOST,
"GET /slab/:key": b.slabHandlerGET,
"GET /slab/:key/objects": b.slabObjectsHandlerGET,
"PUT /slab": b.slabHandlerPUT,

"GET /state": b.stateHandlerGET,
Expand Down
13 changes: 2 additions & 11 deletions bus/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ func (c *Client) Object(ctx context.Context, bucket, key string, opts api.GetObj
return
}

// Objects lists objects in the given bucket.
func (c *Client) Objects(ctx context.Context, bucket string, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) {
// ListObjects lists objects matching the given prefix; the bucket is optional and, when set, is taken from opts.
func (c *Client) ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) {
values := url.Values{}
values.Set("bucket", bucket)
opts.Apply(values)

prefix = api.ObjectKeyEscape(prefix)
Expand All @@ -75,14 +74,6 @@ func (c *Client) Objects(ctx context.Context, bucket string, prefix string, opts
return
}

// ObjectsBySlabKey returns all objects in the given bucket that reference the
// slab identified by key. It issues a GET request to /slab/<key>/objects with
// the bucket passed as a query parameter and decodes the response into
// objects.
func (c *Client) ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error) {
	values := url.Values{}
	values.Set("bucket", bucket)

	// the encoded query is passed as an argument rather than being
	// concatenated into the format string, otherwise percent-escapes produced
	// by Encode (e.g. "%2F") would be interpreted as formatting verbs
	err = c.c.WithContext(ctx).GET(fmt.Sprintf("/slab/%v/objects?%s", key, values.Encode()), &objects)
	return
}

// ObjectsStats returns information about the number of objects and their size.
func (c *Client) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (osr api.ObjectsStatsResponse, err error) {
values := url.Values{}
Expand Down
Loading
Loading