Skip to content

Commit

Permalink
sql: implement CopyObject
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed May 29, 2024
1 parent 1a21df0 commit b407f0e
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 91 deletions.
93 changes: 4 additions & 89 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,87 +1430,15 @@ func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, t
}

func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (om api.ObjectMetadata, err error) {
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
if srcBucket != dstBucket || srcPath != dstPath {
_, err = s.deleteObject(tx, dstBucket, dstPath)
_, err = tx.DeleteObject(ctx, dstBucket, dstPath)
if err != nil {
return fmt.Errorf("CopyObject: failed to delete object: %w", err)
}
}

var srcObj dbObject
err = tx.Where("objects.object_id = ? AND DBBucket.name = ?", srcPath, srcBucket).
Joins("DBBucket").
Take(&srcObj).
Error
if err != nil {
return fmt.Errorf("failed to fetch src object: %w", err)
}

if srcBucket == dstBucket && srcPath == dstPath {
// No copying is happening. We just update the metadata on the src
// object.
srcObj.MimeType = mimeType
om = newObjectMetadata(
srcObj.ObjectID,
srcObj.Etag,
srcObj.MimeType,
srcObj.Health,
srcObj.CreatedAt,
srcObj.Size,
)
if err := s.updateUserMetadata(tx, srcObj.ID, metadata); err != nil {
return fmt.Errorf("failed to update user metadata: %w", err)
}
return tx.Save(&srcObj).Error
}

var srcSlices []dbSlice
err = tx.Where("db_object_id = ?", srcObj.ID).
Find(&srcSlices).
Error
if err != nil {
return fmt.Errorf("failed to fetch src slices: %w", err)
}
for i := range srcSlices {
srcSlices[i].Model = Model{} // clear model
srcSlices[i].DBObjectID = nil // clear object id
}

var bucket dbBucket
err = tx.Where("name = ?", dstBucket).
Take(&bucket).
Error
if err != nil {
return fmt.Errorf("failed to fetch dst bucket: %w", err)
}

dstObj := srcObj
dstObj.Model = Model{} // clear model
dstObj.DBBucket = bucket // set dst bucket
dstObj.ObjectID = dstPath // set dst path
dstObj.DBBucketID = bucket.ID // set dst bucket id
dstObj.Slabs = srcSlices // set slices
if mimeType != "" {
dstObj.MimeType = mimeType // override mime type
}
if err := tx.Create(&dstObj).Error; err != nil {
return fmt.Errorf("failed to create copy of object: %w", err)
}

if err := s.createUserMetadata(tx, dstObj.ID, metadata); err != nil {
return fmt.Errorf("failed to create object metadata: %w", err)
}

om = newObjectMetadata(
dstObj.ObjectID,
dstObj.Etag,
dstObj.MimeType,
dstObj.Health,
dstObj.CreatedAt,
dstObj.Size,
)
return nil
om, err = tx.CopyObject(ctx, srcBucket, dstBucket, srcPath, dstPath, mimeType, metadata)
return err
})
return
}
Expand Down Expand Up @@ -1912,19 +1840,6 @@ func (s *SQLStore) createMultipartMetadata(tx *gorm.DB, multipartUploadID uint,
return tx.CreateInBatches(&entities, 1000).Error
}

func (s *SQLStore) updateUserMetadata(tx *gorm.DB, objID uint, metadata api.ObjectUserMetadata) error {
// delete all existing metadata
err := tx.
Where("db_object_id = ?", objID).
Delete(&dbObjectUserMetadata{}).
Error
if err != nil {
return err
}

return s.createUserMetadata(tx, objID, metadata)
}

func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice) error {
if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) {
return fmt.Errorf("either objID or multiPartID must be set")
Expand Down
102 changes: 102 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,108 @@ func Contracts(ctx context.Context, tx sql.Tx, opts api.ContractsOpts) ([]api.Co
return contracts, nil
}

func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
// stmt to fetch bucket id
bucketIDStmt, err := tx.Prepare(ctx, "SELECT id FROM buckets WHERE name = ?")
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to prepare statement to fetch bucket id: %w", err)
}
defer bucketIDStmt.Close()

