Skip to content

Commit

Permalink
stores: migrate slab pruning to raw SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed May 17, 2024
1 parent 1d7fbb5 commit a078c74
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 43 deletions.
89 changes: 51 additions & 38 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
// upsert sectors.
sectorInsertionBatchSize = 500

// slabPruningBatchSize is the number of slabs per batch when we prune
// slabs. We limit this to 100 slabs which is 3000 sectors at default
// redundancy.
slabPruningBatchSize = 100

refreshHealthMinHealthValidity = 12 * time.Hour
refreshHealthMaxHealthValidity = 72 * time.Hour
)
Expand All @@ -53,6 +58,7 @@ const (

// Alert IDs used when registering and dismissing the alerts raised by the
// pruning loop. Each ID is generated once via frand.Entropy256().
var (
// pruneSlabsAlertID identifies the alert registered when slab pruning fails.
pruneSlabsAlertID = frand.Entropy256()
// pruneDirsAlertID identifies the alert registered when directory pruning
// fails.
pruneDirsAlertID = frand.Entropy256()
)

var (
Expand Down Expand Up @@ -2837,61 +2843,68 @@ func (s *SQLStore) pruneSlabsLoop() {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second+sumDurations(s.retryTransactionIntervals))
err := s.retryTransaction(ctx, func(tx *gorm.DB) error {
if err := pruneSlabs(tx); err != nil {
return fmt.Errorf("failed to prune slabs: %w", err)
} else if err := pruneDirs(tx); err != nil {
return fmt.Errorf("failed to prune directories: %w", err)
// prune slabs
pruneSuccess := true
for {
var deleted int64
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second+sumDurations(s.retryTransactionIntervals))
err := s.bMain.Transaction(ctx, func(dt sql.DatabaseTx) error {
var err error
deleted, err = dt.PruneSlabs(ctx, slabPruningBatchSize)
return err
})
cancel()
if err != nil {
s.logger.Errorw("slab pruning failed", zap.Error(err))
s.alerts.RegisterAlert(s.shutdownCtx, alerts.Alert{
ID: pruneSlabsAlertID,
Severity: alerts.SeverityWarning,
Message: "Failed to prune slabs",
Timestamp: time.Now(),
Data: map[string]interface{}{
"error": err.Error(),
"hint": "This might happen when your database is under a lot of load due to deleting objects rapidly. This alert will disappear the next time slabs are pruned successfully.",
},
})
pruneSuccess = false
} else {
s.alerts.DismissAlerts(s.shutdownCtx, pruneSlabsAlertID)
}
return nil

if deleted < slabPruningBatchSize {
break // done
}
}

// prune dirs
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second+sumDurations(s.retryTransactionIntervals))
err := s.bMain.Transaction(ctx, func(dt sql.DatabaseTx) error {
return dt.PruneDirs(ctx)
})
cancel()
if err != nil {
s.logger.Errorw("pruning failed", zap.Error(err))
s.logger.Errorw("dir pruning failed", zap.Error(err))
s.alerts.RegisterAlert(s.shutdownCtx, alerts.Alert{
ID: pruneSlabsAlertID,
ID: pruneDirsAlertID,
Severity: alerts.SeverityWarning,
Message: "Failed to prune database",
Message: "Failed to prune dirs",
Timestamp: time.Now(),
Data: map[string]interface{}{
"error": err.Error(),
"hint": "This might happen when your database is under a lot of load due to deleting objects rapidly. This alert will disappear the next time slabs are pruned successfully.",
},
})
pruneSuccess = false
} else {
s.alerts.DismissAlerts(s.shutdownCtx, pruneSlabsAlertID)
s.alerts.DismissAlerts(s.shutdownCtx, pruneDirsAlertID)
}

// mark the last prune time where both slabs and dirs were pruned
if pruneSuccess {
s.mu.Lock()
s.lastPrunedAt = time.Now()
s.mu.Unlock()
}
cancel()
}
}

// pruneSlabs deletes every slab that is not referenced by any slice and has
// no buffered slab id set. It returns the error reported by the executed
// statement, if any.
func pruneSlabs(tx *gorm.DB) error {
	const query = `
DELETE
FROM slabs
WHERE NOT EXISTS (SELECT 1 FROM slices WHERE slices.db_slab_id = slabs.id)
AND slabs.db_buffered_slab_id IS NULL
`
	result := tx.Exec(query)
	return result.Error
}

// pruneDirs removes directories, other than the one with id 1, that contain
// no objects and have no child directories. The statement is re-run until a
// pass deletes nothing, so parents that become empty after their children are
// removed get deleted on a later pass. The first statement error is returned.
func pruneDirs(tx *gorm.DB) error {
	const query = `
DELETE
FROM directories
WHERE directories.id != 1
AND NOT EXISTS (SELECT 1 FROM objects WHERE objects.db_directory_id = directories.id)
AND NOT EXISTS (SELECT 1 FROM (SELECT 1 FROM directories AS d WHERE d.db_parent_id = directories.id) i)
`
	for {
		switch result := tx.Exec(query); {
		case result.Error != nil:
			return result.Error
		case result.RowsAffected == 0:
			// nothing left to delete
			return nil
		}
	}
}

Expand Down
6 changes: 5 additions & 1 deletion stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -328,7 +329,10 @@ func (s *SQLStore) initSlabPruning() error {
}()

// prune once to guarantee consistency on startup
return s.retryTransaction(s.shutdownCtx, pruneSlabs)
return s.bMain.Transaction(s.shutdownCtx, func(tx sql.DatabaseTx) error {
_, err := tx.PruneSlabs(s.shutdownCtx, math.MaxInt64)
return err
})
}

