From cbfbd0aba7a2fd465d73c36ec8ff14573f9c8dec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 30 Nov 2023 17:27:42 +0100 Subject: [PATCH] use UploadSessionLister interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- go.mod | 2 + go.sum | 4 +- services/storage-users/pkg/command/uploads.go | 40 ++--- .../v2/pkg/rhttp/datatx/manager/tus/tus.go | 48 +++--- .../cs3org/reva/v2/pkg/storage/storage.go | 18 --- .../cs3org/reva/v2/pkg/storage/uploads.go | 86 ++++++++++ .../pkg/storage/utils/decomposedfs/recycle.go | 147 ++++++++++++------ .../pkg/storage/utils/decomposedfs/upload.go | 107 ++++++++----- .../utils/decomposedfs/upload/processing.go | 83 ++++++++++ vendor/modules.txt | 3 +- 10 files changed, 375 insertions(+), 163 deletions(-) create mode 100644 vendor/github.com/cs3org/reva/v2/pkg/storage/uploads.go diff --git a/go.mod b/go.mod index 8cf950af5a4..8dd9e4f0a55 100644 --- a/go.mod +++ b/go.mod @@ -346,3 +346,5 @@ require ( ) replace github.com/go-micro/plugins/v4/store/nats-js => github.com/kobergj/plugins/v4/store/nats-js v1.2.1-0.20231020092801-9463c820c19a + +replace github.com/cs3org/reva/v2 => github.com/butonic/reva/v2 v2.0.0-20231201103817-4db6635be3e4 diff --git a/go.sum b/go.sum index a234ba17035..3eff1e7de23 100644 --- a/go.sum +++ b/go.sum @@ -941,6 +941,8 @@ github.com/bombsimon/logrusr/v3 v3.1.0/go.mod h1:PksPPgSFEL2I52pla2glgCyyd2OqOHA github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/butonic/reva/v2 v2.0.0-20231201103817-4db6635be3e4 h1:G542puOCIFsTBwl/aXXzRZTSSqdLKzYYUBJbo1/gnnM= +github.com/butonic/reva/v2 v2.0.0-20231201103817-4db6635be3e4/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY= github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/bytecodealliance/wasmtime-go/v3 v3.0.2 h1:3uZCA/BLTIu+DqCfguByNMJa2HVHpXvjfy0Dy7g6fuA= github.com/bytecodealliance/wasmtime-go/v3 v3.0.2/go.mod h1:RnUjnIXxEJcL6BgCvNyzCCRzZcxCgsZCi+RNlvYor5Q= @@ -1017,8 +1019,6 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c= github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9 h1:5vKQcL1hPHEZKu9e8C9rl0ap3ofMBznmoSgi4lRYXec= -github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/services/storage-users/pkg/command/uploads.go b/services/storage-users/pkg/command/uploads.go index 568864defca..8217a669c4a 100644 --- a/services/storage-users/pkg/command/uploads.go +++ b/services/storage-users/pkg/command/uploads.go @@ -3,11 +3,8 @@ package command import ( "fmt" "os" - "strconv" "sync" - "time" - tusd "github.com/tus/tusd/pkg/handler" "github.com/urfave/cli/v2" "github.com/cs3org/reva/v2/pkg/storage" @@ -51,20 +48,21 @@ func ListUploads(cfg *config.Config) *cli.Command { return err } - managingFS, ok := fs.(storage.UploadsManager) + managingFS, ok := fs.(storage.UploadSessionLister) if !ok { fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver) os.Exit(1) } - - uploads, err := managingFS.ListUploads() + falseValue := false + uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &falseValue}) if err != nil { return err } fmt.Println("Incomplete uploads:") for _, u := range uploads { - fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", u.ID, u.MetaData["filename"], u.Size, expiredString(u.MetaData["expires"])) + ref := u.Reference() + fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) } return nil }, @@ -92,7 +90,7 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command { return err } - managingFS, ok := fs.(storage.UploadsManager) + managingFS, ok := fs.(storage.UploadSessionLister) if !ok { fmt.Fprintf(os.Stderr, "'%s' storage does not support clean expired uploads\n", cfg.Driver) os.Exit(1) @@ -100,18 +98,23 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command { wg := sync.WaitGroup{} wg.Add(1) - purgedChannel := make(chan tusd.FileInfo) + falseValue := false + trueValue := false + uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &trueValue, Processing: &falseValue}) + if err != nil { + return err + } - fmt.Println("Cleaned uploads:") + fmt.Println("purging uploads:") go func() { - for purged := range purgedChannel { - fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", purged.ID, purged.MetaData["filename"], purged.Size, expiredString(purged.MetaData["expires"])) + for _, u := range uploads { + ref := u.Reference() + fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) + u.Purge(c.Context) } wg.Done() }() - err = managingFS.PurgeExpiredUploads(purgedChannel) - close(purgedChannel) wg.Wait() if err != nil { fmt.Fprintf(os.Stderr, "Failed to clean expired uploads '%s'\n", err) @@ -121,12 +124,3 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command { }, } } - -func expiredString(e string) string { - expired := "N/A" - iExpires, err := strconv.Atoi(e) - if err == nil { - expired = time.Unix(int64(iExpires), 0).Format(time.RFC3339) - } - return expired -} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go index f98ce8adb10..a46bd177392 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go @@ -23,13 +23,11 @@ import ( "log" "net/http" "path" - "path/filepath" "time" "github.com/pkg/errors" tusd "github.com/tus/tusd/pkg/handler" - userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net" "github.com/cs3org/reva/v2/pkg/appctx" @@ -40,8 +38,8 @@ import ( "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/cache" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/cs3org/reva/v2/pkg/utils" "github.com/mitchellh/mapstructure" ) @@ -103,33 +101,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, err } - go func() { - for { - ev := <-handler.CompleteUploads - info := ev.Upload - spaceOwner := &userv1beta1.UserId{ - OpaqueId: info.Storage["SpaceOwnerOrManager"], - } - owner := &userv1beta1.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - } - ref := &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: info.MetaData["providerID"], - SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], - }, - Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), - } - datatx.InvalidateCache(owner, ref, m.statCache) - if m.publisher != nil { - if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil { - appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") + if _, ok := fs.(storage.UploadSessionLister); ok { + // We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info + go func() { + for { + ev := <-handler.CompleteUploads + // We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files + // so we create a Progress instance here that is used to read the correct properties + up := upload.Progress{ + Info: ev.Upload, + } + executant := up.Executant() + ref := up.Reference() + datatx.InvalidateCache(&executant, &ref, m.statCache) + if m.publisher != nil { + if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil { + appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") + } } } - } - }() + }() + } h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { method := r.Method diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go index bc73db0fdcc..70ceb5134a7 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go @@ -23,16 +23,10 @@ import ( "io" "net/url" - tusd "github.com/tus/tusd/pkg/handler" - - userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" ) -// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished -type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference) - // FS is the interface to implement access to the storage. type FS interface { GetHome(ctx context.Context) (string, error) @@ -77,12 +71,6 @@ type FS interface { DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error } -// UploadsManager defines the interface for FS implementations that allow for managing uploads -type UploadsManager interface { - ListUploads() ([]tusd.FileInfo, error) - PurgeExpiredUploads(chan<- tusd.FileInfo) error -} - // Registry is the interface that storage registries implement // for discovering storage providers type Registry interface { @@ -98,9 +86,3 @@ type PathWrapper interface { Unwrap(ctx context.Context, rp string) (string, error) Wrap(ctx context.Context, rp string) (string, error) } - -type UploadRequest struct { - Ref *provider.Reference - Body io.ReadCloser - Length int64 -} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/uploads.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/uploads.go new file mode 100644 index 00000000000..87d26115bf2 --- /dev/null +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/uploads.go @@ -0,0 +1,86 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package storage + +import ( + "context" + "io" + "time" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + tusd "github.com/tus/tusd/pkg/handler" +) + +// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished +type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference) + +// UploadRequest us used in FS.Upload() to carry required upload metadata +type UploadRequest struct { + Ref *provider.Reference + Body io.ReadCloser + Length int64 +} + +// UploadsManager defines the interface for storage drivers that allow for managing uploads +// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister. +type UploadsManager interface { + ListUploads() ([]tusd.FileInfo, error) + PurgeExpiredUploads(chan<- tusd.FileInfo) error +} + +// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions +type UploadSessionLister interface { + // ListUploadSessions returns the upload sessions matching the given filter + ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error) +} + +// UploadSession is the interface that storage drivers need to return whan listing upload sessions. +type UploadSession interface { + // ID returns the upload id + ID() string + // Filename returns the filename of the file + Filename() string + // Size returns the size of the upload + Size() int64 + // Offset returns the current offset + Offset() int64 + // Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root + Reference() provider.Reference + // Executant returns the userid of the user that created the upload + Executant() userpb.UserId + // SpaceOwner returns the owner of a space if set. optional + SpaceOwner() *userpb.UserId + // Expires returns the time when the upload can no longer be used + Expires() time.Time + + // IsProcessing returns true if postprocessing has not finished, yet + // The actual postprocessing state is tracked in the postprocessing service. + IsProcessing() bool + + // Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome + Purge(ctx context.Context) error +} + +// UploadSessionFilter can be used to filter upload sessions +type UploadSessionFilter struct { + ID *string + Processing *bool + Expired *bool +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go index eeb153777a4..72c2b226403 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go @@ -27,6 +27,9 @@ import ( "strings" "time" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" @@ -35,7 +38,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/pkg/errors" ) // Recycle items are stored inside the node folder and start with the uuid of the deleted node. @@ -214,66 +216,111 @@ func readTrashLink(path string) (string, string, string, error) { func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) { log := appctx.GetLogger(ctx) - items := make([]*provider.RecycleItem, 0) - trashRoot := fs.getRecycleRoot(spaceID) - matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*") + + subTrees, err := filepath.Glob(trashRoot + "/*") if err != nil { return nil, err } - for _, itemPath := range matches { - nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") - continue - } + numWorkers := fs.o.MaxConcurrency + if len(subTrees) < numWorkers { + numWorkers = len(subTrees) + } - md, err := os.Stat(nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") - continue - } + work := make(chan string, len(subTrees)) + results := make(chan *provider.RecycleItem, len(subTrees)) - attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") - continue - } + g, ctx := errgroup.WithContext(ctx) - nodeType := fs.lu.TypeFromPath(ctx, nodePath) - if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") - continue - } - - item := &provider.RecycleItem{ - Type: nodeType, - Size: uint64(md.Size()), - Key: nodeID, - } - if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { - item.DeletionTime = &types.Timestamp{ - Seconds: uint64(deletionTime.Unix()), - // TODO nanos + // Distribute work + g.Go(func() error { + defer close(work) + for _, itemPath := range subTrees { + select { + case work <- itemPath: + case <-ctx.Done(): + return ctx.Err() } - } else { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for subTree := range work { + matches, err := filepath.Glob(subTree + "/*/*/*/*") + if err != nil { + return err + } + + for _, itemPath := range matches { + nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") + continue + } + + md, err := os.Stat(nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") + continue + } + + attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") + continue + } + + nodeType := fs.lu.TypeFromPath(ctx, nodePath) + if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") + continue + } + + item := &provider.RecycleItem{ + Type: nodeType, + Size: uint64(md.Size()), + Key: nodeID, + } + if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { + item.DeletionTime = &types.Timestamp{ + Seconds: uint64(deletionTime.Unix()), + // TODO nanos + } + } else { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") + } + + // lookup origin path in extended attributes + if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { + item.Ref = &provider.Reference{Path: string(attr)} + } else { + log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") + } + select { + case results <- item: + case <-ctx.Done(): + return ctx.Err() + } + } + } + return nil + }) + } - // lookup origin path in extended attributes - if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { - item.Ref = &provider.Reference{Path: string(attr)} - } else { - log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping") - continue - } - // TODO filter results by permission ... on the original parent? or the trashed node? - // if it were on the original parent it would be possible to see files that were trashed before the current user got access - // so -> check the trash node itself - // hmm listing trash currently lists the current users trash or the 'root' trash. from ocs only the home storage is queried for trash items. - // for now we can only really check if the current user is the owner - items = append(items, item) + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + // Collect results + items := []*provider.RecycleItem{} + for ri := range results { + items = append(items, ri) } return items, nil } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go index 40cb53b0730..59ef12a4e17 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go @@ -20,10 +20,10 @@ package decomposedfs import ( "context" + "fmt" "os" "path/filepath" "regexp" - "strconv" "strings" "time" @@ -96,14 +96,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u }, Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), } - owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) + executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) if !ok { return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") } spaceOwner := &userpb.UserId{ OpaqueId: info.Storage["SpaceOwnerOrManager"], } - uff(spaceOwner, owner.Id, uploadRef) + uff(spaceOwner, executant.Id, uploadRef) } ri := provider.ResourceInfo{ @@ -243,36 +243,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } -// ListUploads returns a list of all incomplete uploads -func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) { - return fs.uploadInfos(context.Background()) -} - -// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers -func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error { - infos, err := fs.uploadInfos(context.Background()) - if err != nil { - return err - } - - for _, info := range infos { - expires, err := strconv.Atoi(info.MetaData["expires"]) +// ListUploadSessions returns the upload sessions for the given filter +func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) { + var sessions []storage.UploadSession + if filter.ID != nil && *filter.ID != "" { + session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", *filter.ID+".info")) + if err != nil { + return nil, err + } + sessions = []storage.UploadSession{session} + } else { + var err error + sessions, err = fs.uploadSessions(ctx) if err != nil { + return nil, err + } + } + filteredSessions := []storage.UploadSession{} + now := time.Now() + for _, session := range sessions { + if filter.Processing != nil && *filter.Processing != session.IsProcessing() { continue } - if int64(expires) < time.Now().Unix() { - purgedChan <- info - err = os.Remove(info.Storage["BinPath"]) - if err != nil { - return err - } - err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info")) - if err != nil { - return err + if filter.Expired != nil { + if *filter.Expired { + if now.Before(session.Expires()) { + continue + } + } else { + if now.After(session.Expires()) { + continue + } } } + filteredSessions = append(filteredSessions, session) } - return nil + return filteredSessions, nil } // AsTerminatableUpload returns a TerminatableUpload @@ -296,28 +302,47 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload return up.(*upload.Upload) } -func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) { - infos := []tusd.FileInfo{} +func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) { + uploads := []storage.UploadSession{} infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) if err != nil { return nil, err } for _, info := range infoFiles { - match := _idRegexp.FindStringSubmatch(info) - if match == nil || len(match) < 2 { - continue - } - up, err := fs.GetUpload(ctx, match[1]) - if err != nil { - return nil, err - } - info, err := up.GetInfo(context.Background()) + progress, err := fs.getUploadSession(ctx, info) if err != nil { - return nil, err + appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession") + continue } - infos = append(infos, info) + uploads = append(uploads, progress) + } + return uploads, nil +} + +func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) { + match := _idRegexp.FindStringSubmatch(path) + if match == nil || len(match) < 2 { + return nil, fmt.Errorf("invalid upload path") + } + up, err := fs.GetUpload(ctx, match[1]) + if err != nil { + return nil, err + } + info, err := up.GetInfo(context.Background()) + if err != nil { + return nil, err + } + // upload processing state is stored in the node, for decomposedfs the NodeId is always set by InitiateUpload + n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true) + if err != nil { + return nil, err + } + progress := upload.Progress{ + Path: path, + Info: info, + Processing: n.IsProcessing(ctx), } - return infos, nil + return progress, nil } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go index 462716562e4..a025c54ed75 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go @@ -21,6 +21,7 @@ package upload import ( "context" "encoding/json" + stderrors "errors" "fmt" iofs "io/fs" "os" @@ -497,3 +498,85 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look } return n, nil } + +// Progress adapts the persisted upload metadata for the UploadSessionLister interface +type Progress struct { + Path string + Info tusd.FileInfo + Processing bool +} + +// ID implements the storage.UploadSession interface +func (p Progress) ID() string { + return p.Info.ID +} + +// Filename implements the storage.UploadSession interface +func (p Progress) Filename() string { + return p.Info.MetaData["filename"] +} + +// Size implements the storage.UploadSession interface +func (p Progress) Size() int64 { + return p.Info.Size +} + +// Offset implements the storage.UploadSession interface +func (p Progress) Offset() int64 { + return p.Info.Offset +} + +// Reference implements the storage.UploadSession interface +func (p Progress) Reference() provider.Reference { + return provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: p.Info.MetaData["providerID"], + SpaceId: p.Info.Storage["SpaceRoot"], + OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in InitiateUpload + }, + } +} + +// Executant implements the storage.UploadSession interface +func (p Progress) Executant() userpb.UserId { + return userpb.UserId{ + Idp: p.Info.Storage["Idp"], + OpaqueId: p.Info.Storage["UserId"], + Type: utils.UserTypeMap(p.Info.Storage["UserType"]), + } +} + +// SpaceOwner implements the storage.UploadSession interface +func (p Progress) SpaceOwner() *userpb.UserId { + return &userpb.UserId{ + // idp and type do not seem to be consumed and the node currently only stores the user id anyway + OpaqueId: p.Info.Storage["SpaceOwnerOrManager"], + } +} + +// Expires implements the storage.UploadSession interface +func (p Progress) Expires() time.Time { + mt, _ := utils.MTimeToTime(p.Info.MetaData["expires"]) + return mt +} + +// IsProcessing implements the storage.UploadSession interface +func (p Progress) IsProcessing() bool { + return p.Processing +} + +// Purge implements the storage.UploadSession interface +func (p Progress) Purge(ctx context.Context) error { + berr := os.Remove(p.Info.Storage["BinPath"]) + if berr != nil { + appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Info.Storage["BinPath"]).Msg("Decomposedfs: could not purge bin path for upload session") + } + + // remove upload metadata + merr := os.Remove(p.Path) + if merr != nil { + appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Path).Msg("Decomposedfs: could not purge metadata path for upload session") + } + + return stderrors.Join(berr, merr) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 43e862eb8ac..d4b0671aad1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -357,7 +357,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9 +# github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9 => github.com/butonic/reva/v2 v2.0.0-20231201103817-4db6635be3e4 ## explicit; go 1.20 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime @@ -2276,3 +2276,4 @@ stash.kopano.io/kgol/oidc-go ## explicit; go 1.13 stash.kopano.io/kgol/rndm # github.com/go-micro/plugins/v4/store/nats-js => github.com/kobergj/plugins/v4/store/nats-js v1.2.1-0.20231020092801-9463c820c19a +# github.com/cs3org/reva/v2 => github.com/butonic/reva/v2 v2.0.0-20231201103817-4db6635be3e4