// fetch source bucket
var srcBID int64
err = bucketIDStmt.QueryRow(ctx, srcBucket).Scan(&srcBID)
if errors.Is(err, dsql.ErrNoRows) {
return api.ObjectMetadata{}, fmt.Errorf("%w: source bucket", api.ErrBucketNotFound)
} else if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch src bucket id: %w", err)
}

// fetch src object id
var srcObjID int64
err = tx.QueryRow(ctx, "SELECT id FROM objects WHERE db_bucket_id = ? AND object_id = ?", srcBID, srcKey).
Scan(&srcObjID)
if errors.Is(err, dsql.ErrNoRows) {
return api.ObjectMetadata{}, api.ErrObjectNotFound
} else if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch object id: %w", err)
}

// helper to fetch metadata
fetchMetadata := func(objID int64) (om api.ObjectMetadata, err error) {
err = tx.QueryRow(ctx, "SELECT etag, health, created_at, object_id, size, mime_type FROM objects WHERE id = ?", objID).
Scan(&om.ETag, &om.Health, (*time.Time)(&om.ModTime), &om.Name, &om.Size, &om.MimeType)
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch new object: %w", err)
}
return om, nil
}

if srcBucket == dstBucket && srcKey == dstKey {
// No copying is happening. We just update the metadata on the src
// object.
if _, err := tx.Exec(ctx, "UPDATE objects SET mime_type = ? WHERE id = ?", mimeType, srcObjID); err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to update mime type: %w", err)
} else if err := UpdateMetadata(ctx, tx, srcObjID, metadata); err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to update metadata: %w", err)
}
return fetchMetadata(srcObjID)
}

// fetch destination bucket
var dstBID int64
err = bucketIDStmt.QueryRow(ctx, dstBucket).Scan(&dstBID)
if errors.Is(err, dsql.ErrNoRows) {
return api.ObjectMetadata{}, fmt.Errorf("%w: destination bucket", api.ErrBucketNotFound)
} else if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch dest bucket id: %w", err)
}

// copy object
res, err := tx.Exec(ctx, `INSERT INTO objects (created_at, object_id, db_directory_id, db_bucket_id,`+"`key`"+`, size, mime_type, etag)
SELECT ?, ?, db_directory_id, ?, `+"`key`"+`, size, ?, etag
FROM objects
WHERE id = ?`, time.Now(), dstKey, dstBID, mimeType, srcObjID)
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to insert object: %w", err)
}
dstObjID, err := res.LastInsertId()
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to fetch object id: %w", err)
}

// copy slices
_, err = tx.Exec(ctx, `INSERT INTO slices (created_at, db_object_id, object_index, db_slab_id, offset, length)
SELECT ?, ?, object_index, db_slab_id, offset, length
FROM slices
WHERE db_object_id = ?`, time.Now(), dstObjID, srcObjID)
if err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to copy slices: %w", err)
}

// create metadata
if err := InsertMetadata(ctx, tx, dstObjID, metadata); err != nil {
return api.ObjectMetadata{}, fmt.Errorf("failed to insert metadata: %w", err)
}

// fetch copied object
return fetchMetadata(dstObjID)
}

func UpdateMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error {
if err := DeleteMetadata(ctx, tx, objID); err != nil {
return err
} else if err := InsertMetadata(ctx, tx, objID, md); err != nil {
return err
}
return nil
}

func DeleteMetadata(ctx context.Context, tx sql.Tx, objID int64) error {
_, err := tx.Exec(ctx, "DELETE FROM object_user_metadata WHERE db_object_id = ?", objID)
return err
}

func InsertMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error {
insertMetadataStmt, err := tx.Prepare(ctx, "INSERT INTO object_user_metadata (created_at, db_object_id, `key`, value) VALUES (?, ?, ?, ?)")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts)
}

func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
panic("implement me")
return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata)
}

func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error {
Expand Down
2 changes: 1 addition & 1 deletion stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (tx *MainDatabaseTx) Contracts(ctx context.Context, opts api.ContractsOpts)
}

func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
panic("implement me")
return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata)
}

func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error {
Expand Down

0 comments on commit b407f0e

Please sign in to comment.