Skip to content

Commit

Permalink
bus,worker: update object endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Sep 17, 2024
1 parent 61ab464 commit 5af724b
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 71 deletions.
16 changes: 6 additions & 10 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ type (
Objects []ObjectMetadata `json:"objects"`
}

// ObjectsRemoveRequest is the request type for the /bus/objects/remove endpoint.
ObjectsRemoveRequest struct {
Bucket string `json:"bucket"`
Prefix string `json:"prefix"`
}

// ObjectsRenameRequest is the request type for the /bus/objects/rename endpoint.
ObjectsRenameRequest struct {
Bucket string `json:"bucket"`
Expand Down Expand Up @@ -189,10 +195,6 @@ type (
Metadata ObjectUserMetadata `json:"metadata"`
}

DeleteObjectOptions struct {
Batch bool
}

HeadObjectOptions struct {
Range *DownloadRange
}
Expand Down Expand Up @@ -284,12 +286,6 @@ func (opts DownloadObjectOptions) ApplyHeaders(h http.Header) {
}
}

func (opts DeleteObjectOptions) Apply(values url.Values) {
if opts.Batch {
values.Set("batch", "true")
}
}

func (opts HeadObjectOptions) Apply(values url.Values) {
}

Expand Down
13 changes: 7 additions & 6 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,12 +457,13 @@ func (b *Bus) Handler() http.Handler {
"POST /multipart/listuploads": b.multipartHandlerListUploadsPOST,
"POST /multipart/listparts": b.multipartHandlerListPartsPOST,

"GET /listobjects/*prefix": b.objectsHandlerGET,
"GET /objects/*key": b.objectHandlerGET,
"PUT /objects/*key": b.objectsHandlerPUT,
"DELETE /objects/*key": b.objectsHandlerDELETE,
"POST /objects/copy": b.objectsCopyHandlerPOST,
"POST /objects/rename": b.objectsRenameHandlerPOST,
"GET /object/*key": b.objectHandlerGET,
"PUT /object/*key": b.objectHandlerPUT,
"DELETE /object/*key": b.objectHandlerDELETE,
"GET /objects/*prefix": b.objectsHandlerGET,
"POST /objects/copy": b.objectsCopyHandlerPOST,
"POST /objects/remove": b.objectsRemoveHandlerPOST,
"POST /objects/rename": b.objectsRenameHandlerPOST,

"GET /params/gouging": b.paramsHandlerGougingGET,
"GET /params/upload": b.paramsHandlerUploadGET,
Expand Down
21 changes: 14 additions & 7 deletions bus/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// AddObject stores the provided object under the given path.
func (c *Client) AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) {
path = api.ObjectKeyEscape(path)
err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/objects/%s", path), api.AddObjectRequest{
err = c.c.WithContext(ctx).PUT(fmt.Sprintf("/object/%s", path), api.AddObjectRequest{
Bucket: bucket,
ContractSet: contractSet,
Object: o,
Expand All @@ -37,15 +37,22 @@ func (c *Client) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, d
return
}

// DeleteObject either deletes the object at the given key or if batch=true
// deletes all objects that start with the given key.
func (c *Client) DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) (err error) {
// DeleteObject deletes the object with given key.
func (c *Client) DeleteObject(ctx context.Context, bucket, key string) (err error) {
values := url.Values{}
values.Set("bucket", bucket)
opts.Apply(values)

key = api.ObjectKeyEscape(key)
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s?"+values.Encode(), key))
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/object/%s?"+values.Encode(), key))
return
}

// RemoveObjects removes objects with given prefix.
func (c *Client) RemoveObjects(ctx context.Context, bucket, prefix string) (err error) {
err = c.c.WithContext(ctx).POST("/objects/remove", api.ObjectsRemoveRequest{
Bucket: bucket,
Prefix: prefix,
}, nil)
return
}

Expand All @@ -58,7 +65,7 @@ func (c *Client) Object(ctx context.Context, bucket, key string, opts api.GetObj
key = api.ObjectKeyEscape(key)
key += "?" + values.Encode()

err = c.c.WithContext(ctx).GET(fmt.Sprintf("/objects/%s", key), &res)
err = c.c.WithContext(ctx).GET(fmt.Sprintf("/object/%s", key), &res)
return
}

Expand Down
31 changes: 19 additions & 12 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ func (b *Bus) objectsHandlerGET(jc jape.Context) {
jc.Encode(resp)
}

func (b *Bus) objectsHandlerPUT(jc jape.Context) {
func (b *Bus) objectHandlerPUT(jc jape.Context) {
var aor api.AddObjectRequest
if jc.Decode(&aor) != nil {
return
Expand All @@ -1208,6 +1208,22 @@ func (b *Bus) objectsCopyHandlerPOST(jc jape.Context) {
jc.Encode(om)
}

func (b *Bus) objectsRemoveHandlerPOST(jc jape.Context) {
var orr api.ObjectsRemoveRequest
if jc.Decode(&orr) != nil {
return
} else if orr.Bucket == "" {
orr.Bucket = api.DefaultBucketName
}

if orr.Prefix == "" {
jc.Error(errors.New("prefix cannot be empty"), http.StatusBadRequest)
return
}

jc.Check("failed to remove objects", b.ms.RemoveObjects(jc.Request.Context(), orr.Bucket, orr.Prefix))
}

func (b *Bus) objectsRenameHandlerPOST(jc jape.Context) {
var orr api.ObjectsRenameRequest
if jc.Decode(&orr) != nil {
Expand Down Expand Up @@ -1238,21 +1254,12 @@ func (b *Bus) objectsRenameHandlerPOST(jc jape.Context) {
}
}

func (b *Bus) objectsHandlerDELETE(jc jape.Context) {
var batch bool
if jc.DecodeForm("batch", &batch) != nil {
return
}
func (b *Bus) objectHandlerDELETE(jc jape.Context) {
bucket := api.DefaultBucketName
if jc.DecodeForm("bucket", &bucket) != nil {
return
}
var err error
if batch {
err = b.ms.RemoveObjects(jc.Request.Context(), bucket, jc.PathParam("key"))
} else {
err = b.ms.RemoveObject(jc.Request.Context(), bucket, jc.PathParam("key"))
}
err := b.ms.RemoveObject(jc.Request.Context(), bucket, jc.PathParam("key"))
if errors.Is(err, api.ErrObjectNotFound) {
jc.Error(err, http.StatusNotFound)
return
Expand Down
10 changes: 5 additions & 5 deletions internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) {

// delete all uploads
for _, upload := range uploads {
tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, upload.key, api.DeleteObjectOptions{}))
tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, upload.key))
}

// assert root dir is empty
Expand Down Expand Up @@ -870,7 +870,7 @@ func TestUploadDownloadExtended(t *testing.T) {
}

// delete the object
tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, path, api.DeleteObjectOptions{}))
tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, path))
}
}

