From 5af724bd9a2c765b456a688932ec821a33a6500a Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 17 Sep 2024 10:18:05 +0200 Subject: [PATCH 1/4] bus,worker: update object endpoints --- api/object.go | 16 +++++------- bus/bus.go | 13 +++++----- bus/client/objects.go | 21 ++++++++++------ bus/routes.go | 31 ++++++++++++++--------- internal/test/e2e/cluster_test.go | 10 ++++---- internal/test/e2e/metrics_test.go | 2 +- internal/test/e2e/pruning_test.go | 4 +-- internal/utils/web.go | 2 +- worker/client/client.go | 17 ++++++------- worker/mocks_test.go | 6 ++++- worker/s3/backend.go | 4 +-- worker/s3/s3.go | 2 +- worker/worker.go | 42 ++++++++++++++++++++----------- 13 files changed, 99 insertions(+), 71 deletions(-) diff --git a/api/object.go b/api/object.go index 09e33dba1..1ae4f455a 100644 --- a/api/object.go +++ b/api/object.go @@ -103,6 +103,12 @@ type ( Objects []ObjectMetadata `json:"objects"` } + // ObjectsRemoveRequest is the request type for the /bus/objects/remove endpoint. + ObjectsRemoveRequest struct { + Bucket string `json:"bucket"` + Prefix string `json:"prefix"` + } + // ObjectsRenameRequest is the request type for the /bus/objects/rename endpoint. ObjectsRenameRequest struct { Bucket string `json:"bucket"` @@ -189,10 +195,6 @@ type ( Metadata ObjectUserMetadata `json:"metadata"` } - DeleteObjectOptions struct { - Batch bool - } - HeadObjectOptions struct { Range *DownloadRange } @@ -284,12 +286,6 @@ func (opts DownloadObjectOptions) ApplyHeaders(h http.Header) { } } -func (opts DeleteObjectOptions) Apply(values url.Values) { - if opts.Batch { - values.Set("batch", "true") - } -} - func (opts HeadObjectOptions) Apply(values url.Values) { } diff --git a/bus/bus.go b/bus/bus.go index 90db3764d..3f92ef9bf 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -457,12 +457,13 @@ func (b *Bus) Handler() http.Handler { "POST /multipart/listuploads": b.multipartHandlerListUploadsPOST, "POST /multipart/listparts": b.multipartHandlerListPartsPOST, - "GET /listobjects/*prefix": b.objectsHandlerGET, - "GET /objects/*key": b.objectHandlerGET, - "PUT /objects/*key": b.objectsHandlerPUT, - "DELETE /objects/*key": b.objectsHandlerDELETE, - "POST /objects/copy": b.objectsCopyHandlerPOST, - "POST /objects/rename": b.objectsRenameHandlerPOST, + "GET /object/*key": b.objectHandlerGET, + "PUT /object/*key": b.objectHandlerPUT, + "DELETE /object/*key": b.objectHandlerDELETE, + "GET /objects/*prefix": b.objectsHandlerGET, + "POST /objects/copy": b.objectsCopyHandlerPOST, + "POST /objects/remove": b.objectsRemoveHandlerPOST, + "POST /objects/rename": b.objectsRenameHandlerPOST, "GET /params/gouging": b.paramsHandlerGougingGET, "GET /params/upload": b.paramsHandlerUploadGET, diff --git a/bus/client/objects.go b/bus/client/objects.go index 1a1f7fc0b..a6e06c15e 100644 --- a/bus/client/objects.go +++ b/bus/client/objects.go @@ -12,7 +12,7 @@ import ( // AddObject stores the provided object under the given path. func (c *Client) AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) { path = api.ObjectKeyEscape(path) - err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/objects/%s", path), api.AddObjectRequest{ + err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/object/%s", path), api.AddObjectRequest{ Bucket: bucket, ContractSet: contractSet, Object: o, @@ -37,15 +37,22 @@ func (c *Client) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, d return } -// DeleteObject either deletes the object at the given key or if batch=true -// deletes all objects that start with the given key. -func (c *Client) DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) (err error) { +// DeleteObject deletes the object with given key. +func (c *Client) DeleteObject(ctx context.Context, bucket, key string) (err error) { values := url.Values{} values.Set("bucket", bucket) - opts.Apply(values) key = api.ObjectKeyEscape(key) - err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s?"+values.Encode(), key)) + err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/object/%s?"+values.Encode(), key)) + return +} + +// RemoveObjects removes objects with given prefix. +func (c *Client) RemoveObjects(ctx context.Context, bucket, prefix string) (err error) { + err = c.c.WithContext(ctx).POST("/objects/remove", api.ObjectsRemoveRequest{ + Bucket: bucket, + Prefix: prefix, + }, nil) return } @@ -58,7 +65,7 @@ func (c *Client) Object(ctx context.Context, bucket, key string, opts api.GetObj key = api.ObjectKeyEscape(key) key += "?" + values.Encode() - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/objects/%s", key), &res) + err = c.c.WithContext(ctx).GET(fmt.Sprintf("/object/%s", key), &res) return } diff --git a/bus/routes.go b/bus/routes.go index d1505af0d..78de89558 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -1183,7 +1183,7 @@ func (b *Bus) objectsHandlerGET(jc jape.Context) { jc.Encode(resp) } -func (b *Bus) objectsHandlerPUT(jc jape.Context) { +func (b *Bus) objectHandlerPUT(jc jape.Context) { var aor api.AddObjectRequest if jc.Decode(&aor) != nil { return @@ -1208,6 +1208,22 @@ func (b *Bus) objectsCopyHandlerPOST(jc jape.Context) { jc.Encode(om) } +func (b *Bus) objectsRemoveHandlerPOST(jc jape.Context) { + var orr api.ObjectsRemoveRequest + if jc.Decode(&orr) != nil { + return + } else if orr.Bucket == "" { + orr.Bucket = api.DefaultBucketName + } + + if orr.Prefix == "" { + jc.Error(errors.New("prefix cannot be empty"), http.StatusBadRequest) + return + } + + jc.Check("failed to remove objects", b.ms.RemoveObjects(jc.Request.Context(), orr.Bucket, orr.Prefix)) +} + func (b *Bus) objectsRenameHandlerPOST(jc jape.Context) { var orr api.ObjectsRenameRequest if jc.Decode(&orr) != nil { @@ -1238,21 +1254,12 @@ func (b *Bus) objectsRenameHandlerPOST(jc jape.Context) { } } -func (b *Bus) objectsHandlerDELETE(jc jape.Context) { - var batch bool - if jc.DecodeForm("batch", &batch) != nil { - return - } +func (b *Bus) objectHandlerDELETE(jc jape.Context) { bucket := api.DefaultBucketName if jc.DecodeForm("bucket", &bucket) != nil { return } - var err error - if batch { - err = b.ms.RemoveObjects(jc.Request.Context(), bucket, jc.PathParam("key")) - } else { - err = b.ms.RemoveObject(jc.Request.Context(), bucket, jc.PathParam("key")) - } + err := b.ms.RemoveObject(jc.Request.Context(), bucket, jc.PathParam("key")) if errors.Is(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) return diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index c6827297f..1136fc10c 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -542,7 +542,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { // delete all uploads for _, upload := range uploads { - tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, upload.key, api.DeleteObjectOptions{})) + tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, upload.key)) } // assert root dir is empty @@ -870,7 +870,7 @@ func TestUploadDownloadExtended(t *testing.T) { } // delete the object - tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, path, api.DeleteObjectOptions{})) + tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, path)) } } @@ -1216,7 +1216,7 @@ func TestParallelUpload(t *testing.T) { } // Delete all objects under /dir/. - if err := cluster.Bus.DeleteObject(context.Background(), api.DefaultBucketName, "/dir/", api.DeleteObjectOptions{Batch: true}); err != nil { + if err := cluster.Bus.RemoveObjects(context.Background(), api.DefaultBucketName, "/dir/"); err != nil { t.Fatal(err) } resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100}) @@ -1227,7 +1227,7 @@ func TestParallelUpload(t *testing.T) { } // Delete all objects under /. - if err := cluster.Bus.DeleteObject(context.Background(), api.DefaultBucketName, "/", api.DeleteObjectOptions{Batch: true}); err != nil { + if err := cluster.Bus.RemoveObjects(context.Background(), api.DefaultBucketName, "/"); err != nil { t.Fatal(err) } resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100}) @@ -1402,7 +1402,7 @@ func TestUploadDownloadSameHost(t *testing.T) { } // delete the object - tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, fmt.Sprintf("foo_%d", i), api.DeleteObjectOptions{})) + tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, fmt.Sprintf("foo_%d", i))) } // wait until the slabs and sectors were pruned before constructing the diff --git a/internal/test/e2e/metrics_test.go b/internal/test/e2e/metrics_test.go index fcec2d6c1..31dde6a81 100644 --- a/internal/test/e2e/metrics_test.go +++ b/internal/test/e2e/metrics_test.go @@ -42,7 +42,7 @@ func TestMetrics(t *testing.T) { data := frand.Bytes(rhpv2.SectorSize) tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{})) tt.OK(w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, "foo", api.DownloadObjectOptions{})) - tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{})) + tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, "foo")) tt.Retry(30, time.Second, func() (err error) { defer func() { diff --git a/internal/test/e2e/pruning_test.go b/internal/test/e2e/pruning_test.go index c612cbe32..c316f6ed6 100644 --- a/internal/test/e2e/pruning_test.go +++ b/internal/test/e2e/pruning_test.go @@ -181,7 +181,7 @@ func TestSectorPruning(t *testing.T) { // delete every other object for i := 0; i < numObjects; i += 2 { filename := fmt.Sprintf("obj_%d", i) - tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename, api.DeleteObjectOptions{})) + tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename)) } // assert amount of prunable data @@ -227,7 +227,7 @@ func TestSectorPruning(t *testing.T) { // delete other object for i := 1; i < numObjects; i += 2 { filename := fmt.Sprintf("obj_%d", i) - tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename, api.DeleteObjectOptions{})) + tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename)) } // assert amount of prunable data diff --git a/internal/utils/web.go b/internal/utils/web.go index 471270deb..d79ae0818 100644 --- a/internal/utils/web.go +++ b/internal/utils/web.go @@ -44,7 +44,7 @@ func (t TreeMux) ServeHTTP(w http.ResponseWriter, req *http.Request) { func Auth(password string, unauthenticatedDownloads bool) func(http.Handler) http.Handler { return func(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if unauthenticatedDownloads && req.Method == http.MethodGet && strings.HasPrefix(req.URL.Path, "/objects/") { + if unauthenticatedDownloads && req.Method == http.MethodGet && strings.HasPrefix(req.URL.Path, "/object/") { h.ServeHTTP(w, req) } else { jape.BasicAuth(password)(h).ServeHTTP(w, req) diff --git a/worker/client/client.go b/worker/client/client.go index ca5aee3c8..d60b1bb69 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -58,13 +58,12 @@ func (c *Client) Contracts(ctx context.Context, hostTimeout time.Duration) (resp } // DeleteObject deletes the object at the given path. -func (c *Client) DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) (err error) { +func (c *Client) DeleteObject(ctx context.Context, bucket, key string) (err error) { values := url.Values{} values.Set("bucket", bucket) - opts.Apply(values) key = api.ObjectKeyEscape(key) - err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s?"+values.Encode(), key)) + err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/object/%s?"+values.Encode(), key)) return } @@ -92,7 +91,7 @@ func (c *Client) DownloadStats() (resp api.DownloadStatsResponse, err error) { // HeadObject returns the metadata of the object at the given key. func (c *Client) HeadObject(ctx context.Context, bucket, key string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) { - c.c.Custom("HEAD", fmt.Sprintf("/objects/%s", key), nil, nil) + c.c.Custom("HEAD", fmt.Sprintf("/object/%s", key), nil, nil) values := url.Values{} values.Set("bucket", url.QueryEscape(bucket)) @@ -101,7 +100,7 @@ func (c *Client) HeadObject(ctx context.Context, bucket, key string, opts api.He key += "?" + values.Encode() // TODO: support HEAD in jape client - req, err := http.NewRequestWithContext(ctx, "HEAD", fmt.Sprintf("%s/objects/%s", c.c.BaseURL, key), http.NoBody) + req, err := http.NewRequestWithContext(ctx, "HEAD", fmt.Sprintf("%s/object/%s", c.c.BaseURL, key), http.NoBody) if err != nil { panic(err) } @@ -213,12 +212,12 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc // UploadObject uploads the data in r, creating an object at the given path. func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, key string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) { key = api.ObjectKeyEscape(key) - c.c.Custom("PUT", fmt.Sprintf("/objects/%s", key), []byte{}, nil) + c.c.Custom("PUT", fmt.Sprintf("/object/%s", key), []byte{}, nil) values := make(url.Values) values.Set("bucket", bucket) opts.ApplyValues(values) - u, err := url.Parse(fmt.Sprintf("%v/objects/%v", c.c.BaseURL, key)) + u, err := url.Parse(fmt.Sprintf("%v/object/%v", c.c.BaseURL, key)) if err != nil { panic(err) } @@ -258,8 +257,8 @@ func (c *Client) object(ctx context.Context, bucket, key string, opts api.Downlo values.Set("bucket", url.QueryEscape(bucket)) key += "?" + values.Encode() - c.c.Custom("GET", fmt.Sprintf("/objects/%s", key), nil, (*[]api.ObjectMetadata)(nil)) - req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/objects/%s", c.c.BaseURL, key), http.NoBody) + c.c.Custom("GET", fmt.Sprintf("/object/%s", key), nil, (*[]api.ObjectMetadata)(nil)) + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/object/%s", c.c.BaseURL, key), http.NoBody) if err != nil { panic(err) } diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 24a70adcb..93fcd9888 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -384,7 +384,7 @@ func (os *objectStoreMock) DeleteHostSector(ctx context.Context, hk types.Public return nil } -func (os *objectStoreMock) DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) error { +func (os *objectStoreMock) DeleteObject(ctx context.Context, bucket, key string) error { return nil } @@ -587,6 +587,10 @@ func (os *objectStoreMock) MultipartUpload(ctx context.Context, uploadID string) return api.MultipartUpload{}, nil } +func (os *objectStoreMock) RemoveObjects(ctx context.Context, bucket, prefix string) error { + return nil +} + func (os *objectStoreMock) totalSlabBufferSize() (total int) { for _, p := range os.partials { if time.Now().After(p.lockedUntil) { diff --git a/worker/s3/backend.go b/worker/s3/backend.go index 28e1f7299..c27add7cc 100644 --- a/worker/s3/backend.go +++ b/worker/s3/backend.go @@ -313,7 +313,7 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, key string) (*gofakes3. // delete marker, which becomes the latest version of the object. If there // isn't a null version, Amazon S3 does not remove any objects. func (s *s3) DeleteObject(ctx context.Context, bucketName, key string) (gofakes3.ObjectDeleteResult, error) { - err := s.b.DeleteObject(ctx, bucketName, key, api.DeleteObjectOptions{}) + err := s.b.DeleteObject(ctx, bucketName, key) if utils.IsErr(err, api.ErrBucketNotFound) { return gofakes3.ObjectDeleteResult{}, gofakes3.BucketNotFound(bucketName) } else if utils.IsErr(err, api.ErrObjectNotFound) { @@ -354,7 +354,7 @@ func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[str func (s *s3) DeleteMulti(ctx context.Context, bucketName string, objects ...string) (gofakes3.MultiDeleteResult, error) { var res gofakes3.MultiDeleteResult for _, key := range objects { - err := s.b.DeleteObject(ctx, bucketName, key, api.DeleteObjectOptions{}) + err := s.b.DeleteObject(ctx, bucketName, key) if err != nil && !utils.IsErr(err, api.ErrObjectNotFound) { res.Error = append(res.Error, gofakes3.ErrorResult{ Key: key, diff --git a/worker/s3/s3.go b/worker/s3/s3.go index 10b074306..c278bf37f 100644 --- a/worker/s3/s3.go +++ b/worker/s3/s3.go @@ -32,7 +32,7 @@ type Bus interface { AddObject(ctx context.Context, bucket, key, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey string, opts api.CopyObjectOptions) (om api.ObjectMetadata, err error) - DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) (err error) + DeleteObject(ctx context.Context, bucket, key string) (err error) Objects(ctx context.Context, bucket, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) AbortMultipartUpload(ctx context.Context, bucket, key string, uploadID string) (err error) diff --git a/worker/worker.go b/worker/worker.go index 3fffec52d..f10300be1 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -121,9 +121,10 @@ type ( // NOTE: used by worker Bucket(_ context.Context, bucket string) (api.Bucket, error) Object(ctx context.Context, bucket, key string, opts api.GetObjectOptions) (api.Object, error) - DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) error + DeleteObject(ctx context.Context, bucket, key string) error MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, err error) PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error) + RemoveObjects(ctx context.Context, bucket, prefix string) error } SettingStore interface { @@ -477,7 +478,7 @@ func (w *Worker) uploadsStatsHandlerGET(jc jape.Context) { }) } -func (w *Worker) objectsHandlerHEAD(jc jape.Context) { +func (w *Worker) objectHandlerHEAD(jc jape.Context) { // parse bucket bucket := api.DefaultBucketName if jc.DecodeForm("bucket", &bucket) != nil { @@ -527,7 +528,7 @@ func (w *Worker) objectsHandlerHEAD(jc jape.Context) { serveContent(jc.ResponseWriter, jc.Request, path, bytes.NewReader(nil), *hor) } -func (w *Worker) objectsHandlerGET(jc jape.Context) { +func (w *Worker) objectHandlerGET(jc jape.Context) { jc.Custom(nil, []api.ObjectMetadata{}) ctx := jc.Request.Context() @@ -601,7 +602,7 @@ func (w *Worker) objectsHandlerGET(jc jape.Context) { serveContent(jc.ResponseWriter, jc.Request, key, gor.Content, gor.HeadObjectResponse) } -func (w *Worker) objectsHandlerPUT(jc jape.Context) { +func (w *Worker) objectHandlerPUT(jc jape.Context) { jc.Custom((*[]byte)(nil), nil) ctx := jc.Request.Context() @@ -760,16 +761,12 @@ func (w *Worker) multipartUploadHandlerPUT(jc jape.Context) { jc.ResponseWriter.Header().Set("ETag", api.FormatETag(resp.ETag)) } -func (w *Worker) objectsHandlerDELETE(jc jape.Context) { - var batch bool - if jc.DecodeForm("batch", &batch) != nil { - return - } +func (w *Worker) objectHandlerDELETE(jc jape.Context) { var bucket string if jc.DecodeForm("bucket", &bucket) != nil { return } - err := w.bus.DeleteObject(jc.Request.Context(), bucket, jc.PathParam("key"), api.DeleteObjectOptions{Batch: batch}) + err := w.bus.DeleteObject(jc.Request.Context(), bucket, jc.PathParam("key")) if utils.IsErr(err, api.ErrObjectNotFound) { jc.Error(err, http.StatusNotFound) return @@ -777,6 +774,22 @@ func (w *Worker) objectsHandlerDELETE(jc jape.Context) { jc.Check("couldn't delete object", err) } +func (w *Worker) objectsRemoveHandlerPOST(jc jape.Context) { + var orr api.ObjectsRemoveRequest + if jc.Decode(&orr) != nil { + return + } else if orr.Bucket == "" { + orr.Bucket = api.DefaultBucketName + } + + if orr.Prefix == "" { + jc.Error(errors.New("prefix cannot be empty"), http.StatusBadRequest) + return + } + + jc.Check("couldn't remove objects", w.bus.RemoveObjects(jc.Request.Context(), orr.Bucket, orr.Prefix)) +} + func (w *Worker) rhpContractsHandlerGET(jc jape.Context) { ctx := jc.Request.Context() @@ -957,10 +970,11 @@ func (w *Worker) Handler() http.Handler { "GET /stats/uploads": w.uploadsStatsHandlerGET, "POST /slab/migrate": w.slabMigrateHandler, - "HEAD /objects/*key": w.objectsHandlerHEAD, - "GET /objects/*key": w.objectsHandlerGET, - "PUT /objects/*key": w.objectsHandlerPUT, - "DELETE /objects/*key": w.objectsHandlerDELETE, + "HEAD /object/*key": w.objectHandlerHEAD, + "GET /object/*key": w.objectHandlerGET, + "PUT /object/*key": w.objectHandlerPUT, + "DELETE /object/*key": w.objectHandlerDELETE, + "POST /objects/remove": w.objectsRemoveHandlerPOST, "PUT /multipart/*key": w.multipartUploadHandlerPUT, From ba60fea320693868dc5315d6529189b4a9d0f935 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 17 Sep 2024 10:26:22 +0200 Subject: [PATCH 2/4] stores: remove list prefix --- api/object.go | 4 +- bus/bus.go | 4 +- bus/client/objects.go | 4 +- bus/routes.go | 4 +- stores/metadata.go | 20 ++++----- stores/metadata_test.go | 70 +++++++++++++++---------------- stores/sql/database.go | 12 +++--- stores/sql/main.go | 88 +++++++++++++++++++-------------------- stores/sql/mysql/main.go | 16 +++---- stores/sql/sqlite/main.go | 16 +++---- worker/mocks_test.go | 4 +- worker/s3/s3.go | 2 +- 12 files changed, 122 insertions(+), 122 deletions(-) diff --git a/api/object.go b/api/object.go index 1ae4f455a..4b79fddf3 100644 --- a/api/object.go +++ b/api/object.go @@ -96,8 +96,8 @@ type ( Metadata ObjectUserMetadata } - // ObjectsListResponse is the response type for the /bus/objects/list endpoint. - ObjectsListResponse struct { + // ObjectsResponse is the response type for the /bus/objects endpoint. + ObjectsResponse struct { HasMore bool `json:"hasMore"` NextMarker string `json:"nextMarker"` Objects []ObjectMetadata `json:"objects"` diff --git a/bus/bus.go b/bus/bus.go index 3f92ef9bf..be5ca227e 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -229,14 +229,14 @@ type ( DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) Bucket(_ context.Context, bucketName string) (api.Bucket, error) + Buckets(_ context.Context) ([]api.Bucket, error) CreateBucket(_ context.Context, bucketName string, policy api.BucketPolicy) error DeleteBucket(_ context.Context, bucketName string) error - ListBuckets(_ context.Context) ([]api.Bucket, error) UpdateBucketPolicy(ctx context.Context, bucketName string, policy api.BucketPolicy) error CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) - ListObjects(ctx context.Context, bucketName, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) Object(ctx context.Context, bucketName, key string) (api.Object, error) + Objects(ctx context.Context, bucketName, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsResponse, error) ObjectMetadata(ctx context.Context, bucketName, key string) (api.Object, error) ObjectsBySlabKey(ctx context.Context, bucketName string, slabKey object.EncryptionKey) ([]api.ObjectMetadata, error) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) diff --git a/bus/client/objects.go b/bus/client/objects.go index a6e06c15e..c55ac13bb 100644 --- a/bus/client/objects.go +++ b/bus/client/objects.go @@ -70,7 +70,7 @@ func (c *Client) Object(ctx context.Context, bucket, key string, opts api.GetObj } // Objects lists objects in the given bucket. -func (c *Client) Objects(ctx context.Context, bucket string, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { +func (c *Client) Objects(ctx context.Context, bucket string, prefix string, opts api.ListObjectOptions) (resp api.ObjectsResponse, err error) { values := url.Values{} values.Set("bucket", bucket) opts.Apply(values) @@ -78,7 +78,7 @@ func (c *Client) Objects(ctx context.Context, bucket string, prefix string, opts prefix = api.ObjectKeyEscape(prefix) prefix += "?" + values.Encode() - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/listobjects/%s", prefix), &resp) + err = c.c.WithContext(ctx).GET(fmt.Sprintf("/objects/%s", prefix), &resp) return } diff --git a/bus/routes.go b/bus/routes.go index 78de89558..773f764e2 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -194,7 +194,7 @@ func (b *Bus) txpoolBroadcastHandler(jc jape.Context) { } func (b *Bus) bucketsHandlerGET(jc jape.Context) { - resp, err := b.ms.ListBuckets(jc.Request.Context()) + resp, err := b.ms.Buckets(jc.Request.Context()) if jc.Check("couldn't list buckets", err) != nil { return } @@ -1173,7 +1173,7 @@ func (b *Bus) objectsHandlerGET(jc jape.Context) { return } - resp, err := b.ms.ListObjects(jc.Request.Context(), bucket, jc.PathParam("prefix"), substring, delim, sortBy, sortDir, marker, limit) + resp, err := b.ms.Objects(jc.Request.Context(), bucket, jc.PathParam("prefix"), substring, delim, sortBy, sortDir, marker, limit) if errors.Is(err, api.ErrUnsupportedDelimiter) { jc.Error(err, http.StatusBadRequest) return diff --git a/stores/metadata.go b/stores/metadata.go index 64acb2ec8..d6658a09c 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -53,6 +53,14 @@ func (s *SQLStore) Bucket(ctx context.Context, bucket string) (b api.Bucket, err return } +func (s *SQLStore) Buckets(ctx context.Context) (buckets []api.Bucket, err error) { + err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) (err error) { + buckets, err = tx.Buckets(ctx) + return + }) + return +} + func (s *SQLStore) CreateBucket(ctx context.Context, bucket string, policy api.BucketPolicy) error { return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { return tx.CreateBucket(ctx, bucket, policy) @@ -71,14 +79,6 @@ func (s *SQLStore) DeleteBucket(ctx context.Context, bucket string) error { }) } -func (s *SQLStore) ListBuckets(ctx context.Context) (buckets []api.Bucket, err error) { - err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) (err error) { - buckets, err = tx.ListBuckets(ctx) - return - }) - return -} - // ObjectsStats returns some info related to the objects stored in the store. To // reduce locking and make sure all results are consistent, everything is done // within a single transaction. @@ -790,9 +790,9 @@ func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []types } } -func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsListResponse, err error) { +func (s *SQLStore) Objects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsResponse, err error) { err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { - resp, err = tx.ListObjects(ctx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) + resp, err = tx.Objects(ctx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) return err }) return diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 0195b264f..0fc8f4717 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -1455,7 +1455,7 @@ func TestObjectHealth(t *testing.T) { } // assert health is returned correctly by ObjectEntries - resp, err := ss.ListObjects(context.Background(), api.DefaultBucketName, "/", "", "", "", "", "", -1) + resp, err := ss.Objects(context.Background(), api.DefaultBucketName, "/", "", "", "", "", "", -1) entries := resp.Objects if err != nil { t.Fatal(err) @@ -1466,7 +1466,7 @@ func TestObjectHealth(t *testing.T) { } // assert health is returned correctly by SearchObject - resp, err = ss.ListObjects(context.Background(), api.DefaultBucketName, "/", "foo", "", "", "", "", -1) + resp, err = ss.Objects(context.Background(), api.DefaultBucketName, "/", "foo", "", "", "", "", -1) if err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { @@ -1507,9 +1507,9 @@ func TestObjectHealth(t *testing.T) { } } -// TestListObjectsWithDelimiterSlash is a test for the -// TestListObjects method with '/' as the prefix. -func TestListObjectsWithDelimiterSlash(t *testing.T) { +// TestObjectsWithDelimiterSlash is a test for the TestObjects method with '/' +// as the prefix. +func TestObjectsWithDelimiterSlash(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() @@ -1608,7 +1608,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { {"/", "", "size", "ASC", []api.ObjectMetadata{{Key: "/gab/", Size: 5, Health: 1}, {Key: "/fileÅ›/", Size: 6, Health: 1}, {Key: "/FOO/", Size: 7, Health: 1}, {Key: "/foo/", Size: 10, Health: .5}}}, } for _, test := range tests { - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) + resp, err := ss.Objects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) if err != nil { t.Fatal(err) } @@ -1621,7 +1621,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { var marker string for offset := 0; offset < len(test.want); offset++ { - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, marker, 1) + resp, err := ss.Objects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, marker, 1) if err != nil { t.Fatal(err) } @@ -1643,7 +1643,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { continue } - resp, err = ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, test.want[offset].Key, 1) + resp, err = ss.Objects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, test.want[offset].Key, 1) if err != nil { t.Fatal(err) } @@ -1662,7 +1662,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { } } -func TestListObjectsExplicitDir(t *testing.T) { +func TestObjectsExplicitDir(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() @@ -1710,7 +1710,7 @@ func TestListObjectsExplicitDir(t *testing.T) { {"/dir/", "", "", "", []api.ObjectMetadata{{ETag: "d34db33f", Key: "/dir/file", Size: 1, Health: 0.5, MimeType: testMimeType}}}, } for _, test := range tests { - got, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) + got, err := ss.Objects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) if err != nil { t.Fatal(err) } @@ -1723,9 +1723,9 @@ func TestListObjectsExplicitDir(t *testing.T) { } } -// TestListObjectsSubstring is a test for the ListObjects fuzzy +// TestObjectsSubstring is a test for the ListObjects fuzzy // search via the "substring" argument. -func TestListObjectsSubstring(t *testing.T) { +func TestObjectsSubstring(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() objects := []struct { @@ -1778,7 +1778,7 @@ func TestListObjectsSubstring(t *testing.T) { {"uu", []api.ObjectMetadata{{Key: "/foo/baz/quux", Size: 3, Health: 1}, {Key: "/foo/baz/quuz", Size: 4, Health: 1}, {Key: "/gab/guub", Size: 5, Health: 1}}}, } for _, test := range tests { - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", test.key, "", "", "", "", -1) + resp, err := ss.Objects(ctx, api.DefaultBucketName, "", test.key, "", "", "", "", -1) if err != nil { t.Fatal(err) } @@ -1786,7 +1786,7 @@ func TestListObjectsSubstring(t *testing.T) { assertEqual(got, test.want) var marker string for offset := 0; offset < len(test.want); offset++ { - if resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", test.key, "", "", "", marker, 1); err != nil { + if resp, err := ss.Objects(ctx, api.DefaultBucketName, "", test.key, "", "", "", marker, 1); err != nil { t.Fatal(err) } else if got := resp.Objects; len(got) != 1 { t.Errorf("\nkey: %v unexpected number of objects, %d != 1", test.key, len(got)) @@ -2631,7 +2631,7 @@ func TestRenameObjects(t *testing.T) { } // Assert that number of objects matches. - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", "/", "", "", "", "", 100) + resp, err := ss.Objects(ctx, api.DefaultBucketName, "", "/", "", "", "", "", 100) if err != nil { t.Fatal(err) } @@ -3322,7 +3322,7 @@ func TestBuckets(t *testing.T) { defer ss.Close() // List the buckets. Should be the default one. - buckets, err := ss.ListBuckets(context.Background()) + buckets, err := ss.Buckets(context.Background()) if err != nil { t.Fatal(err) } else if len(buckets) != 1 { @@ -3340,7 +3340,7 @@ func TestBuckets(t *testing.T) { t.Fatal(err) } else if err := ss.DeleteBucket(context.Background(), api.DefaultBucketName); err != nil { t.Fatal(err) - } else if buckets, err := ss.ListBuckets(context.Background()); err != nil { + } else if buckets, err := ss.Buckets(context.Background()); err != nil { t.Fatal(err) } else if len(buckets) != 2 { t.Fatal("expected 2 buckets", len(buckets)) @@ -3408,13 +3408,13 @@ func TestBucketObjects(t *testing.T) { } // List the objects in the buckets. - if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + if resp, err := ss.Objects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) } else if entries[0].Size != 1 { t.Fatal("unexpected size", entries[0].Size) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) @@ -3423,13 +3423,13 @@ func TestBucketObjects(t *testing.T) { } // Search the objects in the buckets. - if resp, err := ss.ListObjects(context.Background(), b1, "", "", "", "", "", "", -1); err != nil { + if resp, err := ss.Objects(context.Background(), b1, "", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if objects := resp.Objects; len(objects) != 2 { t.Fatal("expected 2 objects", len(objects)) } else if objects[0].Size != 3 || objects[1].Size != 1 { t.Fatal("unexpected size", objects[0].Size, objects[1].Size) - } else if resp, err := ss.ListObjects(context.Background(), b2, "", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b2, "", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if objects := resp.Objects; len(objects) != 2 { t.Fatal("expected 2 objects", len(objects)) @@ -3440,13 +3440,13 @@ func TestBucketObjects(t *testing.T) { // Rename object foo/bar in bucket 1 to foo/baz but not in bucket 2. if err := ss.RenameObjectBlocking(context.Background(), b1, "/foo/bar", "/foo/baz", false); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) } else if entries[0].Key != "/foo/baz" { t.Fatal("unexpected name", entries[0].Key) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) @@ -3457,13 +3457,13 @@ func TestBucketObjects(t *testing.T) { // Rename foo/bar in bucket 2 using the batch rename. if err := ss.RenameObjectsBlocking(context.Background(), b2, "/foo/bar", "/foo/bam", false); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) } else if entries[0].Key != "/foo/baz" { t.Fatal("unexpected name", entries[0].Key) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) @@ -3476,28 +3476,28 @@ func TestBucketObjects(t *testing.T) { t.Fatal(err) } else if err := ss.RemoveObjectBlocking(context.Background(), b1, "/foo/baz"); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) > 0 { t.Fatal("expected 0 entries", len(entries)) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) } // Delete all files in bucket 2. - if resp, err := ss.ListObjects(context.Background(), b2, "/", "", "", "", "", "", -1); err != nil { + if resp, err := ss.Objects(context.Background(), b2, "/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 2 { t.Fatal("expected 2 entries", len(entries)) } else if err := ss.RemoveObjectsBlocking(context.Background(), b2, "/"); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b2, "/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 0 { t.Fatal("expected 0 entries", len(entries)) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(context.Background(), b1, "/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) @@ -3548,7 +3548,7 @@ func TestCopyObject(t *testing.T) { // Copy it within the same bucket. if om, err := ss.CopyObject(ctx, "src", "src", "/foo", "/bar", "", nil); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(ctx, "src", "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(ctx, "src", "/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 2 { t.Fatal("expected 2 entries", len(entries)) @@ -3561,7 +3561,7 @@ func TestCopyObject(t *testing.T) { // Copy it cross buckets. if om, err := ss.CopyObject(ctx, "src", "dst", "/foo", "/bar", "", nil); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(ctx, "dst", "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.Objects(ctx, "dst", "/", "", "", "", "", "", -1); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) @@ -3647,7 +3647,7 @@ func TestMarkSlabUploadedAfterRenew(t *testing.T) { } } -func TestListObjectsNoDelimiter(t *testing.T) { +func TestObjectsNoDelimiter(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() objects := []struct { @@ -3721,7 +3721,7 @@ func TestListObjectsNoDelimiter(t *testing.T) { } } for _, test := range tests { - res, err := ss.ListObjects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, "", -1) + res, err := ss.Objects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, "", -1) if err != nil { t.Fatal(err) } @@ -3736,7 +3736,7 @@ func TestListObjectsNoDelimiter(t *testing.T) { if len(res.Objects) > 0 { marker := "" for offset := 0; offset < len(test.want); offset++ { - res, err := ss.ListObjects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, marker, 1) + res, err := ss.Objects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, marker, 1) if err != nil { t.Fatal(err) } diff --git a/stores/sql/database.go b/stores/sql/database.go index 651fe81f3..a645fd3bb 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -88,6 +88,9 @@ type ( // exist, it returns api.ErrBucketNotFound. Bucket(ctx context.Context, bucket string) (api.Bucket, error) + // Buckets returns a list of all buckets in the database. + Buckets(ctx context.Context) ([]api.Bucket, error) + // CompleteMultipartUpload completes a multipart upload by combining the // provided parts into an object in bucket 'bucket' with key 'key'. The // parts need to be provided in ascending partNumber order without @@ -192,12 +195,6 @@ type ( // scanned since at least maxLastScan. HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) - // ListBuckets returns a list of all buckets in the database. - ListBuckets(ctx context.Context) ([]api.Bucket, error) - - // ListObjects returns a list of objects from the given bucket. - ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsListResponse, err error) - // MakeDirsForPath creates all directories for a given object's path. MakeDirsForPath(ctx context.Context, path string) (int64, error) @@ -220,6 +217,9 @@ type ( // Object returns an object from the database. Object(ctx context.Context, bucket, key string) (api.Object, error) + // Objects returns a list of objects from the given bucket. + Objects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsResponse, err error) + // ObjectMetadata returns an object's metadata. ObjectMetadata(ctx context.Context, bucket, key string) (api.Object, error) diff --git a/stores/sql/main.go b/stores/sql/main.go index 7d66fcc00..e17aecb0b 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -221,6 +221,24 @@ func Bucket(ctx context.Context, tx sql.Tx, bucket string) (api.Bucket, error) { return b, nil } +func Buckets(ctx context.Context, tx sql.Tx) ([]api.Bucket, error) { + rows, err := tx.Query(ctx, "SELECT created_at, name, COALESCE(policy, '{}') FROM buckets") + if err != nil { + return nil, fmt.Errorf("failed to fetch buckets: %w", err) + } + defer rows.Close() + + var buckets []api.Bucket + for rows.Next() { + bucket, err := scanBucket(rows) + if err != nil { + return nil, fmt.Errorf("failed to scan bucket: %w", err) + } + buckets = append(buckets, bucket) + } + return buckets, nil +} + func Contract(ctx context.Context, tx sql.Tx, fcid types.FileContractID) (api.ContractMetadata, error) { contracts, err := QueryContracts(ctx, tx, []string{"c.fcid = ?"}, []any{FileContractID(fcid)}) if err != nil { @@ -1158,24 +1176,6 @@ func PrepareSlabHealth(ctx context.Context, tx sql.Tx, limit int64, now time.Tim return err } -func ListBuckets(ctx context.Context, tx sql.Tx) ([]api.Bucket, error) { - rows, err := tx.Query(ctx, "SELECT created_at, name, COALESCE(policy, '{}') FROM buckets") - if err != nil { - return nil, fmt.Errorf("failed to fetch buckets: %w", err) - } - defer rows.Close() - - var buckets []api.Bucket - for rows.Next() { - bucket, err := scanBucket(rows) - if err != nil { - return nil, fmt.Errorf("failed to scan bucket: %w", err) - } - buckets = append(buckets, bucket) - } - return buckets, nil -} - func whereObjectMarker(marker, sortBy, sortDir string, queryMarker func(dst any, marker, col string) error) (whereExprs []string, whereArgs []any, _ error) { if marker == "" { return nil, nil, nil @@ -1250,18 +1250,6 @@ func orderByObject(sortBy, sortDir string) (orderByExprs []string, _ error) { return orderByExprs, nil } -func ListObjects(ctx context.Context, tx Tx, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsListResponse, err error) { - switch delim { - case "": - resp, err = listObjectsNoDelim(ctx, tx, bucket, prefix, substring, sortBy, sortDir, marker, limit) - case "/": - resp, err = listObjectsSlashDelim(ctx, tx, bucket, prefix, sortBy, sortDir, marker, limit) - default: - err = fmt.Errorf("unsupported delimiter: '%s'", delim) - } - return -} - func MultipartUpload(ctx context.Context, tx sql.Tx, uploadID string) (api.MultipartUpload, error) { resp, err := scanMultipartUpload(tx.QueryRow(ctx, "SELECT b.name, mu.key, mu.object_id, mu.upload_id, mu.created_at FROM multipart_uploads mu INNER JOIN buckets b ON b.id = mu.db_bucket_id WHERE mu.upload_id = ?", uploadID)) if err != nil { @@ -1504,6 +1492,18 @@ func dirID(ctx context.Context, tx sql.Tx, dirPath string) (int64, error) { return id, nil } +func Objects(ctx context.Context, tx Tx, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsResponse, err error) { + switch delim { + case "": + resp, err = listObjectsNoDelim(ctx, tx, bucket, prefix, substring, sortBy, sortDir, marker, limit) + case "/": + resp, err = listObjectsSlashDelim(ctx, tx, bucket, prefix, sortBy, sortDir, marker, limit) + default: + err = fmt.Errorf("unsupported delimiter: '%s'", delim) + } + return +} + func ObjectMetadata(ctx context.Context, tx Tx, bucket, key string) (api.Object, error) { // fetch object id var objID int64 @@ -2633,7 +2633,7 @@ func Object(ctx context.Context, tx Tx, bucket, key string) (api.Object, error) }, nil } -func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { +func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, sortBy, sortDir, marker string, limit int) (api.ObjectsResponse, error) { // fetch one more to see if there are more entries if limit <= -1 { limit = math.MaxInt @@ -2668,7 +2668,7 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s // apply sorting orderByExprs, err := orderByObject(sortBy, sortDir) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to apply sorting: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to apply sorting: %w", err) } // apply marker @@ -2686,7 +2686,7 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s } }) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to get marker exprs: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to get marker exprs: %w", err) } whereExprs = append(whereExprs, markerExprs...) whereArgs = append(whereArgs, markerArgs...) @@ -2707,7 +2707,7 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s strings.Join(orderByExprs, ", ")), whereArgs...) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to fetch objects: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to fetch objects: %w", err) } defer rows.Close() @@ -2715,7 +2715,7 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s for rows.Next() { om, err := tx.ScanObjectMetadata(rows) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to scan object metadata: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to scan object metadata: %w", err) } objects = append(objects, om) } @@ -2730,14 +2730,14 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s } } - return api.ObjectsListResponse{ + return api.ObjectsResponse{ HasMore: hasMore, NextMarker: nextMarker, Objects: objects, }, nil } -func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { +func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, sortDir, marker string, limit int) (api.ObjectsResponse, error) { // split prefix into path and object prefix path := "/" // root of bucket if idx := strings.LastIndex(prefix, "/"); idx != -1 { @@ -2766,9 +2766,9 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s // fetch directory id dirID, err := dirID(ctx, tx, path) if errors.Is(err, dsql.ErrNoRows) { - return api.ObjectsListResponse{}, nil + return api.ObjectsResponse{}, nil } else if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to fetch directory id: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to fetch directory id: %w", err) } args := []any{ @@ -2825,7 +2825,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s } }) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to query marker: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to query marker: %w", err) } else if len(markerExprs) > 0 { whereExpr = "WHERE " + strings.Join(markerExprs, " AND ") } @@ -2834,7 +2834,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s // apply sorting orderByExprs, err := orderByObject(sortBy, sortDir) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to apply sorting: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to apply sorting: %w", err) } // apply offset and limit @@ -2873,7 +2873,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s strings.Join(orderByExprs, ", "), ), args...) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to fetch objects: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to fetch objects: %w", err) } defer rows.Close() @@ -2881,7 +2881,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s for rows.Next() { om, err := tx.ScanObjectMetadata(rows) if err != nil { - return api.ObjectsListResponse{}, fmt.Errorf("failed to scan object metadata: %w", err) + return api.ObjectsResponse{}, fmt.Errorf("failed to scan object metadata: %w", err) } objects = append(objects, om) } @@ -2897,7 +2897,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s } } - return api.ObjectsListResponse{ + return api.ObjectsResponse{ HasMore: hasMore, NextMarker: nextMarker, Objects: objects, diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 632077492..a83ac23ac 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -217,6 +217,10 @@ func (tx *MainDatabaseTx) Bucket(ctx context.Context, bucket string) (api.Bucket return ssql.Bucket(ctx, tx, bucket) } +func (tx *MainDatabaseTx) Buckets(ctx context.Context) ([]api.Bucket, error) { + return ssql.Buckets(ctx, tx) +} + func (tx *MainDatabaseTx) CharLengthExpr() string { return "CHAR_LENGTH" } @@ -479,14 +483,6 @@ func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids return res.RowsAffected() } -func (tx *MainDatabaseTx) ListBuckets(ctx context.Context) ([]api.Bucket, error) { - return ssql.ListBuckets(ctx, tx) -} - -func (tx *MainDatabaseTx) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { - return ssql.ListObjects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) -} - func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int64, error) { // Create root dir. dirID := int64(sql.DirectoriesRootID) @@ -543,6 +539,10 @@ func (tx *MainDatabaseTx) Object(ctx context.Context, bucket, key string) (api.O return ssql.Object(ctx, tx, bucket, key) } +func (tx *MainDatabaseTx) Objects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsResponse, error) { + return ssql.Objects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) +} + func (tx *MainDatabaseTx) ObjectMetadata(ctx context.Context, bucket, key string) (api.Object, error) { return ssql.ObjectMetadata(ctx, tx, bucket, key) } diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 8f903e053..b1ffc8c20 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -216,6 +216,10 @@ func (tx *MainDatabaseTx) Bucket(ctx context.Context, bucket string) (api.Bucket return ssql.Bucket(ctx, tx, bucket) } +func (tx *MainDatabaseTx) Buckets(ctx context.Context) ([]api.Bucket, error) { + return ssql.Buckets(ctx, tx) +} + func (tx *MainDatabaseTx) CharLengthExpr() string { return "LENGTH" } @@ -465,14 +469,6 @@ func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids return res.RowsAffected() } -func (tx *MainDatabaseTx) ListBuckets(ctx context.Context) ([]api.Bucket, error) { - return ssql.ListBuckets(ctx, tx) -} - -func (tx *MainDatabaseTx) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { - return ssql.ListObjects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) -} - func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int64, error) { insertDirStmt, err := tx.Prepare(ctx, "INSERT INTO directories (name, db_parent_id) VALUES (?, ?) ON CONFLICT(name) DO NOTHING") if err != nil { @@ -539,6 +535,10 @@ func (tx *MainDatabaseTx) Object(ctx context.Context, bucket, key string) (api.O return ssql.Object(ctx, tx, bucket, key) } +func (tx *MainDatabaseTx) Objects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsResponse, error) { + return ssql.Objects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) +} + func (tx *MainDatabaseTx) ObjectMetadata(ctx context.Context, bucket, key string) (api.Object, error) { return ssql.ObjectMetadata(ctx, tx, bucket, key) } diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 93fcd9888..194a30206 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -632,8 +632,8 @@ func (*s3Mock) CopyObject(context.Context, string, string, string, string, api.C return api.ObjectMetadata{}, nil } -func (*s3Mock) ListObjects(context.Context, string, api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { - return api.ObjectsListResponse{}, nil +func (*s3Mock) ListObjects(context.Context, string, api.ListObjectOptions) (resp api.ObjectsResponse, err error) { + return api.ObjectsResponse{}, nil } func (*s3Mock) AbortMultipartUpload(context.Context, string, string, string) (err error) { diff --git a/worker/s3/s3.go b/worker/s3/s3.go index c278bf37f..fcf2680ae 100644 --- a/worker/s3/s3.go +++ b/worker/s3/s3.go @@ -33,7 +33,7 @@ type Bus interface { AddObject(ctx context.Context, bucket, key, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey string, opts api.CopyObjectOptions) (om api.ObjectMetadata, err error) DeleteObject(ctx context.Context, bucket, key string) (err error) - Objects(ctx context.Context, bucket, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) + Objects(ctx context.Context, bucket, prefix string, opts api.ListObjectOptions) (resp api.ObjectsResponse, err error) AbortMultipartUpload(ctx context.Context, bucket, key string, uploadID string) (err error) CompleteMultipartUpload(ctx context.Context, bucket, key, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error) From ac978bc3bf10fe8e10c657de456f5690044bee7e Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 17 Sep 2024 10:27:02 +0200 Subject: [PATCH 3/4] testing: rename --- internal/test/e2e/cluster_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 1136fc10c..d7b5fc637 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -34,7 +34,7 @@ import ( "lukechampine.com/frand" ) -func TestListObjectsWithNoDelimiter(t *testing.T) { +func TestObjectsWithNoDelimiter(t *testing.T) { if testing.Short() { t.SkipNow() } @@ -373,11 +373,11 @@ func TestNewTestCluster(t *testing.T) { } } -// TestListObjectsWithDelimiterSlash is an integration test that verifies +// TestObjectsWithDelimiterSlash is an integration test that verifies // objects are uploaded, download and deleted from and to the paths we // would expect. It is similar to the TestObjectEntries unit test, but uses // the worker and bus client to verify paths are passed correctly. -func TestListObjectsWithDelimiterSlash(t *testing.T) { +func TestObjectsWithDelimiterSlash(t *testing.T) { if testing.Short() { t.SkipNow() } From 7055fece0a5663391baaacf3f8d5d151574c02ac Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 17 Sep 2024 10:37:01 +0200 Subject: [PATCH 4/4] worker: add RemoveObjects to client --- worker/client/client.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/worker/client/client.go b/worker/client/client.go index d60b1bb69..4fdcdffe4 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -170,6 +170,15 @@ func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) return } +// RemoveObjects removes the object with given prefix. +func (c *Client) RemoveObjects(ctx context.Context, bucket, prefix string) (err error) { + err = c.c.WithContext(ctx).POST("/objects/remove", api.ObjectsRemoveRequest{ + Bucket: bucket, + Prefix: prefix, + }, nil) + return +} + // State returns the current state of the worker. func (c *Client) State() (state api.WorkerStateResponse, err error) { err = c.c.GET("/state", &state)