Skip to content

Commit

Permalink
satellite/metabase: make DeleteObjects respect Object Lock
Browse files Browse the repository at this point in the history
This change allows objects with active Object Lock settings to be
protected from deletion by the DeleteObjects metainfo DB method.

References storj/edge#515

Change-Id: I1d5b3eaf8b4e02317bf7bdea630219df555c6f2f
  • Loading branch information
jewharton authored and Storj Robot committed Feb 21, 2025
1 parent a4f4a6c commit 327c6b0
Show file tree
Hide file tree
Showing 2 changed files with 546 additions and 66 deletions.
23 changes: 17 additions & 6 deletions satellite/metabase/delete_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ func (opts DeleteObjects) Verify() error {
switch {
case opts.Versioned && opts.Suspended:
return ErrInvalidRequest.New("Versioned and Suspended must not be simultaneously enabled")
case opts.ObjectLock.Enabled:
return ErrInvalidRequest.New("deletion from buckets with Object Lock enabled is not yet supported")
case opts.ProjectID.IsZero():
return ErrInvalidRequest.New("ProjectID missing")
case opts.BucketName == "":
Expand Down Expand Up @@ -79,6 +77,8 @@ const (
DeleteStatusNotFound
// DeleteStatusOK indicates that the object was successfully deleted.
DeleteStatusOK
// DeleteStatusObjectLocked indicates that the object's Object Lock configuration prevented its deletion.
DeleteStatusObjectLocked
)

// DeleteObjectsResultItem contains the result of an attempt to delete a specific object from a bucket.
Expand All @@ -100,8 +100,6 @@ type DeleteObjectsInfo struct {
}

// DeleteObjects deletes specific objects from a bucket.
//
// TODO: Support Object Lock.
func (db *DB) DeleteObjects(ctx context.Context, opts DeleteObjects) (result DeleteObjectsResult, err error) {
defer mon.Task()(&ctx)(&err)

Expand Down Expand Up @@ -133,6 +131,7 @@ func (db *DB) DeleteObjects(ctx context.Context, opts DeleteObjects) (result Del
BucketName: opts.BucketName,
ObjectKey: resultItem.ObjectKey,
},
ObjectLock: opts.ObjectLock,
}

var deleteObjectResult DeleteObjectResult
Expand Down Expand Up @@ -193,7 +192,12 @@ func (db *DB) DeleteObjects(ctx context.Context, opts DeleteObjects) (result Del
}

if err != nil {
return result, err
if ErrObjectLock.Has(err) {
resultItem.Status = DeleteStatusObjectLocked
err = nil
} else {
return result, err
}
}

if resultItem.Status == DeleteStatusUnprocessed {
Expand All @@ -214,6 +218,7 @@ func (db *DB) DeleteObjects(ctx context.Context, opts DeleteObjects) (result Del
}]; ok {
marker := processedOpts.results[linkedItemIdx].Marker
if marker != nil && marker.StreamVersionID == resultItem.RequestedStreamVersionID {
resultItem.Status = DeleteStatusNotFound
continue
}
}
Expand All @@ -228,6 +233,7 @@ func (db *DB) DeleteObjects(ctx context.Context, opts DeleteObjects) (result Del
},
Version: resultItem.RequestedStreamVersionID.Version(),
StreamIDSuffix: resultItem.RequestedStreamVersionID.StreamIDSuffix(),
ObjectLock: opts.ObjectLock,
})

result.DeletedSegmentCount += int64(deleteObjectResult.DeletedSegmentCount)
Expand All @@ -241,7 +247,12 @@ func (db *DB) DeleteObjects(ctx context.Context, opts DeleteObjects) (result Del
}

if err != nil {
return result, err
if ErrObjectLock.Has(err) {
resultItem.Status = DeleteStatusObjectLocked
err = nil
} else {
return result, err
}
}

if resultItem.Status == DeleteStatusUnprocessed {
Expand Down
Loading

0 comments on commit 327c6b0

Please sign in to comment.