diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 9f0a5b9f5..25f3ada8d 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -258,12 +258,18 @@ func (ap *Autopilot) Run() error { } // perform maintenance - err = ap.c.performContractMaintenance(ctx, w) + setChanged, err := ap.c.performContractMaintenance(ctx, w) if err != nil { ap.logger.Errorf("contract maintenance failed, err: %v", err) } maintenanceSuccess := err == nil + // upon success, notify the migrator. The health of slabs might have + // changed. + if maintenanceSuccess && setChanged { + ap.m.SignalMaintenanceFinished() + } + // launch account refills after successful contract maintenance. if maintenanceSuccess { launchAccountRefillsOnce.Do(func() { diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 2c8de4e72..7a4d0b922 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -98,12 +98,12 @@ func newContractor(ap *Autopilot) *contractor { } } -func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) error { +func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) (bool, error) { ctx, span := tracing.Tracer.Start(ctx, "contractor.performContractMaintenance") defer span.End() if c.ap.isStopped() || !c.ap.isSynced() { - return nil // skip contract maintenance if we're not synced + return false, nil // skip contract maintenance if we're not synced } c.logger.Info("performing contract maintenance") @@ -117,31 +117,31 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e // not zero in several places if state.cfg.Contracts.Amount == 0 { c.logger.Warn("contracts is set to zero, skipping contract maintenance") - return nil + return false, nil } // no maintenance if no allowance was set if state.cfg.Contracts.Allowance.IsZero() { c.logger.Warn("allowance is set to zero, skipping contract maintenance") - return nil + return false, nil } // no maintenance if no period was set if state.cfg.Contracts.Period == 0 { c.logger.Warn("period is set to zero, skipping contract maintenance") - return nil + return false, nil } // fetch our wallet address address, err := c.ap.bus.WalletAddress(ctx) if err != nil { - return err + return false, err } // fetch current contract set currentSet, err := c.ap.bus.ContractSetContracts(ctx, state.cfg.Contracts.Set) if err != nil && !strings.Contains(err.Error(), api.ErrContractSetNotFound.Error()) { - return err + return false, err } isInCurrentSet := make(map[types.FileContractID]struct{}) for _, c := range currentSet { @@ -153,7 +153,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e start := time.Now() resp, err := w.Contracts(ctx, timeoutHostRevision) if err != nil { - return err + return false, err } if resp.Error != "" { c.logger.Error(resp.Error) @@ -183,7 +183,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e // fetch all hosts hosts, err := c.ap.bus.Hosts(ctx, 0, -1) if err != nil { - return err + return false, err } // min score to pass checks. 
@@ -191,7 +191,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e if len(hosts) > 0 { minScore, err = c.managedFindMinAllowedHostScores(ctx, w, hosts, hostData) if err != nil { - return fmt.Errorf("failed to determine min score for contract check: %w", err) + return false, fmt.Errorf("failed to determine min score for contract check: %w", err) } } else { c.logger.Warn("could not calculate min score, no hosts found") @@ -222,7 +222,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e // run checks updatedSet, toArchive, toStopUsing, toRefresh, toRenew, err := c.runContractChecks(ctx, w, contracts, minScore) if err != nil { - return fmt.Errorf("failed to run contract checks, err: %v", err) + return false, fmt.Errorf("failed to run contract checks, err: %v", err) } // archive contracts @@ -236,7 +236,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e // calculate remaining funds remaining, err := c.remainingFunds(contracts) if err != nil { - return err + return false, err } // calculate 'limit' amount of contracts we want to renew @@ -308,19 +308,18 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e // update contract set if c.ap.isStopped() { - return errors.New("autopilot stopped before maintenance could be completed") + return false, errors.New("autopilot stopped before maintenance could be completed") } err = c.ap.bus.SetContractSet(ctx, state.cfg.Contracts.Set, updatedSet) if err != nil { - return err + return false, err } - // log the contract set after maintenance - c.logContractSetUpdate(currentSet, updatedSet, formed, refreshed, renewed, toStopUsing, contractData) - return nil + // return whether the maintenance changed the contract set + return c.computeContractSetChanged(currentSet, updatedSet, formed, refreshed, renewed, toStopUsing, contractData), nil } -func (c *contractor) logContractSetUpdate(oldSet []api.ContractMetadata, newSet, formed []types.FileContractID, refreshed, renewed []renewal, toStopUsing map[types.FileContractID]string, contractData map[types.FileContractID]uint64) { +func (c *contractor) computeContractSetChanged(oldSet []api.ContractMetadata, newSet, formed []types.FileContractID, refreshed, renewed []renewal, toStopUsing map[types.FileContractID]string, contractData map[types.FileContractID]uint64) bool { // build some maps for easier lookups previous := make(map[types.FileContractID]struct{}) for _, c := range oldSet { @@ -385,6 +384,7 @@ func (c *contractor) logContractSetUpdate(oldSet []api.ContractMetadata, newSet, "added", len(added), "removed", len(removed), ) + return len(added)+len(removed) > 0 } func (c *contractor) performWalletMaintenance(ctx context.Context) error { diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 785cb71c8..a92ce26d7 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -3,9 +3,11 @@ package autopilot import ( "context" "math" + "sort" "sync" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/object" "go.sia.tech/renterd/tracing" "go.uber.org/zap" ) @@ -15,9 +17,10 @@ const ( ) type migrator struct { - ap *Autopilot - logger *zap.SugaredLogger - healthCutoff float64 + ap *Autopilot + logger *zap.SugaredLogger + healthCutoff float64 + signalMaintenanceFinished chan struct{} mu sync.Mutex running bool @@ -25,9 +28,17 @@ type migrator struct { func newMigrator(ap *Autopilot, healthCutoff float64) *migrator { return &migrator{ - ap: ap, - logger: ap.logger.Named("migrator"), 
- healthCutoff: healthCutoff, + ap: ap, + logger: ap.logger.Named("migrator"), + healthCutoff: healthCutoff, + signalMaintenanceFinished: make(chan struct{}, 1), + } +} + +func (m *migrator) SignalMaintenanceFinished() { + select { + case m.signalMaintenanceFinished <- struct{}{}: + default: } } @@ -56,23 +67,11 @@ func (m *migrator) performMigrations(p *workerPool, cfg api.AutopilotConfig) { ctx, span := tracing.Tracer.Start(context.Background(), "migrator.performMigrations") defer span.End() - // fetch slabs for migration - toMigrate, err := b.SlabsForMigration(ctx, m.healthCutoff, cfg.Contracts.Set, migratorBatchSize) - if err != nil { - m.logger.Errorf("failed to fetch slabs for migration, err: %v", err) - return - } - m.logger.Debugf("%d slabs to migrate", len(toMigrate)) - - // return if there are no slabs to migrate - if len(toMigrate) == 0 { - return - } - // prepare a channel to push work to the workers type job struct { api.UnhealthySlab - slabIdx int + slabIdx int + batchSize int } jobs := make(chan job) var wg sync.WaitGroup @@ -81,6 +80,7 @@ func (m *migrator) performMigrations(p *workerPool, cfg api.AutopilotConfig) { wg.Wait() }() + // launch workers p.withWorkers(func(workers []Worker) { for _, w := range workers { wg.Add(1) @@ -96,26 +96,80 @@ func (m *migrator) performMigrations(p *workerPool, cfg api.AutopilotConfig) { for j := range jobs { slab, err := b.Slab(ctx, j.Key) if err != nil { - m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, len(toMigrate), j.Health, err) + m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) continue } err = w.MigrateSlab(ctx, slab) if err != nil { - m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, len(toMigrate), j.Health, err) + m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) continue } - m.logger.Debugf("%v: successfully migrated slab '%v' (health: %v) %d/%d", id, j.Key, j.Health, j.slabIdx+1, len(toMigrate)) + m.logger.Debugf("%v: successfully migrated slab '%v' (health: %v) %d/%d", id, j.Key, j.Health, j.slabIdx+1, j.batchSize) } }(w) } }) + var toMigrate []api.UnhealthySlab + +OUTER: + for { + // fetch slabs for migration + toMigrateNew, err := b.SlabsForMigration(ctx, m.healthCutoff, cfg.Contracts.Set, migratorBatchSize) + if err != nil { + m.logger.Errorf("failed to fetch slabs for migration, err: %v", err) + return + } + m.logger.Debugf("%d potential slabs fetched for migration", len(toMigrateNew)) + + // merge toMigrateNew with toMigrate + // NOTE: when merging, we remove all slabs from toMigrate that don't + // require migration anymore. However, slabs that have been in toMigrate + // before will be repaired before any new slabs. This is to prevent + // starvation. 
+		migrateNewMap := make(map[object.EncryptionKey]*api.UnhealthySlab)
+		for i, slab := range toMigrateNew {
+			migrateNewMap[slab.Key] = &toMigrateNew[i]
+		}
+		removed := 0
+		for i := 0; i < len(toMigrate)-removed; {
+			slab := toMigrate[i]
+			if _, exists := migrateNewMap[slab.Key]; exists {
+				delete(migrateNewMap, slab.Key) // delete from map to leave only new slabs
+				i++
+			} else {
+				toMigrate[i] = toMigrate[len(toMigrate)-1-removed]
+				removed++
+			}
+		}
+		toMigrate = toMigrate[:len(toMigrate)-removed]
+		for _, slab := range migrateNewMap {
+			toMigrate = append(toMigrate, *slab)
+		}
-	// push work to workers
-	for i, slab := range toMigrate {
-		select {
-		case <-m.ap.stopChan:
+		// sort the newly added slabs by health
+		newSlabs := toMigrate[len(toMigrate)-len(migrateNewMap):]
+		sort.Slice(newSlabs, func(i, j int) bool {
+			return newSlabs[i].Health < newSlabs[j].Health
+		})
+		migrateNewMap = nil // free map
+
+		// log the updated list of slabs to migrate
+		m.logger.Debugf("%d slabs to migrate", len(toMigrate))
+
+		// return if there are no slabs to migrate
+		if len(toMigrate) == 0 {
 			return
-		case jobs <- job{slab, i}:
+		}
+
+		for i, slab := range toMigrate {
+			select {
+			case <-m.ap.stopChan:
+				return
+			case <-m.signalMaintenanceFinished:
+				m.logger.Info("migrations interrupted - updating slabs for migration")
+				continue OUTER
+			case jobs <- job{slab, i, len(toMigrate)}:
+			}
 		}
 	}
 }
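
Note on the signalling pattern introduced above: SignalMaintenanceFinished writes to a buffered channel of size one with a non-blocking send, so any number of maintenance runs that complete while the migrator is busy collapse into a single pending notification, and the migrator's select either pushes a job, stops, or jumps back to the top of the OUTER loop to refetch slabs. Below is a minimal standalone sketch of that pattern for reference; the names (signaler, newSignaler, Signal) are illustrative only and not part of this change.

package main

import (
	"fmt"
	"time"
)

// signaler coalesces many notifications into at most one pending signal,
// mirroring the buffered signalMaintenanceFinished channel in this change.
type signaler struct {
	c chan struct{}
}

func newSignaler() *signaler {
	return &signaler{c: make(chan struct{}, 1)}
}

// Signal performs a non-blocking send: if a notification is already
// pending, the new one is dropped instead of blocking the caller.
func (s *signaler) Signal() {
	select {
	case s.c <- struct{}{}:
	default:
	}
}

func main() {
	s := newSignaler()
	stop := make(chan struct{})

	// producer: fires several signals; only one can ever be pending.
	go func() {
		for i := 0; i < 5; i++ {
			s.Signal()
			time.Sleep(10 * time.Millisecond)
		}
		close(stop)
	}()

	// consumer: refreshes its work queue whenever a signal arrives,
	// analogous to the migrator restarting its OUTER loop.
	for {
		select {
		case <-stop:
			fmt.Println("stopped")
			return
		case <-s.c:
			fmt.Println("signal received - refreshing work queue")
		}
	}
}

The default case keeps the sender from ever blocking contract maintenance, while the one-slot buffer lets a signal persist until the migrator's next select, even if it arrives between loop iterations.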