Skip to content

Commit

Permalink
Merge pull request #429 from SiaFoundation/chris/batch-deletion
Browse files Browse the repository at this point in the history
Add batch deletion support
  • Loading branch information
ChrisSchinnerl committed Jun 21, 2023
2 parents 7ca016a + 4d2214d commit 27b7549
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 9 deletions.
12 changes: 11 additions & 1 deletion bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type (
SearchObjects(ctx context.Context, substring string, offset, limit int) ([]api.ObjectMetadata, error)
UpdateObject(ctx context.Context, path, contractSet string, o object.Object, ps *object.PartialSlab, usedContracts map[types.PublicKey]types.FileContractID) error
RemoveObject(ctx context.Context, path string) error
RemoveObjects(ctx context.Context, prefix string) error

ObjectsStats(ctx context.Context) (api.ObjectsStats, error)

Expand Down Expand Up @@ -756,7 +757,16 @@ func (b *bus) objectsHandlerPUT(jc jape.Context) {
}

func (b *bus) objectsHandlerDELETE(jc jape.Context) {
err := b.ms.RemoveObject(jc.Request.Context(), jc.PathParam("path"))
var batch bool
if jc.DecodeForm("batch", &batch) != nil {
return
}
var err error
if batch {
err = b.ms.RemoveObjects(jc.Request.Context(), jc.PathParam("path"))
} else {
err = b.ms.RemoveObject(jc.Request.Context(), jc.PathParam("path"))
}
if errors.Is(err, api.ErrObjectNotFound) {
jc.Error(err, http.StatusNotFound)
return
Expand Down
9 changes: 6 additions & 3 deletions bus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,12 @@ func (c *Client) AddObject(ctx context.Context, path, contractSet string, o obje
return
}

// DeleteObject deletes the object at the given path.
func (c *Client) DeleteObject(ctx context.Context, path string) (err error) {
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s", path))
// DeleteObject either deletes the object at the given path or if batch=true
// deletes all objects that start with the given path.
func (c *Client) DeleteObject(ctx context.Context, path string, batch bool) (err error) {
values := url.Values{}
values.Set("batch", fmt.Sprint(batch))
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s?"+values.Encode(), path))
return
}

Expand Down
48 changes: 47 additions & 1 deletion internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func TestParallelUpload(t *testing.T) {
}

// upload the data
name := fmt.Sprintf("data_%v", hex.EncodeToString(data[:16]))
name := fmt.Sprintf("/dir/data_%v", hex.EncodeToString(data[:16]))
if err := w.UploadObject(context.Background(), bytes.NewReader(data), name); err != nil {
return err
}
Expand All @@ -898,6 +898,52 @@ func TestParallelUpload(t *testing.T) {
}()
}
wg.Wait()

// Check if objects exist.
objects, err := cluster.Bus.SearchObjects(context.Background(), "/dir/", 0, 100)
if err != nil {
t.Fatal(err)
}
if len(objects) != 3 {
t.Fatal("wrong number of objects", len(objects))
}

// Upload one more object.
if err := w.UploadObject(context.Background(), bytes.NewReader([]byte("data")), "/foo"); err != nil {
t.Fatal(err)
}

objects, err = cluster.Bus.SearchObjects(context.Background(), "/", 0, 100)
if err != nil {
t.Fatal(err)
}
if len(objects) != 4 {
t.Fatal("wrong number of objects", len(objects))
}

// Delete all objects under /dir/.
if err := cluster.Bus.DeleteObject(context.Background(), "/dir/", true); err != nil {
t.Fatal(err)
}
objects, err = cluster.Bus.SearchObjects(context.Background(), "/", 0, 100)
if err != nil {
t.Fatal(err)
}
if len(objects) != 1 {
t.Fatal("objects weren't deleted")
}

// Delete all objects under /.
if err := cluster.Bus.DeleteObject(context.Background(), "/", true); err != nil {
t.Fatal(err)
}
objects, err = cluster.Bus.SearchObjects(context.Background(), "/", 0, 100)
if err != nil {
t.Fatal(err)
}
if len(objects) != 0 {
t.Fatal("objects weren't deleted")
}
}

// TestParallelDownload tests downloading a file in parallel.
Expand Down
36 changes: 34 additions & 2 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,14 +960,35 @@ func createSlabBuffer(tx *gorm.DB, objectID, contractSetID uint, partialSlab obj
}

func (s *SQLStore) RemoveObject(ctx context.Context, key string) error {
rowsAffected, err := deleteObject(s.db, key)
var rowsAffected int64
var err error
err = s.retryTransaction(func(tx *gorm.DB) error {
rowsAffected, err = deleteObject(tx, key)
return err
})
if err != nil {
return err
}
if rowsAffected == 0 {
return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, key)
}
return err
return nil
}

func (s *SQLStore) RemoveObjects(ctx context.Context, prefix string) error {
var rowsAffected int64
var err error
err = s.retryTransaction(func(tx *gorm.DB) error {
rowsAffected, err = deleteObjects(tx, prefix)
return err
})
if err != nil {
return err
}
if rowsAffected == 0 {
return fmt.Errorf("%w: prefix: %s", api.ErrObjectNotFound, prefix)
}
return nil
}

func (s *SQLStore) Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) {
Expand Down Expand Up @@ -1410,3 +1431,14 @@ func deleteObject(tx *gorm.DB, key string) (int64, error) {
}
return tx.RowsAffected, nil
}

func deleteObjects(tx *gorm.DB, path string) (int64, error) {
tx = tx.Exec("DELETE FROM objects WHERE object_id LIKE ?", path+"%")
if tx.Error != nil {
return 0, tx.Error
}
if err := pruneSlabs(tx); err != nil {
return 0, err
}
return tx.RowsAffected, nil
}
8 changes: 6 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type Bus interface {

Object(ctx context.Context, path, prefix string, offset, limit int) (object.Object, []api.ObjectMetadata, error)
AddObject(ctx context.Context, path, contractSet string, o object.Object, usedContracts map[types.PublicKey]types.FileContractID) error
DeleteObject(ctx context.Context, path string) error
DeleteObject(ctx context.Context, path string, batch bool) error

Accounts(ctx context.Context) ([]api.Account, error)
UpdateSlab(ctx context.Context, s object.Slab, contractSet string, goodContracts map[types.PublicKey]types.FileContractID) error
Expand Down Expand Up @@ -1098,8 +1098,12 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
}

func (w *worker) objectsHandlerDELETE(jc jape.Context) {
var batch bool
if jc.DecodeForm("batch", &batch) != nil {
return
}
path := strings.TrimPrefix(jc.PathParam("path"), "/")
err := w.bus.DeleteObject(jc.Request.Context(), path)
err := w.bus.DeleteObject(jc.Request.Context(), path, batch)
if err != nil && strings.Contains(err.Error(), api.ErrObjectNotFound.Error()) {
jc.Error(err, http.StatusNotFound)
return
Expand Down

0 comments on commit 27b7549

Please sign in to comment.