Implemented concurrent processing for recycling sub-trees
Co-authored-by: Christian Richter <crichter@owncloud.com>

Signed-off-by: André Duffeck <andre.duffeck@firondu.de>
aduffeck authored and dragonchaser committed Nov 30, 2023
1 parent f18ddee commit f3d8a24
Showing 1 changed file with 60 additions and 54 deletions.
pkg/storage/utils/decomposedfs/recycle.go (114 changes: 60 additions & 54 deletions)
@@ -27,6 +27,9 @@ import (
 	"strings"
 	"time"
 
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+
 	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
 	types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
 	"github.com/cs3org/reva/v2/pkg/appctx"
@@ -35,8 +38,6 @@ import (
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
 	"github.com/cs3org/reva/v2/pkg/storagespace"
-	"github.com/pkg/errors"
-	"golang.org/x/sync/errgroup"
 )
 
 // Recycle items are stored inside the node folder and start with the uuid of the deleted node.
@@ -215,27 +216,27 @@ func readTrashLink(path string) (string, string, string, error) {
 
 func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
 	log := appctx.GetLogger(ctx)
-
 	trashRoot := fs.getRecycleRoot(spaceID)
-	matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*")
+
+	subTrees, err := filepath.Glob(trashRoot + "/*")
 	if err != nil {
 		return nil, err
 	}
 
 	numWorkers := fs.o.MaxConcurrency
-	if len(matches) < numWorkers {
-		numWorkers = len(matches)
+	if len(subTrees) < numWorkers {
+		numWorkers = len(subTrees)
 	}
 
-	work := make(chan string, len(matches))
-	results := make(chan *provider.RecycleItem, len(matches))
+	work := make(chan string, len(subTrees))
+	results := make(chan *provider.RecycleItem, len(subTrees))
 
 	g, ctx := errgroup.WithContext(ctx)
 
 	// Distribute work
 	g.Go(func() error {
 		defer close(work)
-		for _, itemPath := range matches {
+		for _, itemPath := range subTrees {
 			select {
 			case work <- itemPath:
 			case <-ctx.Done():
@@ -248,55 +249,62 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
 	// Spawn workers that'll concurrently work the queue
 	for i := 0; i < numWorkers; i++ {
 		g.Go(func() error {
-			for itemPath := range work {
-				nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
+			for subTree := range work {
+				matches, err := filepath.Glob(subTree + "/*/*/*/*")
 				if err != nil {
-					log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
-					continue
+					return err
 				}
 
-				md, err := os.Stat(nodePath)
-				if err != nil {
-					log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
-					continue
-				}
+				for _, itemPath := range matches {
+					nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
+					if err != nil {
+						log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
+						continue
+					}
 
-				attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
-				if err != nil {
-					log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
-					continue
-				}
+					md, err := os.Stat(nodePath)
+					if err != nil {
+						log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
+						continue
+					}
 
-				nodeType := fs.lu.TypeFromPath(ctx, nodePath)
-				if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
-					log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
-					continue
-				}
+					attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
+					if err != nil {
+						log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
+						continue
+					}
 
-				item := &provider.RecycleItem{
-					Type: nodeType,
-					Size: uint64(md.Size()),
-					Key:  nodeID,
-				}
-				if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
-					item.DeletionTime = &types.Timestamp{
-						Seconds: uint64(deletionTime.Unix()),
-						// TODO nanos
+					nodeType := fs.lu.TypeFromPath(ctx, nodePath)
+					if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
+						log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
+						continue
 					}
-				} else {
-					log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
-				}
 
-				// lookup origin path in extended attributes
-				if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
-					item.Ref = &provider.Reference{Path: string(attr)}
-				} else {
-					log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path")
-				}
-				select {
-				case results <- item:
-				case <-ctx.Done():
-					return ctx.Err()
+					item := &provider.RecycleItem{
+						Type: nodeType,
+						Size: uint64(md.Size()),
+						Key:  nodeID,
+					}
+					if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
+						item.DeletionTime = &types.Timestamp{
+							Seconds: uint64(deletionTime.Unix()),
+							// TODO nanos
+						}
+					} else {
+						log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
+					}
+
+					// lookup origin path in extended attributes
+					if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
+						item.Ref = &provider.Reference{Path: string(attr)}
+					} else {
+						log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path")
+					}
+					select {
+					case results <- item:
+					case <-ctx.Done():
+						return ctx.Err()
+					}
 				}
 			}
 			return nil
@@ -310,11 +318,9 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
 	}()
 
 	// Collect results
-	items := make([]*provider.RecycleItem, len(matches))
-	i := 0
+	items := []*provider.RecycleItem{}
 	for ri := range results {
-		items[i] = ri
-		i++
+		items = append(items, ri)
 	}
 	return items, nil
 }
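
For readers skimming the change, the reworked listTrashRoot follows a common errgroup fan-out/fan-in shape: one producer goroutine queues the top-level trash sub-trees, a bounded pool of workers globs each sub-tree and emits recycle items, and the caller drains a results channel that is closed once the group finishes. The sketch below is a minimal, self-contained illustration of that shape, not code from this repository; listConcurrently, root, maxConcurrency, and the plain string results are hypothetical stand-ins for the real fs.o.MaxConcurrency option and *provider.RecycleItem values.

// Illustrative sketch of the errgroup fan-out/fan-in pattern (assumed names, not reva code).
package main

import (
	"context"
	"fmt"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

func listConcurrently(ctx context.Context, root string, maxConcurrency int) ([]string, error) {
	// One glob at the top level: each match is an independent unit of work.
	subTrees, err := filepath.Glob(filepath.Join(root, "*"))
	if err != nil {
		return nil, err
	}

	// Never start more workers than there are sub-trees to process.
	numWorkers := maxConcurrency
	if len(subTrees) < numWorkers {
		numWorkers = len(subTrees)
	}

	work := make(chan string, len(subTrees))
	results := make(chan string, len(subTrees))

	g, ctx := errgroup.WithContext(ctx)

	// Producer: queue sub-trees, stop early if the context is cancelled.
	g.Go(func() error {
		defer close(work)
		for _, t := range subTrees {
			select {
			case work <- t:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Workers: expand each sub-tree and emit one result per entry found.
	for i := 0; i < numWorkers; i++ {
		g.Go(func() error {
			for subTree := range work {
				entries, err := filepath.Glob(filepath.Join(subTree, "*"))
				if err != nil {
					return err
				}
				for _, e := range entries {
					select {
					case results <- e:
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
			return nil
		})
	}

	// Close results once producer and workers are done; the channel close
	// makes the groupErr write visible to the collecting goroutine below.
	var groupErr error
	go func() {
		groupErr = g.Wait()
		close(results)
	}()

	// Collector: drain results on the calling goroutine.
	items := []string{}
	for r := range results {
		items = append(items, r)
	}
	return items, groupErr
}

func main() {
	items, err := listConcurrently(context.Background(), ".", 4)
	fmt.Println(len(items), err)
}

Two points of the design carry over from the diff above: the expensive globbing now happens inside the workers, one sub-tree at a time, instead of a single upfront Glob over the whole trash root, and the pool is capped at min(MaxConcurrency, len(subTrees)) so spaces with only a few deleted trees do not spawn idle workers. Buffering both channels to len(subTrees) keeps the producer from blocking while workers are still starting up.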