From 1a21df03fce7ba3442698c4ac2fa26db054f2fbb Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 28 May 2024 16:26:17 +0200 Subject: [PATCH 1/2] sql: add CopyObject to interface --- stores/sql/database.go | 5 +++++ stores/sql/main.go | 15 +++++++++++++++ stores/sql/mysql/main.go | 18 +++++++----------- stores/sql/sqlite/main.go | 18 +++++++----------- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/stores/sql/database.go b/stores/sql/database.go index 830b1961c..baf3f7d36 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -34,6 +34,11 @@ type ( // opts argument can be used to filter the result. Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) + // CopyObject copies an object from one bucket and key to another. If + // src and dst are the same, only the metadata and mimeType are + // overwritten with the provided ones. + CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) + // CreateBucket creates a new bucket with the given name and policy. If // the bucket already exists, api.ErrBucketExists is returned. CreateBucket(ctx context.Context, bucket string, policy api.BucketPolicy) error diff --git a/stores/sql/main.go b/stores/sql/main.go index a0e0a84a2..f23d45ab0 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -103,6 +103,21 @@ func Contracts(ctx context.Context, tx sql.Tx, opts api.ContractsOpts) ([]api.Co return contracts, nil } +func InsertMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error { + insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, `key`, value) VALUES (?, ?, ?, ?)") + if err != nil { + return fmt.Errorf("failed to prepare statement to insert object metadata: %w", err) + } + defer insertMetadataStmt.Close() + + for k, v := range md { + if _, err := insertMetadataStmt.Exec(ctx, time.Now(), objID, k, v); err != nil { + return fmt.Errorf("failed to insert object metadata: %w", err) + } + } + return nil +} + func DeleteBucket(ctx context.Context, tx sql.Tx, bucket string) error { var id int64 err := tx.QueryRow(ctx, "SELECT id FROM buckets WHERE name = ?", bucket).Scan(&id) diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 19d90c652..1642a3450 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -90,6 +90,10 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts) return ssql.Contracts(ctx, tx, opts) } +func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) { + panic("implement me") +} + func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error { policy, err := json.Marshal(bp) if err != nil { @@ -314,17 +318,9 @@ func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contrac return err } - // update metadata - insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, `key`, value) VALUES (?, ?, ?, ?)") - if err != nil { - return fmt.Errorf("failed to prepare statement to insert object metadata: %w", err) - } - defer insertMetadataStmt.Close() - - for k, v := range md { - if _, err := insertMetadataStmt.Exec(ctx, time.Now(), objID, k, v); err != nil { - return fmt.Errorf("failed to insert object metadata: %w", err) - } + // insert metadata + if err := ssql.InsertMetadata(ctx, tx, objID, md); err != nil { + return fmt.Errorf("failed to insert object metadata: %w", err) } return nil } diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 1a3cb715f..369455978 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -89,6 +89,10 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts) return ssql.Contracts(ctx, tx, opts) } +func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) { + panic("implement me") +} + func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error { policy, err := json.Marshal(bp) if err != nil { @@ -295,17 +299,9 @@ func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contrac return err } - // update metadata - insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, key, value) VALUES (?, ?, ?, ?)") - if err != nil { - return fmt.Errorf("failed to prepare statement to insert object metadata: %w", err) - } - defer insertMetadataStmt.Close() - - for k, v := range md { - if _, err := insertMetadataStmt.Exec(ctx, time.Now(), objID, k, v); err != nil { - return fmt.Errorf("failed to insert object metadata: %w", err) - } + // insert metadata + if err := ssql.InsertMetadata(ctx, tx, objID, md); err != nil { + return fmt.Errorf("failed to insert object metadata: %w", err) } return nil } From b407f0eb33e8e6dd6a3484df3bf13389508df21c Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 28 May 2024 17:23:08 +0200 Subject: [PATCH 2/2] sql: implement CopyObject --- stores/metadata.go | 93 ++-------------------------------- stores/sql/main.go | 102 ++++++++++++++++++++++++++++++++++++++ stores/sql/mysql/main.go | 2 +- stores/sql/sqlite/main.go | 2 +- 4 files changed, 108 insertions(+), 91 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index 365319cd7..abcc15816 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1430,87 +1430,15 @@ func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, t } func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (om api.ObjectMetadata, err error) { - err = s.retryTransaction(ctx, func(tx *gorm.DB) error { + err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { if srcBucket != dstBucket || srcPath != dstPath { - _, err = s.deleteObject(tx, dstBucket, dstPath) + _, err = tx.DeleteObject(ctx, dstBucket, dstPath) if err != nil { return fmt.Errorf("CopyObject: failed to delete object: %w", err) } } - - var srcObj dbObject - err = tx.Where("objects.object_id = ? AND DBBucket.name = ?", srcPath, srcBucket). - Joins("DBBucket"). - Take(&srcObj). - Error - if err != nil { - return fmt.Errorf("failed to fetch src object: %w", err) - } - - if srcBucket == dstBucket && srcPath == dstPath { - // No copying is happening. We just update the metadata on the src - // object. - srcObj.MimeType = mimeType - om = newObjectMetadata( - srcObj.ObjectID, - srcObj.Etag, - srcObj.MimeType, - srcObj.Health, - srcObj.CreatedAt, - srcObj.Size, - ) - if err := s.updateUserMetadata(tx, srcObj.ID, metadata); err != nil { - return fmt.Errorf("failed to update user metadata: %w", err) - } - return tx.Save(&srcObj).Error - } - - var srcSlices []dbSlice - err = tx.Where("db_object_id = ?", srcObj.ID). - Find(&srcSlices). - Error - if err != nil { - return fmt.Errorf("failed to fetch src slices: %w", err) - } - for i := range srcSlices { - srcSlices[i].Model = Model{} // clear model - srcSlices[i].DBObjectID = nil // clear object id - } - - var bucket dbBucket - err = tx.Where("name = ?", dstBucket). - Take(&bucket). - Error - if err != nil { - return fmt.Errorf("failed to fetch dst bucket: %w", err) - } - - dstObj := srcObj - dstObj.Model = Model{} // clear model - dstObj.DBBucket = bucket // set dst bucket - dstObj.ObjectID = dstPath // set dst path - dstObj.DBBucketID = bucket.ID // set dst bucket id - dstObj.Slabs = srcSlices // set slices - if mimeType != "" { - dstObj.MimeType = mimeType // override mime type - } - if err := tx.Create(&dstObj).Error; err != nil { - return fmt.Errorf("failed to create copy of object: %w", err) - } - - if err := s.createUserMetadata(tx, dstObj.ID, metadata); err != nil { - return fmt.Errorf("failed to create object metadata: %w", err) - } - - om = newObjectMetadata( - dstObj.ObjectID, - dstObj.Etag, - dstObj.MimeType, - dstObj.Health, - dstObj.CreatedAt, - dstObj.Size, - ) - return nil + om, err = tx.CopyObject(ctx, srcBucket, dstBucket, srcPath, dstPath, mimeType, metadata) + return err }) return } @@ -1912,19 +1840,6 @@ func (s *SQLStore) createMultipartMetadata(tx *gorm.DB, multipartUploadID uint, return tx.CreateInBatches(&entities, 1000).Error } -func (s *SQLStore) updateUserMetadata(tx *gorm.DB, objID uint, metadata api.ObjectUserMetadata) error { - // delete all existing metadata - err := tx. - Where("db_object_id = ?", objID). - Delete(&dbObjectUserMetadata{}). - Error - if err != nil { - return err - } - - return s.createUserMetadata(tx, objID, metadata) -} - func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice) error { if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) { return fmt.Errorf("either objID or multiPartID must be set") diff --git a/stores/sql/main.go b/stores/sql/main.go index f23d45ab0..4c24e3344 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -103,6 +103,108 @@ func Contracts(ctx context.Context, tx sql.Tx, opts api.ContractsOpts) ([]api.Co return contracts, nil } +func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) { + // stmt to fetch bucket id + bucketIDStmt, err := tx.Prepare(ctx, "SELECT id FROM buckets WHERE name = ?") + if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to prepare statement to fetch bucket id: %w", err) + } + defer bucketIDStmt.Close() + + // fetch source bucket + var srcBID int64 + err = bucketIDStmt.QueryRow(ctx, srcBucket).Scan(&srcBID) + if errors.Is(err, dsql.ErrNoRows) { + return api.ObjectMetadata{}, fmt.Errorf("%w: source bucket", api.ErrBucketNotFound) + } else if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to fetch src bucket id: %w", err) + } + + // fetch src object id + var srcObjID int64 + err = tx.QueryRow(ctx, "SELECT id FROM objects WHERE db_bucket_id = ? AND object_id = ?", srcBID, srcKey). + Scan(&srcObjID) + if errors.Is(err, dsql.ErrNoRows) { + return api.ObjectMetadata{}, api.ErrObjectNotFound + } else if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to fetch object id: %w", err) + } + + // helper to fetch metadata + fetchMetadata := func(objID int64) (om api.ObjectMetadata, err error) { + err = tx.QueryRow(ctx, "SELECT etag, health, created_at, object_id, size, mime_type FROM objects WHERE id = ?", objID). + Scan(&om.ETag, &om.Health, (*time.Time)(&om.ModTime), &om.Name, &om.Size, &om.MimeType) + if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to fetch new object: %w", err) + } + return om, nil + } + + if srcBucket == dstBucket && srcKey == dstKey { + // No copying is happening. We just update the metadata on the src + // object. + if _, err := tx.Exec(ctx, "UPDATE objects SET mime_type = ? WHERE id = ?", mimeType, srcObjID); err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to update mime type: %w", err) + } else if err := UpdateMetadata(ctx, tx, srcObjID, metadata); err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to update metadata: %w", err) + } + return fetchMetadata(srcObjID) + } + + // fetch destination bucket + var dstBID int64 + err = bucketIDStmt.QueryRow(ctx, dstBucket).Scan(&dstBID) + if errors.Is(err, dsql.ErrNoRows) { + return api.ObjectMetadata{}, fmt.Errorf("%w: destination bucket", api.ErrBucketNotFound) + } else if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to fetch dest bucket id: %w", err) + } + + // copy object + res, err := tx.Exec(ctx, `INSERT INTO objects (created_at, object_id, db_directory_id, db_bucket_id,`+"`key`"+`, size, mime_type, etag) + SELECT ?, ?, db_directory_id, ?, `+"`key`"+`, size, ?, etag + FROM objects + WHERE id = ?`, time.Now(), dstKey, dstBID, mimeType, srcObjID) + if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to insert object: %w", err) + } + dstObjID, err := res.LastInsertId() + if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to fetch object id: %w", err) + } + + // copy slices + _, err = tx.Exec(ctx, `INSERT INTO slices (created_at, db_object_id, object_index, db_slab_id, offset, length) + SELECT ?, ?, object_index, db_slab_id, offset, length + FROM slices + WHERE db_object_id = ?`, time.Now(), dstObjID, srcObjID) + if err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to copy slices: %w", err) + } + + // create metadata + if err := InsertMetadata(ctx, tx, dstObjID, metadata); err != nil { + return api.ObjectMetadata{}, fmt.Errorf("failed to insert metadata: %w", err) + } + + // fetch copied object + return fetchMetadata(dstObjID) +} + +func UpdateMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error { + if err := DeleteMetadata(ctx, tx, objID); err != nil { + return err + } else if err := InsertMetadata(ctx, tx, objID, md); err != nil { + return err + } + return nil +} + +func DeleteMetadata(ctx context.Context, tx sql.Tx, objID int64) error { + _, err := tx.Exec(ctx, "DELETE FROM object_user_metadata WHERE db_object_id = ?", objID) + return err +} + func InsertMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error { insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, `key`, value) VALUES (?, ?, ?, ?)") if err != nil { diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 1642a3450..c68d8c2e8 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -91,7 +91,7 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts) } func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) { - panic("implement me") + return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata) } func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error { diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 369455978..3377a0ae6 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -90,7 +90,7 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts) } func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) { - panic("implement me") + return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata) } func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error {