Migrate archiving contracts to raw sql (#1284)
ChrisSchinnerl committed Jun 10, 2024
1 parent 55f5b1c commit 501b60f
Showing 7 changed files with 220 additions and 237 deletions.
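Only three of the seven changed files are rendered below; the remaining files presumably add the corresponding methods to the transaction interface in the stores/sql package and its MySQL/SQLite backends. The following fragment sketches those interface additions as inferred purely from the call sites visible in this diff; names and exact integer types that the diff does not pin down are assumptions.

// Inferred from call sites in this commit; the real interface lives in the
// stores/sql package (not shown here) and may differ in names and types.
type DatabaseTx interface {
	// ArchiveContract moves a single contract into the archive, recording the reason.
	ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error

	// DeleteObject deletes one object and reports whether anything was removed.
	DeleteObject(ctx context.Context, bucket, path string) (bool, error)

	// DeleteObjects deletes up to 'limit' objects with the given prefix and
	// reports whether anything was removed.
	DeleteObjects(ctx context.Context, bucket, prefix string, limit int64) (bool, error)

	// InvalidateSlabHealthByFCID marks the health of slabs tied to the given
	// contracts as stale, at most 'limit' slabs per call.
	InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error)

	// RemoveOfflineHosts removes hosts exceeding the downtime and scan-failure
	// limits and returns how many were removed.
	RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDowntime time.Duration) (int64, error)
}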
61 changes: 5 additions & 56 deletions stores/hostdb.go
@@ -450,62 +450,11 @@ func (ss *SQLStore) RemoveOfflineHosts(ctx context.Context, minRecentFailures ui
if maxDowntime < 0 {
return 0, ErrNegativeMaxDowntime
}

// fetch all hosts outside of the transaction
var hosts []dbHost
if err := ss.db.
WithContext(ctx).
Model(&dbHost{}).
Where("recent_downtime >= ? AND recent_scan_failures >= ?", maxDowntime, minRecentFailures).
Find(&hosts).
Error; err != nil {
return 0, err
}

// return early
if len(hosts) == 0 {
return 0, nil
}

// remove every host one by one
var errs []error
for _, h := range hosts {
if err := ss.retryTransaction(ctx, func(tx *gorm.DB) error {
// fetch host contracts
hcs, err := contractsForHost(tx, h)
if err != nil {
return err
}

// create map
toArchive := make(map[types.FileContractID]string)
for _, c := range hcs {
toArchive[types.FileContractID(c.FCID)] = api.ContractArchivalReasonHostPruned
}

// archive host contracts
if err := archiveContracts(tx, hcs, toArchive); err != nil {
return err
}

// remove the host
if err := tx.Delete(&h).Error; err != nil {
return err
}
removed++
return nil
}); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
var msgs []string
for _, err := range errs {
msgs = append(msgs, err.Error())
}
err = errors.New(strings.Join(msgs, ";"))
}
err = ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
n, err := tx.RemoveOfflineHosts(ctx, minRecentFailures, maxDowntime)
removed = uint64(n)
return err
})
return
}
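The raw-SQL implementation behind tx.RemoveOfflineHosts is part of the files not shown on this page. Below is a minimal sketch of what it might look like; the package scaffolding, the MainDatabaseTx type and its embedded *sql.Tx are assumptions for illustration, and only the hosts columns visible in the removed GORM query are relied upon. A complete version would also archive each host's contracts (see the ArchiveContract sketch further down) before deleting the hosts.

package sql // scaffolding assumed by the sketches on this page, not part of the commit

import (
	"context"
	dsql "database/sql"
	"fmt"
	"strings"
	"time"
)

// MainDatabaseTx stands in for the commit's transaction type and is assumed to
// wrap a plain database/sql transaction.
type MainDatabaseTx struct {
	tx *dsql.Tx
}

func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDowntime time.Duration) (int64, error) {
	// fetch the ids of all hosts exceeding the allowed downtime and scan failures
	rows, err := tx.tx.QueryContext(ctx,
		"SELECT id FROM hosts WHERE recent_downtime >= ? AND recent_scan_failures >= ?",
		maxDowntime, minRecentFailures)
	if err != nil {
		return 0, fmt.Errorf("failed to fetch offline hosts: %w", err)
	}
	defer rows.Close()

	var hostIDs []int64
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return 0, fmt.Errorf("failed to scan host id: %w", err)
		}
		hostIDs = append(hostIDs, id)
	}
	if err := rows.Err(); err != nil {
		return 0, err
	} else if len(hostIDs) == 0 {
		return 0, nil // nothing to do
	}

	// delete the hosts in a single statement
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(hostIDs)), ", ")
	args := make([]interface{}, len(hostIDs))
	for i, id := range hostIDs {
		args[i] = id
	}
	res, err := tx.tx.ExecContext(ctx, "DELETE FROM hosts WHERE id IN ("+placeholders+")", args...)
	if err != nil {
		return 0, fmt.Errorf("failed to delete hosts: %w", err)
	}
	return res.RowsAffected()
}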

238 changes: 59 additions & 179 deletions stores/metadata.go
@@ -667,19 +667,19 @@ func (s *SQLStore) ArchiveContracts(ctx context.Context, toArchive map[types.Fil
ids = append(ids, id)
}

// fetch contracts
cs, err := contracts(s.db, ids)
if err != nil {
return err
// invalidate health of related sectors before archiving the contracts
// NOTE: even if this is not done in the same transaction it won't have any
// lasting negative effects.
if err := s.invalidateSlabHealthByFCID(ctx, ids); err != nil {
return fmt.Errorf("ArchiveContracts: failed to invalidate slab health: %w", err)
}

// archive them
if err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
return archiveContracts(tx, cs, toArchive)
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return archiveContracts(ctx, tx, toArchive)
}); err != nil {
return err
return fmt.Errorf("ArchiveContracts: failed to archive contracts: %w", err)
}

return nil
}
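ArchiveContract itself is implemented in the files not shown here. A sketch of the shape it might take, reusing the MainDatabaseTx scaffolding from the sketch above; the archived_contracts and contracts column names are assumptions based on the removed GORM models, types.FileContractID comes from go.sia.tech/core/types, the fcid is assumed to be stored as raw bytes, and only a representative subset of the ContractCommon columns is copied.

// Sketch of a possible raw-SQL ArchiveContract; the real archive table copies
// every ContractCommon column, only a representative subset is shown here.
func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error {
	// copy the contract into the archive together with the host key and reason
	res, err := tx.tx.ExecContext(ctx, `
INSERT INTO archived_contracts (created_at, fcid, host, reason)
SELECT ?, c.fcid, h.public_key, ?
FROM contracts c
INNER JOIN hosts h ON h.id = c.host_id
WHERE c.fcid = ?`, time.Now(), reason, fcid[:])
	if err != nil {
		return fmt.Errorf("failed to copy contract into archive: %w", err)
	} else if n, err := res.RowsAffected(); err != nil {
		return err
	} else if n == 0 {
		return fmt.Errorf("contract %v not found", fcid)
	}

	// remove the contract from the active contracts table
	_, err = tx.tx.ExecContext(ctx, "DELETE FROM contracts WHERE fcid = ?", fcid[:])
	return err
}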

@@ -810,7 +810,7 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds
wanted[fileContractID(fcid)] = struct{}{}
}

var diff []fileContractID
var diff []types.FileContractID
var nContractsAfter int
err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
// fetch contract set
@@ -839,14 +839,14 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds
// add removals to the diff
for _, contract := range cs.Contracts {
if _, ok := wanted[contract.FCID]; !ok {
diff = append(diff, contract.FCID)
diff = append(diff, types.FileContractID(contract.FCID))
}
delete(wanted, contract.FCID)
}

// add additions to the diff
for fcid := range wanted {
diff = append(diff, fcid)
diff = append(diff, types.FileContractID(fcid))
}

// update the association
@@ -1482,33 +1482,51 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
}

func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error {
var rowsAffected int64
var err error
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
rowsAffected, err = s.deleteObject(tx, bucket, path)
if err != nil {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
}
return nil
var prune bool
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
prune, err = tx.DeleteObject(ctx, bucket, path)
return
})
if err != nil {
return err
}
if rowsAffected == 0 {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
} else if !prune {
return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, path)
}
s.triggerSlabPruning()
return nil
}

func (s *SQLStore) RemoveObjects(ctx context.Context, bucket, prefix string) error {
var err error
deleted, err := s.deleteObjects(ctx, bucket, prefix)
if err != nil {
return err
var prune bool
batchSizeIdx := 0
for {
start := time.Now()
var done bool
var duration time.Duration
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
deleted, err := tx.DeleteObjects(ctx, bucket, prefix, objectDeleteBatchSizes[batchSizeIdx])
if err != nil {
return err
}
prune = prune || deleted
done = !deleted
return nil
}); err != nil {
return fmt.Errorf("failed to delete objects: %w", err)
} else if done {
break // nothing more to delete
}
duration = time.Since(start)

// increase the batch size if deletion was faster than the threshold
if duration < batchDurationThreshold && batchSizeIdx < len(objectDeleteBatchSizes)-1 {
batchSizeIdx++
}
}
if !deleted {
if !prune {
return fmt.Errorf("%w: prefix: %s", api.ErrObjectNotFound, prefix)
}
s.triggerSlabPruning()
return nil
}
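The batched deletion above relies on a DeleteObjects method that removes at most 'limit' objects per call and reports whether it removed anything, which keeps each transaction short so deleting a large prefix does not hold locks for long. A sketch of a MySQL-flavoured version on the assumed MainDatabaseTx scaffolding; the objects and buckets columns mirror sqlWhereBucket and the removed deleteObject code further down, while the LIKE-based prefix match without wildcard escaping is a simplification.

// Sketch of a possible DeleteObjects. MySQL supports DELETE ... LIMIT directly;
// a SQLite backend would need a "WHERE id IN (SELECT id ... LIMIT ?)" form.
func (tx *MainDatabaseTx) DeleteObjects(ctx context.Context, bucket, prefix string, limit int64) (bool, error) {
	res, err := tx.tx.ExecContext(ctx, `
DELETE FROM objects
WHERE object_id LIKE ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)
LIMIT ?`, prefix+"%", bucket, limit)
	if err != nil {
		return false, fmt.Errorf("failed to delete objects: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return false, err
	}
	return n > 0, nil
}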

@@ -1964,23 +1982,6 @@ func contract(tx *gorm.DB, id fileContractID) (contract dbContract, err error) {
return
}

// contracts retrieves all contracts for the given ids from the store.
func contracts(tx *gorm.DB, ids []types.FileContractID) (dbContracts []dbContract, err error) {
fcids := make([]fileContractID, len(ids))
for i, fcid := range ids {
fcids[i] = fileContractID(fcid)
}

// fetch contracts
err = tx.
Model(&dbContract{}).
Where("fcid IN (?)", fcids).
Joins("Host").
Find(&dbContracts).
Error
return
}

// contractsForHost retrieves all contracts for the given host
func contractsForHost(tx *gorm.DB, host dbHost) (contracts []dbContract, err error) {
err = tx.
@@ -2047,39 +2048,10 @@ func addContract(tx *gorm.DB, c rhpv2.ContractRevision, contractPrice, totalCost
// archival reason
//
// NOTE: this function archives the contracts without setting a renewed ID
func archiveContracts(tx *gorm.DB, contracts []dbContract, toArchive map[types.FileContractID]string) error {
var toInvalidate []fileContractID
for _, contract := range contracts {
toInvalidate = append(toInvalidate, contract.FCID)
}
// Invalidate the health on the slabs before deleting the contracts to avoid
// breaking the relations beforehand.
if err := invalidateSlabHealthByFCID(tx, toInvalidate); err != nil {
return fmt.Errorf("invalidating slab health failed: %w", err)
}
for _, contract := range contracts {
// sanity check the host is populated
if contract.Host.ID == 0 {
return fmt.Errorf("host not populated for contract %v", contract.FCID)
}

// create a copy in the archive
if err := tx.Create(&dbArchivedContract{
Host: publicKey(contract.Host.PublicKey),
Reason: toArchive[types.FileContractID(contract.FCID)],

ContractCommon: contract.ContractCommon,
}).Error; err != nil {
return err
}

// remove the contract
res := tx.Delete(&contract)
if err := res.Error; err != nil {
return err
}
if res.RowsAffected != 1 {
return fmt.Errorf("expected to delete 1 row, deleted %d", res.RowsAffected)
func archiveContracts(ctx context.Context, tx sql.DatabaseTx, toArchive map[types.FileContractID]string) error {
for fcid, reason := range toArchive {
if err := tx.ArchiveContract(ctx, fcid, reason); err != nil {
return fmt.Errorf("failed to archive contract %v: %w", fcid, err)
}
}
return nil
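For illustration, a minimal usage sketch of the slimmed-down helper, archiving a single contract whose host was pruned; fcid is a placeholder value.

err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
	// archive one contract, recording why it was removed
	return archiveContracts(ctx, tx, map[types.FileContractID]string{
		fcid: api.ContractArchivalReasonHostPruned,
	})
})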
@@ -2161,112 +2133,20 @@ func (s *SQLStore) triggerSlabPruning() {
}
}

// deleteObject deletes an object from the store and prunes all slabs which are
// without an object after the deletion. That means in case of packed uploads,
// the slab is only deleted when no more objects point to it.
func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, error) {
// check if the object exists first to avoid unnecessary locking for the
// common case
var objID uint
resp := tx.Model(&dbObject{}).
Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)).
Select("id").
Limit(1).
Scan(&objID)
if err := resp.Error; err != nil {
return 0, err
} else if resp.RowsAffected == 0 {
return 0, nil
}

tx = tx.Where("id", objID).
Delete(&dbObject{})
if tx.Error != nil {
return 0, tx.Error
}
numDeleted := tx.RowsAffected
if numDeleted == 0 {
return 0, nil // nothing to prune if no object was deleted
}
s.triggerSlabPruning()
return numDeleted, nil
}

// deleteObjects deletes a batch of objects from the database. The order of
// deletion goes from largest to smallest. That's because the batch size is
// dynamically increased and the smaller objects get the faster we can delete
// them meaning it makes sense to increase the batch size over time.
func (s *SQLStore) deleteObjects(ctx context.Context, bucket string, path string) (deleted bool, _ error) {
batchSizeIdx := 0
for {
start := time.Now()
done := false
var duration time.Duration
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
d, err := tx.DeleteObjects(ctx, bucket, path, objectDeleteBatchSizes[batchSizeIdx])
if err != nil {
return err
}
deleted = deleted || d
done = !d
return nil
}); err != nil {
return false, fmt.Errorf("failed to delete objects: %w", err)
}
if done {
break // nothing more to delete
}
duration = time.Since(start)

// increase the batch size if deletion was faster than the threshold
if duration < batchDurationThreshold && batchSizeIdx < len(objectDeleteBatchSizes)-1 {
batchSizeIdx++
}
}
if deleted {
s.triggerSlabPruning()
}
return deleted, nil
}

func invalidateSlabHealthByFCID(tx *gorm.DB, fcids []fileContractID) error {
if len(fcids) == 0 {
return nil
}

func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID) error {
for {
now := time.Now().Unix()
if resp := tx.Exec(`
UPDATE slabs SET health_valid_until = ? WHERE id in (
SELECT *
FROM (
SELECT slabs.id
FROM slabs
INNER JOIN sectors se ON se.db_slab_id = slabs.id
INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id
INNER JOIN contracts c ON c.id = cs.db_contract_id
WHERE c.fcid IN (?) AND slabs.health_valid_until >= ?
LIMIT ?
) slab_ids
)`, now, fcids, now, refreshHealthBatchSize); resp.Error != nil {
return fmt.Errorf("failed to invalidate slab health: %w", resp.Error)
} else if resp.RowsAffected < refreshHealthBatchSize {
break // done
var affected int64
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
affected, err = tx.InvalidateSlabHealthByFCID(ctx, fcids, refreshHealthBatchSize)
return
})
if err != nil {
return fmt.Errorf("failed to invalidate slab health: %w", err)
} else if affected < refreshHealthBatchSize {
return nil // done
}
time.Sleep(time.Second)
}
return nil
}

func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []fileContractID) error {
return s.retryTransaction(ctx, func(tx *gorm.DB) error {
return invalidateSlabHealthByFCID(tx, fcids)
})
}
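The per-batch UPDATE now presumably lives behind tx.InvalidateSlabHealthByFCID. A sketch of what that method might look like, lifting the statement almost verbatim from the GORM Exec removed above and placing it on the assumed MainDatabaseTx scaffolding; the binary encoding of the fcid column is an assumption.

// Sketch of a possible InvalidateSlabHealthByFCID; the UPDATE is the one
// removed from stores/metadata.go above, invalidating at most 'limit' slabs.
func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error) {
	if len(fcids) == 0 {
		return 0, nil
	}

	// build the placeholder list and arguments for the IN clause
	now := time.Now().Unix()
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(fcids)), ", ")
	args := []interface{}{now}
	for _, fcid := range fcids {
		fcid := fcid // copy to avoid aliasing the loop variable's backing array
		args = append(args, fcid[:])
	}
	args = append(args, now, limit)

	res, err := tx.tx.ExecContext(ctx, `
UPDATE slabs SET health_valid_until = ? WHERE id IN (
	SELECT *
	FROM (
		SELECT slabs.id
		FROM slabs
		INNER JOIN sectors se ON se.db_slab_id = slabs.id
		INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id
		INNER JOIN contracts c ON c.id = cs.db_contract_id
		WHERE c.fcid IN (`+placeholders+`) AND slabs.health_valid_until >= ?
		LIMIT ?
	) slab_ids
)`, args...)
	if err != nil {
		return 0, fmt.Errorf("failed to invalidate slab health: %w", err)
	}
	return res.RowsAffected()
}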

// nolint:unparam
func sqlWhereBucket(objTable string, bucket string) clause.Expr {
return gorm.Expr(fmt.Sprintf("%s.db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)", objTable), bucket)
}

// TODO: we can use ObjectEntries instead of ListObject if we want to use '/' as
2 changes: 1 addition & 1 deletion stores/metadata_test.go
@@ -1929,7 +1929,7 @@ func TestUnhealthySlabsNoContracts(t *testing.T) {

// delete the sector - we manually invalidate the slabs for the contract
// before deletion.
err = invalidateSlabHealthByFCID(ss.db, []fileContractID{fileContractID(fcid1)})
err = ss.invalidateSlabHealthByFCID(context.Background(), []types.FileContractID{fcid1})
if err != nil {
t.Fatal(err)
}