Expand Down Expand Up @@ -1216,7 +1216,7 @@ func TestParallelUpload(t *testing.T) {
}

// Delete all objects under /dir/.
if err := cluster.Bus.DeleteObject(context.Background(), api.DefaultBucketName, "/dir/", api.DeleteObjectOptions{Batch: true}); err != nil {
if err := cluster.Bus.RemoveObjects(context.Background(), api.DefaultBucketName, "/dir/"); err != nil {
t.Fatal(err)
}
resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100})
Expand All @@ -1227,7 +1227,7 @@ func TestParallelUpload(t *testing.T) {
}

// Delete all objects under /.
if err := cluster.Bus.DeleteObject(context.Background(), api.DefaultBucketName, "/", api.DeleteObjectOptions{Batch: true}); err != nil {
if err := cluster.Bus.RemoveObjects(context.Background(), api.DefaultBucketName, "/"); err != nil {
t.Fatal(err)
}
resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100})
Expand Down Expand Up @@ -1402,7 +1402,7 @@ func TestUploadDownloadSameHost(t *testing.T) {
}

// delete the object
tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, fmt.Sprintf("foo_%d", i), api.DeleteObjectOptions{}))
tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, fmt.Sprintf("foo_%d", i)))
}

// wait until the slabs and sectors were pruned before constructing the
Expand Down
2 changes: 1 addition & 1 deletion internal/test/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMetrics(t *testing.T) {
data := frand.Bytes(rhpv2.SectorSize)
tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{}))
tt.OK(w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, "foo", api.DownloadObjectOptions{}))
tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{}))
tt.OK(w.DeleteObject(context.Background(), api.DefaultBucketName, "foo"))

