diff --git a/stores/hostdb.go b/stores/hostdb.go index 7a76adaac..339123060 100644 --- a/stores/hostdb.go +++ b/stores/hostdb.go @@ -450,62 +450,11 @@ func (ss *SQLStore) RemoveOfflineHosts(ctx context.Context, minRecentFailures ui if maxDowntime < 0 { return 0, ErrNegativeMaxDowntime } - - // fetch all hosts outside of the transaction - var hosts []dbHost - if err := ss.db. - WithContext(ctx). - Model(&dbHost{}). - Where("recent_downtime >= ? AND recent_scan_failures >= ?", maxDowntime, minRecentFailures). - Find(&hosts). - Error; err != nil { - return 0, err - } - - // return early - if len(hosts) == 0 { - return 0, nil - } - - // remove every host one by one - var errs []error - for _, h := range hosts { - if err := ss.retryTransaction(ctx, func(tx *gorm.DB) error { - // fetch host contracts - hcs, err := contractsForHost(tx, h) - if err != nil { - return err - } - - // create map - toArchive := make(map[types.FileContractID]string) - for _, c := range hcs { - toArchive[types.FileContractID(c.FCID)] = api.ContractArchivalReasonHostPruned - } - - // archive host contracts - if err := archiveContracts(tx, hcs, toArchive); err != nil { - return err - } - - // remove the host - if err := tx.Delete(&h).Error; err != nil { - return err - } - removed++ - return nil - }); err != nil { - errs = append(errs, err) - } - } - - if len(errs) > 0 { - var msgs []string - for _, err := range errs { - msgs = append(msgs, err.Error()) - } - err = errors.New(strings.Join(msgs, ";")) - } + err = ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { + n, err := tx.RemoveOfflineHosts(ctx, minRecentFailures, maxDowntime) + removed = uint64(n) + return err + }) return } diff --git a/stores/metadata.go b/stores/metadata.go index b9384c820..efe0fd941 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -667,19 +667,19 @@ func (s *SQLStore) ArchiveContracts(ctx context.Context, toArchive map[types.Fil ids = append(ids, id) } - // fetch contracts - cs, err := contracts(s.db, ids) - if err != nil { - return err + // invalidate health of related sectors before archiving the contracts + // NOTE: even if this is not done in the same transaction it won't have any + // lasting negative effects. + if err := s.invalidateSlabHealthByFCID(ctx, ids); err != nil { + return fmt.Errorf("ArchiveContracts: failed to invalidate slab health: %w", err) } // archive them - if err := s.retryTransaction(ctx, func(tx *gorm.DB) error { - return archiveContracts(tx, cs, toArchive) + if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { + return archiveContracts(ctx, tx, toArchive) }); err != nil { - return err + return fmt.Errorf("ArchiveContracts: failed to archive contracts: %w", err) } - return nil } @@ -810,7 +810,7 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds wanted[fileContractID(fcid)] = struct{}{} } - var diff []fileContractID + var diff []types.FileContractID var nContractsAfter int err := s.retryTransaction(ctx, func(tx *gorm.DB) error { // fetch contract set @@ -839,14 +839,14 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds // add removals to the diff for _, contract := range cs.Contracts { if _, ok := wanted[contract.FCID]; !ok { - diff = append(diff, contract.FCID) + diff = append(diff, types.FileContractID(contract.FCID)) } delete(wanted, contract.FCID) } // add additions to the diff for fcid := range wanted { - diff = append(diff, fcid) + diff = append(diff, types.FileContractID(fcid)) } // update the association @@ -1482,33 +1482,51 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, } func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error { - var rowsAffected int64 - var err error - err = s.retryTransaction(ctx, func(tx *gorm.DB) error { - rowsAffected, err = s.deleteObject(tx, bucket, path) - if err != nil { - return fmt.Errorf("RemoveObject: failed to delete object: %w", err) - } - return nil + var prune bool + err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) { + prune, err = tx.DeleteObject(ctx, bucket, path) + return }) if err != nil { - return err - } - if rowsAffected == 0 { + return fmt.Errorf("RemoveObject: failed to delete object: %w", err) + } else if !prune { return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, path) } + s.triggerSlabPruning() return nil } func (s *SQLStore) RemoveObjects(ctx context.Context, bucket, prefix string) error { - var err error - deleted, err := s.deleteObjects(ctx, bucket, prefix) - if err != nil { - return err + var prune bool + batchSizeIdx := 0 + for { + start := time.Now() + var done bool + var duration time.Duration + if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { + deleted, err := tx.DeleteObjects(ctx, bucket, prefix, objectDeleteBatchSizes[batchSizeIdx]) + if err != nil { + return err + } + prune = prune || deleted + done = !deleted + return nil + }); err != nil { + return fmt.Errorf("failed to delete objects: %w", err) + } else if done { + break // nothing more to delete + } + duration = time.Since(start) + + // increase the batch size if deletion was faster than the threshold + if duration < batchDurationThreshold && batchSizeIdx < len(objectDeleteBatchSizes)-1 { + batchSizeIdx++ + } } - if !deleted { + if !prune { return fmt.Errorf("%w: prefix: %s", api.ErrObjectNotFound, prefix) } + s.triggerSlabPruning() return nil } @@ -1964,23 +1982,6 @@ func contract(tx *gorm.DB, id fileContractID) (contract dbContract, err error) { return } -// contracts retrieves all contracts for the given ids from the store. -func contracts(tx *gorm.DB, ids []types.FileContractID) (dbContracts []dbContract, err error) { - fcids := make([]fileContractID, len(ids)) - for i, fcid := range ids { - fcids[i] = fileContractID(fcid) - } - - // fetch contracts - err = tx. - Model(&dbContract{}). - Where("fcid IN (?)", fcids). - Joins("Host"). - Find(&dbContracts). - Error - return -} - // contractsForHost retrieves all contracts for the given host func contractsForHost(tx *gorm.DB, host dbHost) (contracts []dbContract, err error) { err = tx. @@ -2047,39 +2048,10 @@ func addContract(tx *gorm.DB, c rhpv2.ContractRevision, contractPrice, totalCost // archival reason // // NOTE: this function archives the contracts without setting a renewed ID -func archiveContracts(tx *gorm.DB, contracts []dbContract, toArchive map[types.FileContractID]string) error { - var toInvalidate []fileContractID - for _, contract := range contracts { - toInvalidate = append(toInvalidate, contract.FCID) - } - // Invalidate the health on the slabs before deleting the contracts to avoid - // breaking the relations beforehand. - if err := invalidateSlabHealthByFCID(tx, toInvalidate); err != nil { - return fmt.Errorf("invalidating slab health failed: %w", err) - } - for _, contract := range contracts { - // sanity check the host is populated - if contract.Host.ID == 0 { - return fmt.Errorf("host not populated for contract %v", contract.FCID) - } - - // create a copy in the archive - if err := tx.Create(&dbArchivedContract{ - Host: publicKey(contract.Host.PublicKey), - Reason: toArchive[types.FileContractID(contract.FCID)], - - ContractCommon: contract.ContractCommon, - }).Error; err != nil { - return err - } - - // remove the contract - res := tx.Delete(&contract) - if err := res.Error; err != nil { - return err - } - if res.RowsAffected != 1 { - return fmt.Errorf("expected to delete 1 row, deleted %d", res.RowsAffected) +func archiveContracts(ctx context.Context, tx sql.DatabaseTx, toArchive map[types.FileContractID]string) error { + for fcid, reason := range toArchive { + if err := tx.ArchiveContract(ctx, fcid, reason); err != nil { + return fmt.Errorf("failed to archive contract %v: %w", fcid, err) } } return nil @@ -2161,112 +2133,20 @@ func (s *SQLStore) triggerSlabPruning() { } } -// deleteObject deletes an object from the store and prunes all slabs which are -// without an obect after the deletion. That means in case of packed uploads, -// the slab is only deleted when no more objects point to it. -func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, error) { - // check if the object exists first to avoid unnecessary locking for the - // common case - var objID uint - resp := tx.Model(&dbObject{}). - Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)). - Select("id"). - Limit(1). - Scan(&objID) - if err := resp.Error; err != nil { - return 0, err - } else if resp.RowsAffected == 0 { - return 0, nil - } - - tx = tx.Where("id", objID). - Delete(&dbObject{}) - if tx.Error != nil { - return 0, tx.Error - } - numDeleted := tx.RowsAffected - if numDeleted == 0 { - return 0, nil // nothing to prune if no object was deleted - } - s.triggerSlabPruning() - return numDeleted, nil -} - -// deleteObjects deletes a batch of objects from the database. The order of -// deletion goes from largest to smallest. That's because the batch size is -// dynamically increased and the smaller objects get the faster we can delete -// them meaning it makes sense to increase the batch size over time. -func (s *SQLStore) deleteObjects(ctx context.Context, bucket string, path string) (deleted bool, _ error) { - batchSizeIdx := 0 - for { - start := time.Now() - done := false - var duration time.Duration - if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { - d, err := tx.DeleteObjects(ctx, bucket, path, objectDeleteBatchSizes[batchSizeIdx]) - if err != nil { - return err - } - deleted = deleted || d - done = !d - return nil - }); err != nil { - return false, fmt.Errorf("failed to delete objects: %w", err) - } - if done { - break // nothing more to delete - } - duration = time.Since(start) - - // increase the batch size if deletion was faster than the threshold - if duration < batchDurationThreshold && batchSizeIdx < len(objectDeleteBatchSizes)-1 { - batchSizeIdx++ - } - } - if deleted { - s.triggerSlabPruning() - } - return deleted, nil -} - -func invalidateSlabHealthByFCID(tx *gorm.DB, fcids []fileContractID) error { - if len(fcids) == 0 { - return nil - } - +func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID) error { for { - now := time.Now().Unix() - if resp := tx.Exec(` - UPDATE slabs SET health_valid_until = ? WHERE id in ( - SELECT * - FROM ( - SELECT slabs.id - FROM slabs - INNER JOIN sectors se ON se.db_slab_id = slabs.id - INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id - INNER JOIN contracts c ON c.id = cs.db_contract_id - WHERE c.fcid IN (?) AND slabs.health_valid_until >= ? - LIMIT ? - ) slab_ids - )`, now, fcids, now, refreshHealthBatchSize); resp.Error != nil { - return fmt.Errorf("failed to invalidate slab health: %w", resp.Error) - } else if resp.RowsAffected < refreshHealthBatchSize { - break // done + var affected int64 + err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) { + affected, err = tx.InvalidateSlabHealthByFCID(ctx, fcids, refreshHealthBatchSize) + return + }) + if err != nil { + return fmt.Errorf("failed to invalidate slab health: %w", err) + } else if affected < refreshHealthBatchSize { + return nil // done } time.Sleep(time.Second) } - return nil -} - -func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []fileContractID) error { - return s.retryTransaction(ctx, func(tx *gorm.DB) error { - return invalidateSlabHealthByFCID(tx, fcids) - }) -} - -// nolint:unparam -func sqlWhereBucket(objTable string, bucket string) clause.Expr { - return gorm.Expr(fmt.Sprintf("%s.db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)", objTable), bucket) } // TODO: we can use ObjectEntries instead of ListObject if we want to use '/' as diff --git a/stores/metadata_test.go b/stores/metadata_test.go index c436feade..d1afa8b68 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -1929,7 +1929,7 @@ func TestUnhealthySlabsNoContracts(t *testing.T) { // delete the sector - we manually invalidate the slabs for the contract // before deletion. - err = invalidateSlabHealthByFCID(ss.db, []fileContractID{fileContractID(fcid1)}) + err = ss.invalidateSlabHealthByFCID(context.Background(), []types.FileContractID{(fcid1)}) if err != nil { t.Fatal(err) } diff --git a/stores/sql/database.go b/stores/sql/database.go index 722cc0b0e..bc1e23ef0 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -34,6 +34,10 @@ type ( // AddMultipartPart adds a part to an unfinished multipart upload. AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices object.SlabSlices) error + // ArchiveContract moves a contract from the regular contracts to the + // archived ones. + ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error + // Bucket returns the bucket with the given name. If the bucket doesn't // exist, it returns api.ErrBucketNotFound. Bucket(ctx context.Context, bucket string) (api.Bucket, error) @@ -65,6 +69,10 @@ type ( // unique upload ID. InsertMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) + // InvalidateSlabHealthByFCID invalidates the health of all slabs that + // are associated with any of the provided contracts. + InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error) + // DeleteBucket deletes a bucket. If the bucket isn't empty, it returns // api.ErrBucketNotEmpty. If the bucket doesn't exist, it returns // api.ErrBucketNotFound. @@ -108,6 +116,11 @@ type ( // or slab buffer. PruneSlabs(ctx context.Context, limit int64) (int64, error) + // RemoveOfflineHosts removes all hosts that have been offline for + // longer than maxDownTime and been scanned at least minRecentFailures + // times. The contracts of those hosts are also removed. + RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) + // RenameObject renames an object in the database from keyOld to keyNew // and the new directory dirID. returns api.ErrObjectExists if the an // object already exists at the target location or api.ErrObjectNotFound diff --git a/stores/sql/main.go b/stores/sql/main.go index 7ba04b290..4332c6d2c 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -56,6 +56,33 @@ func AbortMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, up return errors.New("failed to delete multipart upload for unknown reason") } +func ArchiveContract(ctx context.Context, tx sql.Tx, fcid types.FileContractID, reason string) error { + _, err := tx.Exec(ctx, ` + INSERT INTO archived_contracts (created_at, fcid, renewed_from, contract_price, state, total_cost, + proof_height, revision_height, revision_number, size, start_height, window_start, window_end, + upload_spending, download_spending, fund_account_spending, delete_spending, list_spending, renewed_to, + host, reason) + SELECT ?, fcid, renewed_from, contract_price, state, total_cost, proof_height, revision_height, revision_number, + size, start_height, window_start, window_end, upload_spending, download_spending, fund_account_spending, + delete_spending, list_spending, NULL, h.public_key, ? + FROM contracts c + INNER JOIN hosts h ON h.id = c.host_id + WHERE fcid = ? + `, time.Now(), reason, FileContractID(fcid)) + if err != nil { + return fmt.Errorf("failed to copy contract to archived_contracts: %w", err) + } + res, err := tx.Exec(ctx, "DELETE FROM contracts WHERE fcid = ?", FileContractID(fcid)) + if err != nil { + return fmt.Errorf("failed to delete contract from contracts: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to fetch rows affected: %w", err) + } else if n == 0 { + return fmt.Errorf("expected to delete 1 row, deleted %d", n) + } + return nil +} + func Bucket(ctx context.Context, tx sql.Tx, bucket string) (api.Bucket, error) { b, err := scanBucket(tx.QueryRow(ctx, "SELECT created_at, name, COALESCE(policy, '{}') FROM buckets WHERE name = ?", bucket)) if err != nil { @@ -748,6 +775,44 @@ func ObjectsStats(ctx context.Context, tx sql.Tx, opts api.ObjectsStatsOpts) (ap }, nil } +func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) { + // fetch contracts + rows, err := tx.Query(ctx, ` + SELECT fcid + FROM contracts + INNER JOIN hosts h ON h.id = contracts.host_id + WHERE recent_downtime >= ? AND recent_scan_failures >= ? + `, maxDownTime, minRecentFailures) + if err != nil { + return 0, fmt.Errorf("failed to fetch contracts: %w", err) + } + defer rows.Close() + + var fcids []types.FileContractID + for rows.Next() { + var fcid FileContractID + if err := rows.Scan(&fcid); err != nil { + return 0, fmt.Errorf("failed to scan contract: %w", err) + } + fcids = append(fcids, types.FileContractID(fcid)) + } + + // archive contracts + for _, fcid := range fcids { + if err := ArchiveContract(ctx, tx, fcid, api.ContractArchivalReasonHostPruned); err != nil { + return 0, fmt.Errorf("failed to archive contract %v: %w", fcid, err) + } + } + + // delete hosts + res, err := tx.Exec(ctx, "DELETE FROM hosts WHERE recent_downtime >= ? AND recent_scan_failures >= ?", + maxDownTime, minRecentFailures) + if err != nil { + return 0, fmt.Errorf("failed to delete hosts: %w", err) + } + return res.RowsAffected() +} + func UpdateBucketPolicy(ctx context.Context, tx sql.Tx, bucket string, bp api.BucketPolicy) error { policy, err := json.Marshal(bp) if err != nil { @@ -792,7 +857,7 @@ func SearchHosts(ctx context.Context, tx sql.Tx, autopilot, filterMode, usabilit } } - // filter alowlist/blocklist + // filter allowlist/blocklist switch filterMode { case api.HostFilterModeAllowed: if hasAllowlist { diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 03fe6e76d..431b96843 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -130,6 +130,10 @@ func (tx *MainDatabaseTx) AbortMultipartUpload(ctx context.Context, bucket, path return ssql.AbortMultipartUpload(ctx, tx, bucket, path, uploadID) } +func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error { + return ssql.ArchiveContract(ctx, tx, fcid, reason) +} + func (tx *MainDatabaseTx) Bucket(ctx context.Context, bucket string) (api.Bucket, error) { return ssql.Bucket(ctx, tx, bucket) } @@ -309,6 +313,37 @@ func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contrac return nil } +func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error) { + if len(fcids) == 0 { + return 0, nil + } + // prepare args + var args []any + for _, fcid := range fcids { + args = append(args, ssql.FileContractID(fcid)) + } + args = append(args, time.Now().Unix()) + args = append(args, limit) + res, err := tx.Exec(ctx, fmt.Sprintf(` + UPDATE slabs SET health_valid_until = 0 WHERE id in ( + SELECT * + FROM ( + SELECT slabs.id + FROM slabs + INNER JOIN sectors se ON se.db_slab_id = slabs.id + INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id + INNER JOIN contracts c ON c.id = cs.db_contract_id + WHERE c.fcid IN (%s) AND slabs.health_valid_until >= ? + LIMIT ? + ) slab_ids + ) + `, strings.Repeat("?, ", len(fcids)-1)+"?"), args...) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (tx *MainDatabaseTx) ListBuckets(ctx context.Context) ([]api.Bucket, error) { return ssql.ListBuckets(ctx, tx) } @@ -410,6 +445,10 @@ func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, e return res.RowsAffected() } +func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) { + return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime) +} + func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, dirID int64, force bool) error { if force { // delete potentially existing object at destination diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 81a74e316..79b1940e7 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -129,6 +129,10 @@ func (tx *MainDatabaseTx) AbortMultipartUpload(ctx context.Context, bucket, path return ssql.AbortMultipartUpload(ctx, tx, bucket, path, uploadID) } +func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error { + return ssql.ArchiveContract(ctx, tx, fcid, reason) +} + func (tx *MainDatabaseTx) Bucket(ctx context.Context, bucket string) (api.Bucket, error) { return ssql.Bucket(ctx, tx, bucket) } @@ -298,6 +302,35 @@ func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contrac return nil } +func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error) { + if len(fcids) == 0 { + return 0, nil + } + // prepare args + var args []any + for _, fcid := range fcids { + args = append(args, ssql.FileContractID(fcid)) + } + args = append(args, time.Now().Unix()) + args = append(args, limit) + res, err := tx.Exec(ctx, fmt.Sprintf(` + UPDATE slabs SET health_valid_until = 0 WHERE id in ( + SELECT slabs.id + FROM slabs + INNER JOIN sectors se ON se.db_slab_id = slabs.id + INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id + INNER JOIN contracts c ON c.id = cs.db_contract_id + WHERE c.fcid IN (%s) AND slabs.health_valid_until >= ? + LIMIT ? + ) + `, strings.Repeat("?, ", len(fcids)-1)+"?"), args...) + if err != nil { + fmt.Println(strings.Repeat("?, ", len(fcids)-1) + "?") + return 0, err + } + return res.RowsAffected() +} + func (tx *MainDatabaseTx) ListBuckets(ctx context.Context) ([]api.Bucket, error) { return ssql.ListBuckets(ctx, tx) } @@ -409,6 +442,10 @@ func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, e return res.RowsAffected() } +func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) { + return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime) +} + func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, dirID int64, force bool) error { if force { // delete potentially existing object at destination