Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate archiving contracts to raw sql #1284

Merged
merged 5 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 5 additions & 56 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,62 +450,11 @@ func (ss *SQLStore) RemoveOfflineHosts(ctx context.Context, minRecentFailures ui
if maxDowntime < 0 {
return 0, ErrNegativeMaxDowntime
}

// fetch all hosts outside of the transaction
var hosts []dbHost
if err := ss.db.
WithContext(ctx).
Model(&dbHost{}).
Where("recent_downtime >= ? AND recent_scan_failures >= ?", maxDowntime, minRecentFailures).
Find(&hosts).
Error; err != nil {
return 0, err
}

// return early
if len(hosts) == 0 {
return 0, nil
}

// remove every host one by one
var errs []error
for _, h := range hosts {
if err := ss.retryTransaction(ctx, func(tx *gorm.DB) error {
// fetch host contracts
hcs, err := contractsForHost(tx, h)
if err != nil {
return err
}

// create map
toArchive := make(map[types.FileContractID]string)
for _, c := range hcs {
toArchive[types.FileContractID(c.FCID)] = api.ContractArchivalReasonHostPruned
}

// archive host contracts
if err := archiveContracts(tx, hcs, toArchive); err != nil {
return err
}

// remove the host
if err := tx.Delete(&h).Error; err != nil {
return err
}
removed++
return nil
}); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
var msgs []string
for _, err := range errs {
msgs = append(msgs, err.Error())
}
err = errors.New(strings.Join(msgs, ";"))
}
err = ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
n, err := tx.RemoveOfflineHosts(ctx, minRecentFailures, maxDowntime)
removed = uint64(n)
return err
})
return
}

Expand Down
238 changes: 59 additions & 179 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,19 +667,19 @@ func (s *SQLStore) ArchiveContracts(ctx context.Context, toArchive map[types.Fil
ids = append(ids, id)
}

// fetch contracts
cs, err := contracts(s.db, ids)
if err != nil {
return err
// invalidate health of related sectors before archiving the contracts
// NOTE: even if this is not done in the same transaction it won't have any
// lasting negative effects.
if err := s.invalidateSlabHealthByFCID(ctx, ids); err != nil {
return fmt.Errorf("ArchiveContracts: failed to invalidate slab health: %w", err)
}

// archive them
if err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
return archiveContracts(tx, cs, toArchive)
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return archiveContracts(ctx, tx, toArchive)
}); err != nil {
return err
return fmt.Errorf("ArchiveContracts: failed to archive contracts: %w", err)
}

return nil
}

Expand Down Expand Up @@ -810,7 +810,7 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds
wanted[fileContractID(fcid)] = struct{}{}
}

var diff []fileContractID
var diff []types.FileContractID
var nContractsAfter int
err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
// fetch contract set
Expand Down Expand Up @@ -839,14 +839,14 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds
// add removals to the diff
for _, contract := range cs.Contracts {
if _, ok := wanted[contract.FCID]; !ok {
diff = append(diff, contract.FCID)
diff = append(diff, types.FileContractID(contract.FCID))
}
delete(wanted, contract.FCID)
}

// add additions to the diff
for fcid := range wanted {
diff = append(diff, fcid)
diff = append(diff, types.FileContractID(fcid))
}

// update the association
Expand Down Expand Up @@ -1482,33 +1482,51 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
}

func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error {
var rowsAffected int64
var err error
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
rowsAffected, err = s.deleteObject(tx, bucket, path)
if err != nil {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
}
return nil
var prune bool
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
prune, err = tx.DeleteObject(ctx, bucket, path)
return
})
if err != nil {
return err
}
if rowsAffected == 0 {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
} else if !prune {
return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, path)
}
s.triggerSlabPruning()
return nil
}