func (ss *SQLStore) updateHasAllowlist(err *error) {
Expand Down
10 changes: 8 additions & 2 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ type (
DeleteObject(ctx context.Context, bucket, key string) (bool, error)

// DeleteObjects deletes a batch of objects starting with the given
// prefix and returns 'true' if any object was deleted. The deletion
// should prioritize larger objects for a more predictable performance.
// prefix and returns 'true' if any object was deleted.
DeleteObjects(ctx context.Context, bucket, prefix string, limit int64) (bool, error)

// MakeDirsForPath creates all directories for a given object's path.
MakeDirsForPath(ctx context.Context, path string) (uint, error)

// PruneDirs prunes any directories that are empty.
PruneDirs(ctx context.Context) error

// PruneSlabs deletes slabs that are no longer referenced by any slice
// or slab buffer.
PruneSlabs(ctx context.Context, limit int64) (int64, error)

// RenameObject renames an object in the database from keyOld to keyNew
// and the new directory dirID. Returns api.ErrObjectExists if an
// object already exists at the target location or api.ErrObjectNotFound
Expand Down
46 changes: 45 additions & 1 deletion stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (tx *MainDatabaseTx) DeleteObjects(ctx context.Context, bucket string, key
WHERE object_id LIKE ? AND db_bucket_id = (
SELECT id FROM buckets WHERE buckets.name = ?
)
ORDER BY size DESC
LIMIT ?
) AS limited ON o.id = limited.id`,
key+"%", bucket, limit)
Expand Down Expand Up @@ -162,6 +161,51 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (uin
return dirID, nil
}

// PruneDirs removes directories, other than the one with id 1, that contain
// no objects and have no child directories. The delete statement is prepared
// once and executed repeatedly until a pass affects no rows, so parents that
// become empty after their children are removed get deleted on a later pass.
// Any error from preparing, executing or reading the affected row count is
// returned as-is.
func (tx *MainDatabaseTx) PruneDirs(ctx context.Context) error {
	const query = `
DELETE
FROM directories
WHERE directories.id != 1
AND NOT EXISTS (SELECT 1 FROM objects WHERE objects.db_directory_id = directories.id)
AND NOT EXISTS (SELECT 1 FROM (SELECT 1 FROM directories AS d WHERE d.db_parent_id = directories.id) i)
`
	stmt, err := tx.Prepare(ctx, query)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for {
		res, err := stmt.Exec(ctx)
		if err != nil {
			return err
		}
		deleted, err := res.RowsAffected()
		if err != nil {
			return err
		}
		if deleted == 0 {
			// nothing left to prune
			return nil
		}
	}
}

// PruneSlabs deletes up to limit slabs that are not referenced by any slice
// and have no buffered slab id set. It returns the number of rows affected by
// the delete, or 0 and the error if the statement fails.
func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, error) {
	// The candidate ids are selected through a derived table ("limited") so
	// the LIMIT applies to the set of slabs being deleted.
	const query = `
DELETE FROM slabs
WHERE id IN (
SELECT id
FROM (
SELECT slabs.id
FROM slabs
WHERE NOT EXISTS (
SELECT 1 FROM slices WHERE slices.db_slab_id = slabs.id
)
AND slabs.db_buffered_slab_id IS NULL
LIMIT ?
) AS limited
)`
	result, execErr := tx.Exec(ctx, query, limit)
	if execErr != nil {
		return 0, execErr
	}
	return result.RowsAffected()
}

func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, dirID uint) error {
resp, err := tx.Exec(ctx, `UPDATE objects SET object_id = ?, db_directory_id = ? WHERE object_id = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, keyNew, dirID, keyOld, bucket)
if err != nil && strings.Contains(err.Error(), "Duplicate entry") {
Expand Down
46 changes: 45 additions & 1 deletion stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (tx *MainDatabaseTx) DeleteObjects(ctx context.Context, bucket string, key
WHERE id IN (
SELECT id FROM objects
WHERE object_id LIKE ? AND SUBSTR(object_id, 1, ?) = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)
ORDER BY size DESC
LIMIT ?
)`, key+"%", utf8.RuneCountInString(key), key, bucket, limit)
if err != nil {
Expand Down Expand Up @@ -145,6 +144,51 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (uin
return dirID, nil
}

// PruneDirs removes directories, other than the one with id 1, that contain
// no objects and have no child directories. The delete statement is prepared
// once and executed repeatedly until a pass affects no rows, so parents that
// become empty after their children are removed get deleted on a later pass.
// Any error from preparing, executing or reading the affected row count is
// returned as-is.
func (tx *MainDatabaseTx) PruneDirs(ctx context.Context) error {
	const query = `
DELETE
FROM directories
WHERE directories.id != 1
AND NOT EXISTS (SELECT 1 FROM objects WHERE objects.db_directory_id = directories.id)
AND NOT EXISTS (SELECT 1 FROM (SELECT 1 FROM directories AS d WHERE d.db_parent_id = directories.id) i)
`
	stmt, err := tx.Prepare(ctx, query)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for {
		res, err := stmt.Exec(ctx)
		if err != nil {
			return err
		}
		deleted, err := res.RowsAffected()
		if err != nil {
			return err
		}
		if deleted == 0 {
			// nothing left to prune
			return nil
		}
	}
}

// PruneSlabs deletes up to limit slabs that are not referenced by any slice
// and have no buffered slab id set. It returns the number of rows affected by
// the delete, or 0 and the error if the statement fails.
func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, error) {
	// The candidate ids are selected through a derived table ("limited") so
	// the LIMIT applies to the set of slabs being deleted.
	const query = `
DELETE FROM slabs
WHERE id IN (
SELECT id
FROM (
SELECT slabs.id
FROM slabs
WHERE NOT EXISTS (
SELECT 1 FROM slices WHERE slices.db_slab_id = slabs.id
)
AND slabs.db_buffered_slab_id IS NULL
LIMIT ?
) AS limited
)`
	result, execErr := tx.Exec(ctx, query, limit)
	if execErr != nil {
		return 0, execErr
	}
	return result.RowsAffected()
}

func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, dirID uint) error {
resp, err := tx.Exec(ctx, `UPDATE objects SET object_id = ?, db_directory_id = ? WHERE object_id = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, keyNew, dirID, keyOld, bucket)
if err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") {
Expand Down

0 comments on commit a078c74

Please sign in to comment.