sql: migrate invalidateslabhealthbyfcid and archivecontracts to raw sql
ChrisSchinnerl committed Jun 7, 2024
1 parent 002934a commit 9d4d9ab
Showing 7 changed files with 179 additions and 137 deletions.
2 changes: 1 addition & 1 deletion stores/hostdb.go
@@ -484,7 +484,7 @@ func (ss *SQLStore) RemoveOfflineHosts(ctx context.Context, minRecentFailures ui
}

// archive host contracts
if err := archiveContracts(tx, hcs, toArchive); err != nil {
if err := archiveContractsGorm(tx, hcs, toArchive); err != nil {
return err
}

216 changes: 81 additions & 135 deletions stores/metadata.go
@@ -667,19 +667,19 @@ func (s *SQLStore) ArchiveContracts(ctx context.Context, toArchive map[types.Fil
ids = append(ids, id)
}

// fetch contracts
cs, err := contracts(s.db, ids)
if err != nil {
return err
// invalidate health of related sectors before archiving the contracts
// NOTE: even if this is not done in the same transaction it won't have any
// lasting negative effects.
if err := s.invalidateSlabHealthByFCID(ctx, ids); err != nil {
return fmt.Errorf("ArchiveContracts: failed to invalidate slab health: %w", err)
}

// archive them
if err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
return archiveContracts(tx, cs, toArchive)
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return archiveContracts(ctx, tx, toArchive)
}); err != nil {
return err
return fmt.Errorf("ArchiveContracts: failed to archive contracts: %w", err)
}

return nil
}

@@ -827,7 +827,7 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds
wanted[fileContractID(fcid)] = struct{}{}
}

var diff []fileContractID
var diff []types.FileContractID
var nContractsAfter int
err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
// fetch contract set
@@ -856,14 +856,14 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds
// add removals to the diff
for _, contract := range cs.Contracts {
if _, ok := wanted[contract.FCID]; !ok {
diff = append(diff, contract.FCID)
diff = append(diff, types.FileContractID(contract.FCID))
}
delete(wanted, contract.FCID)
}

// add additions to the diff
for fcid := range wanted {
diff = append(diff, fcid)
diff = append(diff, types.FileContractID(fcid))
}

// update the association
@@ -1499,33 +1499,51 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
}

func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error {
var rowsAffected int64
var err error
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
rowsAffected, err = s.deleteObject(tx, bucket, path)
if err != nil {
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
}
return nil
var prune bool
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
prune, err = tx.DeleteObject(ctx, bucket, path)
return
})
if err != nil {
return err
}
if rowsAffected == 0 {
return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, path)
return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
} else if prune {
// trigger pruning if we deleted an object
s.triggerSlabPruning()
}
return nil
}

func (s *SQLStore) RemoveObjects(ctx context.Context, bucket, prefix string) error {
var err error
deleted, err := s.deleteObjects(ctx, bucket, prefix)
if err != nil {
return err
var prune bool
batchSizeIdx := 0
for {
start := time.Now()
done := false
var duration time.Duration
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
deleted, err := tx.DeleteObjects(ctx, bucket, prefix, objectDeleteBatchSizes[batchSizeIdx])
if err != nil {
return err
}
prune = prune || deleted
done = !deleted
return nil
}); err != nil {
return fmt.Errorf("failed to delete objects: %w", err)
} else if done {
break // nothing more to delete
}
duration = time.Since(start)

// increase the batch size if deletion was faster than the threshold
if duration < batchDurationThreshold && batchSizeIdx < len(objectDeleteBatchSizes)-1 {
batchSizeIdx++
}
}
if !deleted {
if !prune {
return fmt.Errorf("%w: prefix: %s", api.ErrObjectNotFound, prefix)
}
s.triggerSlabPruning()
return nil
}
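The loop above sizes its delete batches adaptively: it starts with the smallest entry of objectDeleteBatchSizes and moves to the next size whenever a batch completes faster than batchDurationThreshold. Neither knob is part of this diff; as a rough sketch, with illustrative values that are assumptions rather than the repository's actual settings, they could be declared like so:

package stores

import "time"

// Illustrative values only; the real batch sizes and threshold live
// elsewhere in the stores package and may differ.
var objectDeleteBatchSizes = []int64{10, 50, 100, 200, 500, 1000, 5000, 10000}

// A batch that finishes faster than this threshold lets RemoveObjects
// advance to the next, larger batch size.
const batchDurationThreshold = 20 * time.Second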

@@ -1981,23 +1999,6 @@ func contract(tx *gorm.DB, id fileContractID) (contract dbContract, err error) {
return
}

// contracts retrieves all contracts for the given ids from the store.
func contracts(tx *gorm.DB, ids []types.FileContractID) (dbContracts []dbContract, err error) {
fcids := make([]fileContractID, len(ids))
for i, fcid := range ids {
fcids[i] = fileContractID(fcid)
}

// fetch contracts
err = tx.
Model(&dbContract{}).
Where("fcid IN (?)", fcids).
Joins("Host").
Find(&dbContracts).
Error
return
}

// contractsForHost retrieves all contracts for the given host
func contractsForHost(tx *gorm.DB, host dbHost) (contracts []dbContract, err error) {
err = tx.
@@ -2064,14 +2065,23 @@ func addContract(tx *gorm.DB, c rhpv2.ContractRevision, contractPrice, totalCost
// archival reason
//
// NOTE: this function archives the contracts without setting a renewed ID
func archiveContracts(tx *gorm.DB, contracts []dbContract, toArchive map[types.FileContractID]string) error {
func archiveContracts(ctx context.Context, tx sql.DatabaseTx, toArchive map[types.FileContractID]string) error {
for fcid, reason := range toArchive {
if err := tx.ArchiveContract(ctx, fcid, reason); err != nil {
return fmt.Errorf("failed to archive contract %v: %w", fcid, err)
}
}
return nil
}

func archiveContractsGorm(tx *gorm.DB, contracts []dbContract, toArchive map[types.FileContractID]string) error {
var toInvalidate []fileContractID
for _, contract := range contracts {
toInvalidate = append(toInvalidate, contract.FCID)
}
// Invalidate the health on the slabs before deleting the contracts to avoid
// breaking the relations beforehand.
if err := invalidateSlabHealthByFCID(tx, toInvalidate); err != nil {
if err := invalidateSlabHealthByFCIDGorm(tx, toInvalidate); err != nil {
return fmt.Errorf("invalidating slab health failed: %w", err)
}
for _, contract := range contracts {
@@ -2178,94 +2188,41 @@ func (s *SQLStore) triggerSlabPruning() {
}
}

// deleteObject deletes an object from the store and prunes all slabs which are
// without an obect after the deletion. That means in case of packed uploads,
// the slab is only deleted when no more objects point to it.
func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, error) {
// check if the object exists first to avoid unnecessary locking for the
// common case
var objID uint
resp := tx.Model(&dbObject{}).
Where("object_id = ? AND ?", path, sqlWhereBucket("objects", bucket)).
Select("id").
Limit(1).
Scan(&objID)
if err := resp.Error; err != nil {
return 0, err
} else if resp.RowsAffected == 0 {
return 0, nil
}

tx = tx.Where("id", objID).
Delete(&dbObject{})
if tx.Error != nil {
return 0, tx.Error
}
numDeleted := tx.RowsAffected
if numDeleted == 0 {
return 0, nil // nothing to prune if no object was deleted
}
s.triggerSlabPruning()
return numDeleted, nil
}

// deleteObjects deletes a batch of objects from the database. The order of
// deletion goes from largest to smallest. That's because the batch size is
// dynamically increased and the smaller objects get the faster we can delete
// them meaning it makes sense to increase the batch size over time.
func (s *SQLStore) deleteObjects(ctx context.Context, bucket string, path string) (deleted bool, _ error) {
batchSizeIdx := 0
func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID) error {
for {
start := time.Now()
done := false
var duration time.Duration
if err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
d, err := tx.DeleteObjects(ctx, bucket, path, objectDeleteBatchSizes[batchSizeIdx])
if err != nil {
return err
}
deleted = deleted || d
done = !d
return nil
}); err != nil {
return false, fmt.Errorf("failed to delete objects: %w", err)
}
if done {
break // nothing more to delete
}
duration = time.Since(start)

// increase the batch size if deletion was faster than the threshold
if duration < batchDurationThreshold && batchSizeIdx < len(objectDeleteBatchSizes)-1 {
batchSizeIdx++
var affected int64
err := s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) (err error) {
affected, err = tx.InvalidateSlabHealthByFCID(ctx, fcids, refreshHealthBatchSize)
return
})
if err != nil {
return fmt.Errorf("failed to invalidate slab health: %w", err)
} else if affected < refreshHealthBatchSize {
return nil // done
}
time.Sleep(time.Second)
}
if deleted {
s.triggerSlabPruning()
}
return deleted, nil
}

func invalidateSlabHealthByFCID(tx *gorm.DB, fcids []fileContractID) error {
func invalidateSlabHealthByFCIDGorm(tx *gorm.DB, fcids []fileContractID) error {
if len(fcids) == 0 {
return nil
}

for {
now := time.Now().Unix()
if resp := tx.Exec(`
UPDATE slabs SET health_valid_until = ? WHERE id in (
SELECT *
FROM (
SELECT slabs.id
FROM slabs
INNER JOIN sectors se ON se.db_slab_id = slabs.id
INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id
INNER JOIN contracts c ON c.id = cs.db_contract_id
WHERE c.fcid IN (?) AND slabs.health_valid_until >= ?
LIMIT ?
) slab_ids
)`, now, fcids, now, refreshHealthBatchSize); resp.Error != nil {
return fmt.Errorf("failed to invalidate slab health: %w", resp.Error)
} else if resp.RowsAffected < refreshHealthBatchSize {
break // done
@@ -2275,17 +2232,6 @@ func invalidateSlabHealthByFCID(tx *gorm.DB, fcids []fileContractID) error {
return nil
}

func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []fileContractID) error {
return s.retryTransaction(ctx, func(tx *gorm.DB) error {
return invalidateSlabHealthByFCID(tx, fcids)
})
}

// nolint:unparam
func sqlWhereBucket(objTable string, bucket string) clause.Expr {
return gorm.Expr(fmt.Sprintf("%s.db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)", objTable), bucket)
}

// TODO: we can use ObjectEntries instead of ListObject if we want to use '/' as
// a delimiter for now (see backend.go) but it would be interesting to have
// arbitrary 'delim' support in ListObjects.
2 changes: 1 addition & 1 deletion stores/metadata_test.go
@@ -1929,7 +1929,7 @@ func TestUnhealthySlabsNoContracts(t *testing.T) {

// delete the sector - we manually invalidate the slabs for the contract
// before deletion.
err = invalidateSlabHealthByFCID(ss.db, []fileContractID{fileContractID(fcid1)})
err = invalidateSlabHealthByFCIDGorm(ss.db, []fileContractID{fileContractID(fcid1)})
if err != nil {
t.Fatal(err)
}
8 changes: 8 additions & 0 deletions stores/sql/database.go
@@ -34,6 +34,10 @@ type (
// AddMultipartPart adds a part to an unfinished multipart upload.
AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices object.SlabSlices) error

// ArchiveContract moves a contract from the regular contracts to the
// archived ones.
ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error

// Bucket returns the bucket with the given name. If the bucket doesn't
// exist, it returns api.ErrBucketNotFound.
Bucket(ctx context.Context, bucket string) (api.Bucket, error)
@@ -61,6 +65,10 @@ type (
// unique upload ID.
InsertMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error)

// InvalidateSlabHealthByFCID invalidates the health of all slabs that
// are associated with any of the provided contracts.
InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error)

// DeleteBucket deletes a bucket. If the bucket isn't empty, it returns
// api.ErrBucketNotEmpty. If the bucket doesn't exist, it returns
// api.ErrBucketNotFound.
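Only five of the seven changed files are rendered in this excerpt; the per-backend transaction types that implement these two new methods are among the files that did not load. Judging by how the existing DatabaseTx methods are wired up, the ArchiveContract side presumably just delegates to the shared helper added to stores/sql/main.go below. A minimal sketch, assuming a backend transaction type named MainDatabaseTx and an ssql alias for the stores/sql package:

// Assumed per-backend wrapper; MainDatabaseTx and this delegation are not
// part of the rendered diff.
func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error {
	return ssql.ArchiveContract(ctx, tx, fcid, reason)
}

A corresponding InvalidateSlabHealthByFCID implementation is sketched after the stores/sql/main.go diff below.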
27 changes: 27 additions & 0 deletions stores/sql/main.go
@@ -56,6 +56,33 @@ func AbortMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, up
return errors.New("failed to delete multipart upload for unknown reason")
}

func ArchiveContract(ctx context.Context, tx sql.Tx, fcid types.FileContractID, reason string) error {
_, err := tx.Exec(ctx, `
INSERT INTO archived_contracts (created_at, fcid, renewed_from, contract_price, state, total_cost,
proof_height, revision_height, revision_number, size, start_height, window_start, window_end,
upload_spending, download_spending, fund_account_spending, delete_spending, list_spending, renewed_to,
host, reason)
SELECT ?, fcid, renewed_from, contract_price, state, total_cost, proof_height, revision_height, revision_number,
size, start_height, window_start, window_end, upload_spending, download_spending, fund_account_spending,
delete_spending, list_spending, renewed_to, h.public_key, ?
FROM contracts c
INNER JOIN hosts h ON h.id = c.host_id
WHERE fcid = ?
`, time.Now(), reason, FileContractID(fcid))
if err != nil {
return fmt.Errorf("failed to copy contract to archived_contracts: %w", err)
}
res, err := tx.Exec(ctx, "DELETE FROM contracts WHERE fcid = ?", fcid)
if err != nil {
return fmt.Errorf("failed to delete contract from contracts: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to fetch rows affected: %w", err)
} else if n == 0 {
return fmt.Errorf("expected to delete 1 row, deleted %d", n)
}
return nil
}

func Bucket(ctx context.Context, tx sql.Tx, bucket string) (api.Bucket, error) {
b, err := scanBucket(tx.QueryRow(ctx, "SELECT created_at, name, COALESCE(policy, '{}') FROM buckets WHERE name = ?", bucket))
if err != nil {
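The raw-SQL counterpart of invalidateSlabHealthByFCIDGorm is not visible in this excerpt either. Based on the InvalidateSlabHealthByFCID method added to the DatabaseTx interface and the query used by the legacy gorm helper in stores/metadata.go, a port in the style of the helpers above could look roughly like the following sketch; the placeholder expansion, error messages, and exact location of the function are assumptions, and the usual imports (fmt, strings, time) are taken as given:

// Sketch only, not the verbatim implementation from this commit.
func InvalidateSlabHealthByFCID(ctx context.Context, tx sql.Tx, fcids []types.FileContractID, limit int64) (int64, error) {
	if len(fcids) == 0 {
		return 0, nil
	}
	// expand the IN clause and collect the arguments in query order
	now := time.Now().Unix()
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(fcids)), ", ")
	args := []any{now}
	for _, fcid := range fcids {
		args = append(args, FileContractID(fcid))
	}
	args = append(args, now, limit)

	res, err := tx.Exec(ctx, fmt.Sprintf(`
		UPDATE slabs SET health_valid_until = ? WHERE id IN (
			SELECT id FROM (
				SELECT slabs.id
				FROM slabs
				INNER JOIN sectors se ON se.db_slab_id = slabs.id
				INNER JOIN contract_sectors cs ON cs.db_sector_id = se.id
				INNER JOIN contracts c ON c.id = cs.db_contract_id
				WHERE c.fcid IN (%s) AND slabs.health_valid_until >= ?
				LIMIT ?
			) slab_ids
		)`, placeholders), args...)
	if err != nil {
		return 0, fmt.Errorf("failed to invalidate slab health: %w", err)
	}
	return res.RowsAffected()
}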