Skip to content

Commit

Permalink
Migrate SlabBuffers and InsertSlabBuffer to raw SQL (#1318)
Browse files Browse the repository at this point in the history
Co-authored-by: Peter-Jan Brone <peterjan.brone@gmail.com>
  • Loading branch information
ChrisSchinnerl and peterjan committed Jun 20, 2024
1 parent b65d0c0 commit dd35323
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 39 deletions.
31 changes: 12 additions & 19 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,21 +544,16 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts)
}

func (s *SQLStore) SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error) {
// Slab buffer info from the database.
var bufferedSlabs []dbBufferedSlab
err := s.db.Model(&dbBufferedSlab{}).
Joins("DBSlab").
Joins("DBSlab.DBContractSet").
Find(&bufferedSlabs).
Error
var err error
var fileNameToContractSet map[string]string
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
fileNameToContractSet, err = tx.SlabBuffers(ctx)
return err
})
if err != nil {
return nil, err
}
// Translate buffers to contract set.
fileNameToContractSet := make(map[string]string)
for _, slab := range bufferedSlabs {
fileNameToContractSet[slab.Filename] = slab.DBSlab.DBContractSet.Name
return nil, fmt.Errorf("failed to fetch slab buffers: %w", err)
}

// Fetch in-memory buffer info and fill in contract set name.
buffers := s.slabBufferMgr.SlabBuffers()
for i := range buffers {
Expand Down Expand Up @@ -1869,14 +1864,12 @@ func (s *SQLStore) markPackedSlabUploaded(tx *gorm.DB, slab api.UploadedPackedSl
}

// delete buffer
var buffer dbBufferedSlab
if err := tx.Take(&buffer, "id = ?", slab.BufferID).Error; err != nil {
var fileName string
if err := tx.Raw("SELECT filename FROM buffered_slabs WHERE id = ?", slab.BufferID).
Scan(&fileName).Error; err != nil {
return "", err
}
fileName := buffer.Filename
err = tx.Delete(&buffer).
Error
if err != nil {
if err := tx.Exec("DELETE FROM buffered_slabs WHERE id = ?", slab.BufferID).Error; err != nil {
return "", err
}

Expand Down
30 changes: 10 additions & 20 deletions stores/slabbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"gorm.io/gorm"
sql "go.sia.tech/renterd/stores/sql"
"lukechampine.com/frand"
)

Expand Down Expand Up @@ -204,8 +204,8 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m
// If there is still data left, create a new buffer.
if len(data) > 0 {
var sb *SlabBuffer
err = mgr.s.retryTransaction(ctx, func(tx *gorm.DB) error {
sb, err = createSlabBuffer(tx, contractSet, mgr.dir, minShards, totalShards)
err = mgr.s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
sb, err = createSlabBuffer(ctx, tx, contractSet, mgr.dir, minShards, totalShards)
return err
})
if err != nil {
Expand Down Expand Up @@ -472,31 +472,21 @@ func bufferedSlabSize(minShards uint8) int {
return int(rhpv2.SectorSize) * int(minShards)
}

func createSlabBuffer(tx *gorm.DB, contractSetID uint, dir string, minShards, totalShards uint8) (*SlabBuffer, error) {
ec := object.GenerateEncryptionKey()
key, err := ec.MarshalBinary()
if err != nil {
return nil, err
}
func createSlabBuffer(ctx context.Context, tx sql.DatabaseTx, contractSetID uint, dir string, minShards, totalShards uint8) (*SlabBuffer, error) {
// Create a new buffer and slab.
fileName := bufferFilename(contractSetID, minShards, totalShards)
file, err := os.Create(filepath.Join(dir, fileName))
if err != nil {
return nil, err
}
createdSlab := dbBufferedSlab{
DBSlab: dbSlab{
DBContractSetID: contractSetID,
Key: key,
MinShards: minShards,
TotalShards: totalShards,
},
Filename: fileName,

ec := object.GenerateEncryptionKey()
bufferedSlabID, err := tx.InsertBufferedSlab(ctx, fileName, int64(contractSetID), ec, minShards, totalShards)
if err != nil {
return nil, fmt.Errorf("failed to insert buffered slab: %w", err)
}
err = tx.Create(&createdSlab).
Error
return &SlabBuffer{
dbID: createdSlab.ID,
dbID: uint(bufferedSlabID),
filename: fileName,
slabKey: ec,
maxSize: int64(bufferedSlabSize(minShards)),
Expand Down
10 changes: 10 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ type (
// contains the root, latest_host is updated to that host.
DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error)

// InsertBufferedSlab inserts a buffered slab into the database. This
// includes the creation of a buffered slab as well as the corresponding
// regular slab it is linked to. It returns the ID of the buffered slab
// that was created.
InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error)

// InsertMultipartUpload creates a new multipart upload and returns a
// unique upload ID.
InsertMultipartUpload(ctx context.Context, bucket, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error)
Expand Down Expand Up @@ -190,6 +196,10 @@ type (
// 'false' and also marks them as requiring a resync.
SetUncleanShutdown(ctx context.Context) error

// SlabBuffers returns the filenames and associated contract sets of all
// slab buffers.
SlabBuffers(ctx context.Context) (map[string]string, error)

// UpdateAutopilot updates the autopilot with the provided one or
// creates a new one if it doesn't exist yet.
UpdateAutopilot(ctx context.Context, ap api.Autopilot) error
Expand Down
50 changes: 50 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,32 @@ func HostsForScanning(ctx context.Context, tx sql.Tx, maxLastScan time.Time, off
return hosts, nil
}

func InsertBufferedSlab(ctx context.Context, tx sql.Tx, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) {
// insert buffered slab
res, err := tx.Exec(ctx, `INSERT INTO buffered_slabs (created_at, filename) VALUES (?, ?)`,
time.Now(), fileName)
if err != nil {
return 0, fmt.Errorf("failed to insert buffered slab: %w", err)
}
bufferedSlabID, err := res.LastInsertId()
if err != nil {
return 0, fmt.Errorf("failed to fetch buffered slab id: %w", err)
}

key, err := ec.MarshalBinary()
if err != nil {
return 0, err
}
_, err = tx.Exec(ctx, `
INSERT INTO slabs (created_at, db_contract_set_id, db_buffered_slab_id, `+"`key`"+`, min_shards, total_shards)
VALUES (?, ?, ?, ?, ?, ?)`,
time.Now(), contractSetID, bufferedSlabID, SecretKey(key), minShards, totalShards)
if err != nil {
return 0, fmt.Errorf("failed to insert slab: %w", err)
}
return bufferedSlabID, nil
}

func InsertMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) {
// fetch bucket id
var bucketID int64
Expand Down Expand Up @@ -1327,6 +1353,30 @@ func SetUncleanShutdown(ctx context.Context, tx sql.Tx) error {
return err
}

func SlabBuffers(ctx context.Context, tx sql.Tx) (map[string]string, error) {
rows, err := tx.Query(ctx, `
SELECT buffered_slabs.filename, cs.name
FROM buffered_slabs
INNER JOIN slabs sla ON sla.db_buffered_slab_id = buffered_slabs.id
INNER JOIN contract_sets cs ON cs.id = sla.db_contract_set_id
`)
if err != nil {
return nil, fmt.Errorf("failed to fetch contract sets")
}
defer rows.Close()

fileNameToContractSet := make(map[string]string)
for rows.Next() {
var fileName string
var contractSetName string
if err := rows.Scan(&fileName, &contractSetName); err != nil {
return nil, fmt.Errorf("failed to scan contract set: %w", err)
}
fileNameToContractSet[fileName] = contractSetName
}
return fileNameToContractSet, nil
}

func UpdateBucketPolicy(ctx context.Context, tx sql.Tx, bucket string, bp api.BucketPolicy) error {
policy, err := json.Marshal(bp)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func (tx *MainDatabaseTx) DeleteHostSector(ctx context.Context, hk types.PublicK
return ssql.DeleteHostSector(ctx, tx, hk, root)
}

func (tx *MainDatabaseTx) InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) {
return ssql.InsertBufferedSlab(ctx, tx, fileName, contractSetID, ec, minShards, totalShards)
}

func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) {
return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata)
}
Expand Down Expand Up @@ -597,6 +601,10 @@ func (tx *MainDatabaseTx) SetUncleanShutdown(ctx context.Context) error {
return ssql.SetUncleanShutdown(ctx, tx)
}

func (tx *MainDatabaseTx) SlabBuffers(ctx context.Context) (map[string]string, error) {
return ssql.SlabBuffers(ctx, tx)
}

func (tx *MainDatabaseTx) UpdateAutopilot(ctx context.Context, ap api.Autopilot) error {
res, err := tx.Exec(ctx, `
INSERT INTO autopilots (created_at, identifier, config, current_period)
Expand Down
8 changes: 8 additions & 0 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ func (tx *MainDatabaseTx) DeleteHostSector(ctx context.Context, hk types.PublicK
return ssql.DeleteHostSector(ctx, tx, hk, root)
}

func (tx *MainDatabaseTx) InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) {
return ssql.InsertBufferedSlab(ctx, tx, fileName, contractSetID, ec, minShards, totalShards)
}

func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) {
return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata)
}
Expand Down Expand Up @@ -595,6 +599,10 @@ func (tx *MainDatabaseTx) SetUncleanShutdown(ctx context.Context) error {
return ssql.SetUncleanShutdown(ctx, tx)
}

func (tx *MainDatabaseTx) SlabBuffers(ctx context.Context) (map[string]string, error) {
return ssql.SlabBuffers(ctx, tx)
}

func (tx *MainDatabaseTx) UpdateAutopilot(ctx context.Context, ap api.Autopilot) error {
res, err := tx.Exec(ctx, `
INSERT INTO autopilots (created_at, identifier, config, current_period)
Expand Down

0 comments on commit dd35323

Please sign in to comment.