func (s *SQLStore) RemoveObjects(ctx context.Context, bucket, prefix string) error {
var err error
deleted, err := s.deleteObjects(ctx, bucket, prefix)
if err != nil {
return err
var prune bool
batchSizeIdx := 0
for {
start := time.Now()
var done bool
var duration time.Duration
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
deleted, err := tx.DeleteObjects(ctx, bucket, prefix, objectDeleteBatchSizes[batchSizeIdx])
if err != nil {
return err
}
prune = prune || deleted
done = !deleted
return nil
}); err != nil {
return fmt.Errorf("failed to delete objects: %w", err)
} else if done {
break // nothing more to delete
}
duration = time.Since(start)

// increase the batch size if deletion was faster than the threshold
if duration < batchDurationThreshold && batchSizeIdx < len(objectDeleteBatchSizes)-1 {
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
batchSizeIdx++
}
}
if !deleted {
if !prune {
return fmt.Errorf("%w: prefix: %s", api.ErrObjectNotFound, prefix)
}
s.triggerSlabPruning()
return nil
}

Expand Down Expand Up @@ -1964,23 +1982,6 @@ func contract(tx *gorm.DB, id fileContractID) (contract dbContract, err error) {
return
}

// contracts looks up the contracts whose fcid matches one of the given ids.
// The query joins the "Host" association and scans the matching rows into the
// returned slice; any query error is returned as-is.
func contracts(tx *gorm.DB, ids []types.FileContractID) (dbContracts []dbContract, err error) {
	// convert the ids into the type used by the fcid column
	converted := make([]fileContractID, 0, len(ids))
	for _, id := range ids {
		converted = append(converted, fileContractID(id))
	}

	// build the query first, then execute it
	query := tx.Model(&dbContract{}).
		Where("fcid IN (?)", converted).
		Joins("Host")
	err = query.Find(&dbContracts).Error
	return dbContracts, err
}

// contractsForHost retrieves all contracts for the given host
func contractsForHost(tx *gorm.DB, host dbHost) (contracts []dbContract, err error) {
err = tx.
Expand Down Expand Up @@ -2047,39 +2048,10 @@ func addContract(tx *gorm.DB, c rhpv2.ContractRevision, contractPrice, totalCost
// archival reason
//
// NOTE: this function archives the contracts without setting a renewed ID
func archiveContracts(tx *gorm.DB, contracts []dbContract, toArchive map[types.FileContractID]string) error {
var toInvalidate []fileContractID
for _, contract := range contracts {
toInvalidate = append(toInvalidate, contract.FCID)
}
// Invalidate the health on the slabs before deleting the contracts to avoid
// breaking the relations beforehand.
if err := invalidateSlabHealthByFCID(tx, toInvalidate); err != nil {
return fmt.Errorf("invalidating slab health failed: %w", err)
}
for _, contract := range contracts {
// sanity check the host is populated
if contract.Host.ID == 0 {
return fmt.Errorf("host not populated for contract %v", contract.FCID)
}

// create a copy in the archive
if err := tx.Create(&dbArchivedContract{
Host: publicKey(contract.Host.PublicKey),
Reason: toArchive[types.FileContractID(contract.FCID)],

ContractCommon: contract.ContractCommon,
}).Error; err != nil {
return err
}

// remove the contract
res := tx.Delete(&contract)
if err := res.Error; err != nil {
return err
}
if res.RowsAffected != 1 {
return fmt.Errorf("expected to delete 1 row, deleted %d", res.RowsAffected)
func archiveContracts(ctx context.Context, tx sql.DatabaseTx, toArchive map[types.FileContractID]string) error {
for fcid, reason := range toArchive {
if err := tx.ArchiveContract(ctx, fcid, reason); err != nil {
return fmt.Errorf("failed to archive contract %v: %w", fcid, err)
}
}
return nil
Expand Down Expand Up @@ -2161,112 +2133,20 @@ func (s *SQLStore) triggerSlabPruning() {
}
}

// deleteObject removes the object stored under the given path in the given
// bucket and returns the number of deleted rows. When at least one row was
// deleted it triggers slab pruning via triggerSlabPruning.
//
// A missing object is not an error: the method returns 0 and a nil error.
func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, error) {
	// look up the object's id first so the common "does not exist" case
	// avoids unnecessary locking
	var id uint
	lookup := tx.Model(&dbObject{}).
		Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)).
		Select("id").
		Limit(1).
		Scan(&id)
	switch {
	case lookup.Error != nil:
		return 0, lookup.Error
	case lookup.RowsAffected == 0:
		return 0, nil
	}

	// delete the object by its primary key
	res := tx.Where("id", id).Delete(&dbObject{})
	if res.Error != nil {
		return 0, res.Error
	}
	if res.RowsAffected == 0 {
		return 0, nil // nothing to prune if no object was deleted
	}

	s.triggerSlabPruning()
	return res.RowsAffected, nil
}

// deleteObjects deletes the objects matching the given bucket and path in
// batches, one database transaction per batch, until a batch reports that
// nothing was deleted. It returns whether any batch deleted something and
// triggers slab pruning in that case.
//
// The batch size starts at the first entry of objectDeleteBatchSizes and moves
// to the next entry whenever a batch finishes faster than
// batchDurationThreshold. Per the original note, deletion is expected to go
// from the largest to the smallest objects, so batches get faster over time.
// NOTE(review): that ordering is implemented by tx.DeleteObjects and is not
// visible here - confirm.
func (s *SQLStore) deleteObjects(ctx context.Context, bucket string, path string) (deleted bool, _ error) {
	sizeIdx := 0
	for {
		began := time.Now()
		exhausted := false

		err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
			batch, err := tx.DeleteObjects(ctx, bucket, path, objectDeleteBatchSizes[sizeIdx])
			if err == nil {
				deleted = deleted || batch
				exhausted = !batch
			}
			return err
		})
		if err != nil {
			return false, fmt.Errorf("failed to delete objects: %w", err)
		}
		if exhausted {
			break // nothing more to delete
		}

		// move on to the next batch size if this batch beat the threshold
		// and a larger size is available
		fastEnough := time.Since(began) < batchDurationThreshold
		if fastEnough && sizeIdx < len(objectDeleteBatchSizes)-1 {
			sizeIdx++
		}
	}

	if deleted {
		s.triggerSlabPruning()
	}
	return deleted, nil
}

