
Commit

Merge pull request #417 from SiaFoundation/chris/migration-interrupt
Interrupt migrations when the contract set is updated to avoid unnecessary migrations
ChrisSchinnerl committed Jun 16, 2023
2 parents ff5c244 + 433d8dc commit 002a04c
Showing 3 changed files with 107 additions and 47 deletions.
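Taken together, the three files form one feedback loop: performContractMaintenance now reports whether it actually changed the contract set, Autopilot.Run forwards that signal to the migrator via SignalMaintenanceFinished (only after successful maintenance), and the migrator re-queries the unhealthy slabs instead of finishing a batch that may be stale. Below is a minimal, self-contained sketch of the interrupt-and-refetch pattern on the migrator side; the names and types are simplified placeholders, not the actual renterd API.

```go
package main

import "fmt"

// runMigrations drains batches of unhealthy slabs, but re-fetches the batch
// whenever a maintenance-finished signal arrives instead of finishing a batch
// whose health information may already be stale.
func runMigrations(fetch func() []string, migrate func(string), maintenanceFinished, stop chan struct{}) {
OUTER:
	for {
		batch := fetch()
		if len(batch) == 0 {
			return // nothing (left) to migrate
		}
		for _, slab := range batch {
			select {
			case <-stop:
				return
			case <-maintenanceFinished:
				// the contract set changed: refresh the batch before continuing
				continue OUTER
			default:
				migrate(slab)
			}
		}
	}
}

func main() {
	batches := [][]string{{"slab-1", "slab-2"}, nil}
	i := 0
	fetch := func() []string { b := batches[i]; i++; return b }

	maintenanceFinished := make(chan struct{}, 1)
	stop := make(chan struct{})
	runMigrations(fetch, func(s string) { fmt.Println("migrating", s) }, maintenanceFinished, stop)
}
```

The committed code pushes jobs to a worker pool over a channel rather than calling migrate directly, but the labeled continue plays the same role: any pending maintenance signal sends the loop back to the slab query.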
8 changes: 7 additions & 1 deletion autopilot/autopilot.go
@@ -258,12 +258,18 @@ func (ap *Autopilot) Run() error {
}

// perform maintenance
err = ap.c.performContractMaintenance(ctx, w)
setChanged, err := ap.c.performContractMaintenance(ctx, w)
if err != nil {
ap.logger.Errorf("contract maintenance failed, err: %v", err)
}
maintenanceSuccess := err == nil

// upon success, notify the migrator. The health of slabs might have
// changed.
if maintenanceSuccess && setChanged {
ap.m.SignalMaintenanceFinished()
}

// launch account refills after successful contract maintenance.
if maintenanceSuccess {
launchAccountRefillsOnce.Do(func() {
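Note that the signal is only sent when maintenance succeeded and the contract set actually changed, and sending it can never stall the maintenance loop: SignalMaintenanceFinished (added in autopilot/migrator.go below) performs a non-blocking send into a channel of capacity one, so repeated notifications coalesce into a single pending signal. A tiny stand-alone illustration of that idiom, with a hypothetical helper name:

```go
package main

import "fmt"

// notify performs a non-blocking send on a 1-buffered channel: if a signal is
// already pending, the new one is dropped, so callers never block and the
// receiver sees at most one pending notification.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

func main() {
	signal := make(chan struct{}, 1)
	notify(signal) // first signal is buffered
	notify(signal) // second signal is dropped, not blocked on
	fmt.Println("pending signals:", len(signal)) // prints 1
}
```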
36 changes: 18 additions & 18 deletions autopilot/contractor.go
@@ -98,12 +98,12 @@ func newContractor(ap *Autopilot) *contractor {
}
}

func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) error {
func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) (bool, error) {
ctx, span := tracing.Tracer.Start(ctx, "contractor.performContractMaintenance")
defer span.End()

if c.ap.isStopped() || !c.ap.isSynced() {
return nil // skip contract maintenance if we're not synced
return false, nil // skip contract maintenance if we're not synced
}

c.logger.Info("performing contract maintenance")
@@ -117,31 +117,31 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e
// not zero in several places
if state.cfg.Contracts.Amount == 0 {
c.logger.Warn("contracts is set to zero, skipping contract maintenance")
return nil
return false, nil
}

// no maintenance if no allowance was set
if state.cfg.Contracts.Allowance.IsZero() {
c.logger.Warn("allowance is set to zero, skipping contract maintenance")
return nil
return false, nil
}

// no maintenance if no period was set
if state.cfg.Contracts.Period == 0 {
c.logger.Warn("period is set to zero, skipping contract maintenance")
return nil
return false, nil
}

// fetch our wallet address
address, err := c.ap.bus.WalletAddress(ctx)
if err != nil {
return err
return false, err
}

// fetch current contract set
currentSet, err := c.ap.bus.ContractSetContracts(ctx, state.cfg.Contracts.Set)
if err != nil && !strings.Contains(err.Error(), api.ErrContractSetNotFound.Error()) {
return err
return false, err
}
isInCurrentSet := make(map[types.FileContractID]struct{})
for _, c := range currentSet {
@@ -153,7 +153,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e
start := time.Now()
resp, err := w.Contracts(ctx, timeoutHostRevision)
if err != nil {
return err
return false, err
}
if resp.Error != "" {
c.logger.Error(resp.Error)
@@ -183,15 +183,15 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e
// fetch all hosts
hosts, err := c.ap.bus.Hosts(ctx, 0, -1)
if err != nil {
return err
return false, err
}

// min score to pass checks.
var minScore float64
if len(hosts) > 0 {
minScore, err = c.managedFindMinAllowedHostScores(ctx, w, hosts, hostData)
if err != nil {
return fmt.Errorf("failed to determine min score for contract check: %w", err)
return false, fmt.Errorf("failed to determine min score for contract check: %w", err)
}
} else {
c.logger.Warn("could not calculate min score, no hosts found")
@@ -222,7 +222,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e
// run checks
updatedSet, toArchive, toStopUsing, toRefresh, toRenew, err := c.runContractChecks(ctx, w, contracts, minScore)
if err != nil {
return fmt.Errorf("failed to run contract checks, err: %v", err)
return false, fmt.Errorf("failed to run contract checks, err: %v", err)
}

// archive contracts
@@ -236,7 +236,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e
// calculate remaining funds
remaining, err := c.remainingFunds(contracts)
if err != nil {
return err
return false, err
}

// calculate 'limit' amount of contracts we want to renew
@@ -308,19 +308,18 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) e

// update contract set
if c.ap.isStopped() {
return errors.New("autopilot stopped before maintenance could be completed")
return false, errors.New("autopilot stopped before maintenance could be completed")
}
err = c.ap.bus.SetContractSet(ctx, state.cfg.Contracts.Set, updatedSet)
if err != nil {
return err
return false, err
}

// log the contract set after maintenance
c.logContractSetUpdate(currentSet, updatedSet, formed, refreshed, renewed, toStopUsing, contractData)
return nil
// return whether the maintenance changed the contract set
return c.computeContractSetChanged(currentSet, updatedSet, formed, refreshed, renewed, toStopUsing, contractData), nil
}

func (c *contractor) logContractSetUpdate(oldSet []api.ContractMetadata, newSet, formed []types.FileContractID, refreshed, renewed []renewal, toStopUsing map[types.FileContractID]string, contractData map[types.FileContractID]uint64) {
func (c *contractor) computeContractSetChanged(oldSet []api.ContractMetadata, newSet, formed []types.FileContractID, refreshed, renewed []renewal, toStopUsing map[types.FileContractID]string, contractData map[types.FileContractID]uint64) bool {
// build some maps for easier lookups
previous := make(map[types.FileContractID]struct{})
for _, c := range oldSet {
@@ -385,6 +384,7 @@ func (c *contractor) logContractSetUpdate(oldSet []api.ContractMetadata, newSet,
"added", len(added),
"removed", len(removed),
)
return len(added)+len(removed) > 0
}

func (c *contractor) performWalletMaintenance(ctx context.Context) error {
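computeContractSetChanged replaces the logging-only logContractSetUpdate: it still emits the same summary log, but it also compares the old and new set membership and returns true exactly when at least one contract was added or removed. A condensed stand-alone sketch of that membership diff, with contract IDs reduced to strings for brevity:

```go
package main

import "fmt"

// setChanged reports whether newSet differs from oldSet in membership,
// mirroring the "len(added)+len(removed) > 0" check in the contractor.
func setChanged(oldSet, newSet []string) bool {
	previous := make(map[string]struct{}, len(oldSet))
	for _, id := range oldSet {
		previous[id] = struct{}{}
	}
	updated := make(map[string]struct{}, len(newSet))
	for _, id := range newSet {
		updated[id] = struct{}{}
	}

	var added, removed []string
	for _, id := range newSet {
		if _, ok := previous[id]; !ok {
			added = append(added, id) // present in the new set only
		}
	}
	for _, id := range oldSet {
		if _, ok := updated[id]; !ok {
			removed = append(removed, id) // dropped from the set
		}
	}
	return len(added)+len(removed) > 0
}

func main() {
	fmt.Println(setChanged([]string{"a", "b"}, []string{"a", "b"})) // false
	fmt.Println(setChanged([]string{"a", "b"}, []string{"a", "c"})) // true
}
```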
110 changes: 82 additions & 28 deletions autopilot/migrator.go
@@ -3,9 +3,11 @@ package autopilot
import (
"context"
"math"
"sort"
"sync"

"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/tracing"
"go.uber.org/zap"
)
@@ -15,19 +17,28 @@ const (
)

type migrator struct {
ap *Autopilot
logger *zap.SugaredLogger
healthCutoff float64
ap *Autopilot
logger *zap.SugaredLogger
healthCutoff float64
signalMaintenanceFinished chan struct{}

mu sync.Mutex
running bool
}

func newMigrator(ap *Autopilot, healthCutoff float64) *migrator {
return &migrator{
ap: ap,
logger: ap.logger.Named("migrator"),
healthCutoff: healthCutoff,
ap: ap,
logger: ap.logger.Named("migrator"),
healthCutoff: healthCutoff,
signalMaintenanceFinished: make(chan struct{}, 1),
}
}

func (m *migrator) SignalMaintenanceFinished() {
select {
case m.signalMaintenanceFinished <- struct{}{}:
default:
}
}

@@ -56,23 +67,11 @@ func (m *migrator) performMigrations(p *workerPool, cfg api.AutopilotConfig) {
ctx, span := tracing.Tracer.Start(context.Background(), "migrator.performMigrations")
defer span.End()

// fetch slabs for migration
toMigrate, err := b.SlabsForMigration(ctx, m.healthCutoff, cfg.Contracts.Set, migratorBatchSize)
if err != nil {
m.logger.Errorf("failed to fetch slabs for migration, err: %v", err)
return
}
m.logger.Debugf("%d slabs to migrate", len(toMigrate))

// return if there are no slabs to migrate
if len(toMigrate) == 0 {
return
}

// prepare a channel to push work to the workers
type job struct {
api.UnhealthySlab
slabIdx int
slabIdx int
batchSize int
}
jobs := make(chan job)
var wg sync.WaitGroup
@@ -81,6 +80,7 @@ func (m *migrator) performMigrations(p *workerPool, cfg api.AutopilotConfig) {
wg.Wait()
}()

// launch workers
p.withWorkers(func(workers []Worker) {
for _, w := range workers {
wg.Add(1)
@@ -96,26 +96,80 @@ func (m *migrator) performMigrations(p *workerPool, cfg api.AutopilotConfig) {
for j := range jobs {
slab, err := b.Slab(ctx, j.Key)
if err != nil {
m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, len(toMigrate), j.Health, err)
m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
continue
}
err = w.MigrateSlab(ctx, slab)
if err != nil {
m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, len(toMigrate), j.Health, err)
m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
continue
}
m.logger.Debugf("%v: successfully migrated slab '%v' (health: %v) %d/%d", id, j.Key, j.Health, j.slabIdx+1, len(toMigrate))
m.logger.Debugf("%v: successfully migrated slab '%v' (health: %v) %d/%d", id, j.Key, j.Health, j.slabIdx+1, j.batchSize)
}
}(w)
}
})
var toMigrate []api.UnhealthySlab

OUTER:
for {
// fetch slabs for migration
toMigrateNew, err := b.SlabsForMigration(ctx, m.healthCutoff, cfg.Contracts.Set, migratorBatchSize)
if err != nil {
m.logger.Errorf("failed to fetch slabs for migration, err: %v", err)
return
}
m.logger.Debugf("%d potential slabs fetched for migration", len(toMigrateNew))

// merge toMigrateNew with toMigrate
// NOTE: when merging, we remove all slabs from toMigrate that don't
// require migration anymore. However, slabs that have been in toMigrate
// before will be repaired before any new slabs. This is to prevent
// starvation.
migrateNewMap := make(map[object.EncryptionKey]*api.UnhealthySlab)
for i, slab := range toMigrateNew {
migrateNewMap[slab.Key] = &toMigrateNew[i]
}
removed := 0
for i := 0; i < len(toMigrate)-removed; {
slab := toMigrate[i]
if _, exists := migrateNewMap[slab.Key]; exists {
delete(migrateNewMap, slab.Key) // delete from map to leave only new slabs
i++
} else {
toMigrate[i] = toMigrate[len(toMigrate)-1-removed]
removed++
}
}
toMigrate = toMigrate[:len(toMigrate)-removed]
for _, slab := range migrateNewMap {
toMigrate = append(toMigrate, *slab)
}

-	// push work to workers
-	for i, slab := range toMigrate {
-		select {
-		case <-m.ap.stopChan:
+		// sort the newly added slabs by health
+		newSlabs := toMigrate[len(toMigrate)-len(migrateNewMap):]
+		sort.Slice(newSlabs, func(i, j int) bool {
+			return newSlabs[i].Health < newSlabs[j].Health
+		})
+		migrateNewMap = nil // free map
+
+		// log the updated list of slabs to migrate
+		m.logger.Debugf("%d slabs to migrate", len(toMigrate))
+
+		// return if there are no slabs to migrate
+		if len(toMigrate) == 0 {
 			return
-		case jobs <- job{slab, i}:
 		}
+
+		for i, slab := range toMigrate {
+			select {
+			case <-m.ap.stopChan:
+				return
+			case <-m.signalMaintenanceFinished:
+				m.logger.Info("migrations interrupted - updating slabs for migration")
+				continue OUTER
+			case jobs <- job{slab, i, len(toMigrate)}:
+			}
+		}
 	}
 }
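The new fetch loop merges each freshly queried batch of unhealthy slabs into the queue it is already working through: slabs that are still unhealthy stay ahead of anything new, slabs that no longer need migration are dropped, and genuinely new slabs are appended sorted by health. A rough stand-alone sketch of that bookkeeping, using a simplified slab type rather than the renterd API:

```go
package main

import (
	"fmt"
	"sort"
)

type slab struct {
	Key    string
	Health float64
}

// mergeToMigrate folds a fresh batch of unhealthy slabs into the existing
// queue: known slabs keep their place ahead of new work, slabs that recovered
// are dropped, and genuinely new slabs are appended sorted by ascending health.
func mergeToMigrate(queue, fresh []slab) []slab {
	freshMap := make(map[string]slab, len(fresh))
	for _, s := range fresh {
		freshMap[s.Key] = s
	}

	// keep only queued slabs that are still unhealthy
	kept := queue[:0]
	for _, s := range queue {
		if _, still := freshMap[s.Key]; still {
			kept = append(kept, s)
			delete(freshMap, s.Key) // leave only brand-new slabs in the map
		}
	}

	// append new slabs, worst health first
	var added []slab
	for _, s := range freshMap {
		added = append(added, s)
	}
	sort.Slice(added, func(i, j int) bool { return added[i].Health < added[j].Health })
	return append(kept, added...)
}

func main() {
	queue := []slab{{"a", 0.2}, {"b", 0.4}}
	fresh := []slab{{"b", 0.4}, {"c", 0.1}, {"d", 0.3}}
	fmt.Println(mergeToMigrate(queue, fresh)) // [{b 0.4} {c 0.1} {d 0.3}]
}
```

The committed code removes recovered slabs by swapping them with the tail instead of compacting in order; the property that matters is only that previously queued slabs run before newly discovered ones, which is what the NOTE in the diff means by preventing starvation.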