tt.Retry(30, time.Second, func() (err error) {
defer func() {
Expand Down
4 changes: 2 additions & 2 deletions internal/test/e2e/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestSectorPruning(t *testing.T) {
// delete every other object
for i := 0; i < numObjects; i += 2 {
filename := fmt.Sprintf("obj_%d", i)
tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename, api.DeleteObjectOptions{}))
tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename))
}

// assert amount of prunable data
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestSectorPruning(t *testing.T) {
// delete other object
for i := 1; i < numObjects; i += 2 {
filename := fmt.Sprintf("obj_%d", i)
tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename, api.DeleteObjectOptions{}))
tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename))
}

// assert amount of prunable data
Expand Down
2 changes: 1 addition & 1 deletion internal/utils/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (t TreeMux) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func Auth(password string, unauthenticatedDownloads bool) func(http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if unauthenticatedDownloads && req.Method == http.MethodGet && strings.HasPrefix(req.URL.Path, "/objects/") {
if unauthenticatedDownloads && req.Method == http.MethodGet && strings.HasPrefix(req.URL.Path, "/object/") {
h.ServeHTTP(w, req)
} else {
jape.BasicAuth(password)(h).ServeHTTP(w, req)
Expand Down
17 changes: 8 additions & 9 deletions worker/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ func (c *Client) Contracts(ctx context.Context, hostTimeout time.Duration) (resp
}

// DeleteObject deletes the object at the given path.
func (c *Client) DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) (err error) {
func (c *Client) DeleteObject(ctx context.Context, bucket, key string) (err error) {
values := url.Values{}
values.Set("bucket", bucket)
opts.Apply(values)

key = api.ObjectKeyEscape(key)
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/objects/%s?"+values.Encode(), key))
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/object/%s?"+values.Encode(), key))
return
}

Expand Down Expand Up @@ -92,7 +91,7 @@ func (c *Client) DownloadStats() (resp api.DownloadStatsResponse, err error) {

// HeadObject returns the metadata of the object at the given key.
func (c *Client) HeadObject(ctx context.Context, bucket, key string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) {
c.c.Custom("HEAD", fmt.Sprintf("/objects/%s", key), nil, nil)
c.c.Custom("HEAD", fmt.Sprintf("/object/%s", key), nil, nil)

values := url.Values{}
values.Set("bucket", url.QueryEscape(bucket))
Expand All @@ -101,7 +100,7 @@ func (c *Client) HeadObject(ctx context.Context, bucket, key string, opts api.He
key += "?" + values.Encode()

// TODO: support HEAD in jape client
req, err := http.NewRequestWithContext(ctx, "HEAD", fmt.Sprintf("%s/objects/%s", c.c.BaseURL, key), http.NoBody)
req, err := http.NewRequestWithContext(ctx, "HEAD", fmt.Sprintf("%s/object/%s", c.c.BaseURL, key), http.NoBody)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -213,12 +212,12 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc
// UploadObject uploads the data in r, creating an object at the given path.
func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, key string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) {
key = api.ObjectKeyEscape(key)
c.c.Custom("PUT", fmt.Sprintf("/objects/%s", key), []byte{}, nil)
c.c.Custom("PUT", fmt.Sprintf("/object/%s", key), []byte{}, nil)

values := make(url.Values)
values.Set("bucket", bucket)
opts.ApplyValues(values)
u, err := url.Parse(fmt.Sprintf("%v/objects/%v", c.c.BaseURL, key))
u, err := url.Parse(fmt.Sprintf("%v/object/%v", c.c.BaseURL, key))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -258,8 +257,8 @@ func (c *Client) object(ctx context.Context, bucket, key string, opts api.Downlo
values.Set("bucket", url.QueryEscape(bucket))
key += "?" + values.Encode()

c.c.Custom("GET", fmt.Sprintf("/objects/%s", key), nil, (*[]api.ObjectMetadata)(nil))
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/objects/%s", c.c.BaseURL, key), http.NoBody)
c.c.Custom("GET", fmt.Sprintf("/object/%s", key), nil, (*[]api.ObjectMetadata)(nil))
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/object/%s", c.c.BaseURL, key), http.NoBody)
if err != nil {
panic(err)
}
Expand Down
6 changes: 5 additions & 1 deletion worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (os *objectStoreMock) DeleteHostSector(ctx context.Context, hk types.Public
return nil
}

func (os *objectStoreMock) DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) error {
func (os *objectStoreMock) DeleteObject(ctx context.Context, bucket, key string) error {
return nil
}

Expand Down Expand Up @@ -587,6 +587,10 @@ func (os *objectStoreMock) MultipartUpload(ctx context.Context, uploadID string)
return api.MultipartUpload{}, nil
}

func (os *objectStoreMock) RemoveObjects(ctx context.Context, bucket, prefix string) error {
return nil
}

func (os *objectStoreMock) totalSlabBufferSize() (total int) {
for _, p := range os.partials {
if time.Now().After(p.lockedUntil) {
Expand Down
4 changes: 2 additions & 2 deletions worker/s3/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (s *s3) HeadObject(ctx context.Context, bucketName, key string) (*gofakes3.
// delete marker, which becomes the latest version of the object. If there
// isn't a null version, Amazon S3 does not remove any objects.
func (s *s3) DeleteObject(ctx context.Context, bucketName, key string) (gofakes3.ObjectDeleteResult, error) {
err := s.b.DeleteObject(ctx, bucketName, key, api.DeleteObjectOptions{})
err := s.b.DeleteObject(ctx, bucketName, key)
if utils.IsErr(err, api.ErrBucketNotFound) {
return gofakes3.ObjectDeleteResult{}, gofakes3.BucketNotFound(bucketName)
} else if utils.IsErr(err, api.ErrObjectNotFound) {
Expand Down Expand Up @@ -354,7 +354,7 @@ func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[str
func (s *s3) DeleteMulti(ctx context.Context, bucketName string, objects ...string) (gofakes3.MultiDeleteResult, error) {
var res gofakes3.MultiDeleteResult
for _, key := range objects {
err := s.b.DeleteObject(ctx, bucketName, key, api.DeleteObjectOptions{})
err := s.b.DeleteObject(ctx, bucketName, key)
if err != nil && !utils.IsErr(err, api.ErrObjectNotFound) {
res.Error = append(res.Error, gofakes3.ErrorResult{
Key: key,
Expand Down
2 changes: 1 addition & 1 deletion worker/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Bus interface {

AddObject(ctx context.Context, bucket, key, contractSet string, o object.Object, opts api.AddObjectOptions) (err error)
CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey string, opts api.CopyObjectOptions) (om api.ObjectMetadata, err error)
DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) (err error)
DeleteObject(ctx context.Context, bucket, key string) (err error)
Objects(ctx context.Context, bucket, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error)

AbortMultipartUpload(ctx context.Context, bucket, key string, uploadID string) (err error)
Expand Down
Loading

0 comments on commit 5af724b

Please sign in to comment.