func invalidateSlabHealthByFCID(tx *gorm.DB, fcids []fileContractID) error {
if len(fcids) == 0 {
return nil
}

func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID) error {
for {
now := time.Now().Unix()
if resp := tx.Exec(`
UPDATE slabs SET health_valid_until = ? WHERE id in (
SELECT *
FROM (
SELECT slabs.id
FROM slabs
INNER JOIN sectors se ON se.db_slab_id = slabs.id
INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id
INNER JOIN contracts c ON c.id = cs.db_contract_id
WHERE c.fcid IN (?) AND slabs.health_valid_until >= ?
LIMIT ?
) slab_ids
)`, now, fcids, now, refreshHealthBatchSize); resp.Error != nil {
return fmt.Errorf("failed to invalidate slab health: %w", resp.Error)
} else if resp.RowsAffected < refreshHealthBatchSize {
break // done
var affected int64
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
affected, err = tx.InvalidateSlabHealthByFCID(ctx, fcids, refreshHealthBatchSize)
return
})
if err != nil {
return fmt.Errorf("failed to invalidate slab health: %w", err)
} else if affected < refreshHealthBatchSize {
return nil // done
}
time.Sleep(time.Second)
}
return nil
}

// invalidateSlabHealthByFCID runs the package-level invalidateSlabHealthByFCID
// for the given contract ids inside s.retryTransaction and returns its error.
func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []fileContractID) error {
	invalidate := func(tx *gorm.DB) error {
		return invalidateSlabHealthByFCID(tx, fcids)
	}
	return s.retryTransaction(ctx, invalidate)
}

// sqlWhereBucket builds a SQL condition that restricts objTable's db_bucket_id
// column to the id of the bucket with the given name. The bucket name is
// passed as a bound parameter; objTable is interpolated directly into the SQL
// text.
//
// nolint:unparam
func sqlWhereBucket(objTable string, bucket string) clause.Expr {
	condition := fmt.Sprintf("%s.db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)", objTable)
	return gorm.Expr(condition, bucket)
}

// TODO: we can use ObjectEntries instead of ListObject if we want to use '/' as
Expand Down
2 changes: 1 addition & 1 deletion stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,7 @@ func TestUnhealthySlabsNoContracts(t *testing.T) {

// delete the sector - we manually invalidate the slabs for the contract
// before deletion.
err = invalidateSlabHealthByFCID(ss.db, []fileContractID{fileContractID(fcid1)})
err = ss.invalidateSlabHealthByFCID(context.Background(), []types.FileContractID{(fcid1)})
if err != nil {
t.Fatal(err)
}
Expand Down
Loading
Loading