Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate CopyObject to raw SQL #1255

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 4 additions & 89 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,87 +1430,15 @@ func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, t
}

func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (om api.ObjectMetadata, err error) {
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
if srcBucket != dstBucket || srcPath != dstPath {
_, err = s.deleteObject(tx, dstBucket, dstPath)
_, err = tx.DeleteObject(ctx, dstBucket, dstPath)
if err != nil {
return fmt.Errorf("CopyObject: failed to delete object: %w", err)
}
}

var srcObj dbObject
err = tx.Where("objects.object_id = ? AND DBBucket.name = ?", srcPath, srcBucket).
Joins("DBBucket").
Take(&srcObj).
Error
if err != nil {
return fmt.Errorf("failed to fetch src object: %w", err)
}

if srcBucket == dstBucket && srcPath == dstPath {
// No copying is happening. We just update the metadata on the src
// object.
srcObj.MimeType = mimeType
om = newObjectMetadata(
srcObj.ObjectID,
srcObj.Etag,
srcObj.MimeType,
srcObj.Health,
srcObj.CreatedAt,
srcObj.Size,
)
if err := s.updateUserMetadata(tx, srcObj.ID, metadata); err != nil {
return fmt.Errorf("failed to update user metadata: %w", err)
}
return tx.Save(&srcObj).Error
}

var srcSlices []dbSlice
err = tx.Where("db_object_id = ?", srcObj.ID).
Find(&srcSlices).
Error
if err != nil {
return fmt.Errorf("failed to fetch src slices: %w", err)
}
for i := range srcSlices {
srcSlices[i].Model = Model{} // clear model
srcSlices[i].DBObjectID = nil // clear object id
}

var bucket dbBucket
err = tx.Where("name = ?", dstBucket).
Take(&bucket).
Error
if err != nil {
return fmt.Errorf("failed to fetch dst bucket: %w", err)
}

dstObj := srcObj
dstObj.Model = Model{} // clear model
dstObj.DBBucket = bucket // set dst bucket
dstObj.ObjectID = dstPath // set dst path
dstObj.DBBucketID = bucket.ID // set dst bucket id
dstObj.Slabs = srcSlices // set slices
if mimeType != "" {
dstObj.MimeType = mimeType // override mime type
}
if err := tx.Create(&dstObj).Error; err != nil {
return fmt.Errorf("failed to create copy of object: %w", err)
}

if err := s.createUserMetadata(tx, dstObj.ID, metadata); err != nil {
return fmt.Errorf("failed to create object metadata: %w", err)
}

om = newObjectMetadata(
dstObj.ObjectID,
dstObj.Etag,
dstObj.MimeType,
dstObj.Health,
dstObj.CreatedAt,
dstObj.Size,
)
return nil
om, err = tx.CopyObject(ctx, srcBucket, dstBucket, srcPath, dstPath, mimeType, metadata)
return err
})
return
}
Expand Down Expand Up @@ -1912,19 +1840,6 @@ func (s *SQLStore) createMultipartMetadata(tx *gorm.DB, multipartUploadID uint,
return tx.CreateInBatches(&entities, 1000).Error
}

func (s *SQLStore) updateUserMetadata(tx *gorm.DB, objID uint, metadata api.ObjectUserMetadata) error {
// delete all existing metadata
err := tx.
Where("db_object_id = ?", objID).
Delete(&dbObjectUserMetadata{}).
Error
if err != nil {
return err
}

return s.createUserMetadata(tx, objID, metadata)
}

func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice) error {
if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) {
return fmt.Errorf("either objID or multiPartID must be set")
Expand Down
5 changes: 5 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type (
// opts argument can be used to filter the result.
Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error)

// CopyObject copies an object from one bucket and key to another. If
// src and dst are the same, only the metadata and mimeType are
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
// overwritten with the provided ones.
CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error)

// CreateBucket creates a new bucket with the given name and policy. If
// the bucket already exists, api.ErrBucketExists is returned.
CreateBucket(ctx context.Context, bucket string, policy api.BucketPolicy) error
Expand Down
117 changes: 117 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,123 @@ func Contracts(ctx context.Context, tx sql.Tx, opts api.ContractsOpts) ([]api.Co
return contracts, nil
}

func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
// stmt to fetch bucket id
bucketIDStmt, err := tx.Prepare(ctx, "SELECT id FROM buckets WHERE name = ?")
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to prepare statement to fetch bucket id: %w", err)
}
defer bucketIDStmt.Close()

// fetch source bucket
var srcBID int64
err = bucketIDStmt.QueryRow(ctx, srcBucket).Scan(&srcBID)
if errors.Is(err, dsql.ErrNoRows) {
return api.ObjectMetadata{}, fmt.Errorf("%w: source bucket", api.ErrBucketNotFound)
} else if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch src bucket id: %w", err)
}

// fetch src object id
var srcObjID int64
err = tx.QueryRow(ctx, "SELECT id FROM objects WHERE db_bucket_id = ? AND object_id = ?", srcBID, srcKey).
Scan(&srcObjID)
if errors.Is(err, dsql.ErrNoRows) {
return api.ObjectMetadata{}, api.ErrObjectNotFound
} else if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch object id: %w", err)
}

