From f3d8a24e1c0eb3aa82e14247227287420c2092d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 30 Nov 2023 09:05:48 +0100 Subject: [PATCH] Implemented concurrent processing for recycling sub-trees MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Christian Richter Signed-off-by: André Duffeck --- pkg/storage/utils/decomposedfs/recycle.go | 114 ++++++++++++---------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/recycle.go b/pkg/storage/utils/decomposedfs/recycle.go index 77ceb91cb3..72c2b22640 100644 --- a/pkg/storage/utils/decomposedfs/recycle.go +++ b/pkg/storage/utils/decomposedfs/recycle.go @@ -27,6 +27,9 @@ import ( "strings" "time" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" @@ -35,8 +38,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/pkg/errors" - "golang.org/x/sync/errgroup" ) // Recycle items are stored inside the node folder and start with the uuid of the deleted node. @@ -215,27 +216,27 @@ func readTrashLink(path string) (string, string, string, error) { func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) { log := appctx.GetLogger(ctx) - trashRoot := fs.getRecycleRoot(spaceID) - matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*") + + subTrees, err := filepath.Glob(trashRoot + "/*") if err != nil { return nil, err } numWorkers := fs.o.MaxConcurrency - if len(matches) < numWorkers { - numWorkers = len(matches) + if len(subTrees) < numWorkers { + numWorkers = len(subTrees) } - work := make(chan string, len(matches)) - results := make(chan *provider.RecycleItem, len(matches)) + work := make(chan string, len(subTrees)) + results := make(chan *provider.RecycleItem, len(subTrees)) g, ctx := errgroup.WithContext(ctx) // Distribute work g.Go(func() error { defer close(work) - for _, itemPath := range matches { + for _, itemPath := range subTrees { select { case work <- itemPath: case <-ctx.Done(): @@ -248,55 +249,62 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p // Spawn workers that'll concurrently work the queue for i := 0; i < numWorkers; i++ { g.Go(func() error { - for itemPath := range work { - nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) + for subTree := range work { + matches, err := filepath.Glob(subTree + "/*/*/*/*") if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") - continue + return err } - md, err := os.Stat(nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") - continue - } + for _, itemPath := range matches { + nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") + continue + } - attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") - continue - } + md, err := os.Stat(nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") + continue + } - nodeType := fs.lu.TypeFromPath(ctx, nodePath) - if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") - continue - } + attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") + continue + } - item := &provider.RecycleItem{ - Type: nodeType, - Size: uint64(md.Size()), - Key: nodeID, - } - if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { - item.DeletionTime = &types.Timestamp{ - Seconds: uint64(deletionTime.Unix()), - // TODO nanos + nodeType := fs.lu.TypeFromPath(ctx, nodePath) + if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") + continue } - } else { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") - } - // lookup origin path in extended attributes - if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { - item.Ref = &provider.Reference{Path: string(attr)} - } else { - log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") - } - select { - case results <- item: - case <-ctx.Done(): - return ctx.Err() + item := &provider.RecycleItem{ + Type: nodeType, + Size: uint64(md.Size()), + Key: nodeID, + } + if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { + item.DeletionTime = &types.Timestamp{ + Seconds: uint64(deletionTime.Unix()), + // TODO nanos + } + } else { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") + } + + // lookup origin path in extended attributes + if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { + item.Ref = &provider.Reference{Path: string(attr)} + } else { + log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") + } + select { + case results <- item: + case <-ctx.Done(): + return ctx.Err() + } } } return nil @@ -310,11 +318,9 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p }() // Collect results - items := make([]*provider.RecycleItem, len(matches)) - i := 0 + items := []*provider.RecycleItem{} for ri := range results { - items[i] = ri - i++ + items = append(items, ri) } return items, nil }