// helper to fetch metadata
fetchMetadata := func(objID int64) (om api.ObjectMetadata, err error) {
err = tx.QueryRow(ctx, "SELECT etag, health, created_at, object_id, size, mime_type FROM objects WHERE id = ?", objID).
Scan(&om.ETag, &om.Health, (*time.Time)(&om.ModTime), &om.Name, &om.Size, &om.MimeType)
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch new object: %w", err)
}
return om, nil
}

if srcBucket == dstBucket && srcKey == dstKey {
// No copying is happening. We just update the metadata on the src
// object.
if _, err := tx.Exec(ctx, "UPDATE objects SET mime_type = ? WHERE id = ?", mimeType, srcObjID); err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to update mime type: %w", err)
} else if err := UpdateMetadata(ctx, tx, srcObjID, metadata); err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to update metadata: %w", err)
}
return fetchMetadata(srcObjID)
}

// fetch destination bucket
var dstBID int64
err = bucketIDStmt.QueryRow(ctx, dstBucket).Scan(&dstBID)
if errors.Is(err, dsql.ErrNoRows) {
return api.ObjectMetadata{}, fmt.Errorf("%w: destination bucket", api.ErrBucketNotFound)
} else if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch dest bucket id: %w", err)
}

// copy object
res, err := tx.Exec(ctx, `INSERT INTO objects (created_at, object_id, db_directory_id, db_bucket_id,`+"`key`"+`, size, mime_type, etag)
SELECT ?, ?, db_directory_id, ?, `+"`key`"+`, size, ?, etag
FROM objects
WHERE id = ?`, time.Now(), dstKey, dstBID, mimeType, srcObjID)
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to insert object: %w", err)
}
dstObjID, err := res.LastInsertId()
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch object id: %w", err)
}

// copy slices
_, err = tx.Exec(ctx, `INSERT INTO slices (created_at, db_object_id, object_index, db_slab_id, offset, length)
SELECT ?, ?, object_index, db_slab_id, offset, length
FROM slices
WHERE db_object_id = ?`, time.Now(), dstObjID, srcObjID)
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to copy slices: %w", err)
}

// create metadata
if err := InsertMetadata(ctx, tx, dstObjID, metadata); err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to insert metadata: %w", err)
}

// fetch copied object
return fetchMetadata(dstObjID)
}

func UpdateMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error {
if err := DeleteMetadata(ctx, tx, objID); err != nil {
return err
} else if err := InsertMetadata(ctx, tx, objID, md); err != nil {
return err
}
return nil
}

func DeleteMetadata(ctx context.Context, tx sql.Tx, objID int64) error {
_, err := tx.Exec(ctx, "DELETE FROM object_user_metadata WHERE db_object_id = ?", objID)
return err
}

func InsertMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error {
insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, `key`, value) VALUES (?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert object metadata: %w", err)
}
defer insertMetadataStmt.Close()

for k, v := range md {
if _, err := insertMetadataStmt.Exec(ctx, time.Now(), objID, k, v); err != nil {
return fmt.Errorf("failed to insert object metadata: %w", err)
}
}
return nil
}

func DeleteBucket(ctx context.Context, tx sql.Tx, bucket string) error {
var id int64
err := tx.QueryRow(ctx, "SELECT id FROM buckets WHERE name = ?", bucket).Scan(&id)
Expand Down
18 changes: 7 additions & 11 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts)
return ssql.Contracts(ctx, tx, opts)
}

func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata)
}

func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error {
policy, err := json.Marshal(bp)
if err != nil {
Expand Down Expand Up @@ -314,17 +318,9 @@ func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contrac
return err
}

// update metadata
insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, `key`, value) VALUES (?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert object metadata: %w", err)
}
defer insertMetadataStmt.Close()

for k, v := range md {
if _, err := insertMetadataStmt.Exec(ctx, time.Now(), objID, k, v); err != nil {
return fmt.Errorf("failed to insert object metadata: %w", err)
}
// insert metadata
if err := ssql.InsertMetadata(ctx, tx, objID, md); err != nil {
return fmt.Errorf("failed to insert object metadata: %w", err)
}
return nil
}
Expand Down
18 changes: 7 additions & 11 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts)
return ssql.Contracts(ctx, tx, opts)
}

func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata)
}

func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error {
policy, err := json.Marshal(bp)
if err != nil {
Expand Down Expand Up @@ -295,17 +299,9 @@ func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contrac
return err
}

// update metadata
insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, key, value) VALUES (?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert object metadata: %w", err)
}
defer insertMetadataStmt.Close()

for k, v := range md {
if _, err := insertMetadataStmt.Exec(ctx, time.Now(), objID, k, v); err != nil {
return fmt.Errorf("failed to insert object metadata: %w", err)
}
// insert metadata
if err := ssql.InsertMetadata(ctx, tx, objID, md); err != nil {
return fmt.Errorf("failed to insert object metadata: %w", err)
}
return nil
}
Expand Down
Loading