Skip to content

Commit

Permalink
feat(ocis): benchmark revision listing possiblities
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj committed Aug 22, 2024
1 parent 585bd82 commit a4ac290
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 145 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/improve-revisions-purge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Improve revisions purge

The `revisions purge` command would time out on big spaces. We have improved performance by parallelizing the process.

https://github.com/owncloud/ocis/pull/9891
71 changes: 53 additions & 18 deletions ocis/pkg/command/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ocisbs "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
s3bs "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/blobstore"
Expand All @@ -19,7 +20,7 @@ import (

var (
// _nodesGlobPattern is the glob pattern to find all nodes
_nodesGlobPattern = "spaces/*/*/*/*/*/*/*/*"
_nodesGlobPattern = "spaces/*/*/nodes/"
)

// RevisionsCommand is the entrypoint for the revisions command.
Expand All @@ -30,7 +31,7 @@ func RevisionsCommand(cfg *config.Config) *cli.Command {
Subcommands: []*cli.Command{
PurgeRevisionsCommand(cfg),
},
Before: func(c *cli.Context) error {
Before: func(_ *cli.Context) error {
return configlog.ReturnError(parser.ParseConfig(cfg, true))
},
Action: func(_ *cli.Context) error {
Expand Down Expand Up @@ -74,6 +75,11 @@ func PurgeRevisionsCommand(cfg *config.Config) *cli.Command {
Aliases: []string{"r"},
Usage: "purge all revisions of this file/space. If not set, all revisions will be purged",
},
&cli.StringFlag{
Name: "glob-mechanism",
Usage: "the glob mechanism to find all nodes. Can be 'glob', 'list' or 'workers'. 'glob' uses globbing with a single worker. 'workers' spawns multiple go routines, accelatering the command drastically but causing high cpu and ram usage. 'list' looks for references by listing directories with multiple workers. Default is 'glob'",
Value: "glob",
},
},
Action: func(c *cli.Context) error {
basePath := c.String("basepath")
Expand Down Expand Up @@ -108,43 +114,72 @@ func PurgeRevisionsCommand(cfg *config.Config) *cli.Command {
return err
}

p, err := generatePath(basePath, c.String("resource-id"))
if err != nil {
fmt.Printf("❌ Error parsing resourceID: %s", err)
return err
var rid *provider.ResourceId
resid, err := storagespace.ParseID(c.String("resource-id"))
if err == nil {
rid = &resid
}

if err := revisions.PurgeRevisions(p, bs, c.Bool("dry-run"), c.Bool("verbose")); err != nil {
fmt.Printf("❌ Error purging revisions: %s", err)
return err
mechanism := c.String("glob-mechanism")
if rid.GetOpaqueId() != "" {
mechanism = "glob"
}

var ch <-chan string
switch mechanism {
default:
fallthrough
case "glob":
p := generatePath(basePath, rid)
if rid.GetOpaqueId() == "" {
p = filepath.Join(p, "*/*/*/*/*")
}
ch = revisions.Glob(p)
case "workers":
p := generatePath(basePath, rid)
ch = revisions.GlobWorkers(p, "/*", "/*/*/*/*")
case "list":
p := filepath.Join(basePath, "spaces")
if rid != nil {
p = generatePath(basePath, rid)
}
ch = revisions.List(p, 10)
}

files, blobs, revisions := revisions.PurgeRevisions(ch, bs, c.Bool("dry-run"), c.Bool("verbose"))
printResults(files, blobs, revisions, c.Bool("dry-run"))
return nil
},
}
}

func generatePath(basePath string, resourceID string) (string, error) {
if resourceID == "" {
return filepath.Join(basePath, _nodesGlobPattern), nil
func printResults(countFiles, countBlobs, countRevisions int, dryRun bool) {
switch {
case countFiles == 0 && countRevisions == 0 && countBlobs == 0:
fmt.Println("❎ No revisions found. Storage provider is clean.")
case !dryRun:
fmt.Printf("✅ Deleted %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
default:
fmt.Printf("👉 Would delete %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
}
}

rid, err := storagespace.ParseID(resourceID)
if err != nil {
return "", err
func generatePath(basePath string, rid *provider.ResourceId) string {
if rid == nil {
return filepath.Join(basePath, _nodesGlobPattern)
}

sid := lookup.Pathify(rid.GetSpaceId(), 1, 2)
if sid == "" {
sid = "*/*"
return ""
}

nid := lookup.Pathify(rid.GetOpaqueId(), 4, 2)
if nid == "" {
nid = "*/*/*/*/"
return filepath.Join(basePath, "spaces", sid, "nodes")
}

return filepath.Join(basePath, "spaces", sid, "nodes", nid+"*"), nil
return filepath.Join(basePath, "spaces", sid, "nodes", nid+"*")
}

func init() {
Expand Down
135 changes: 85 additions & 50 deletions ocis/pkg/revisions/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"

"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/shamaton/msgpack/v2"
Expand All @@ -25,16 +26,12 @@ type DelBlobstore interface {
Delete(node *node.Node) error
}

// PurgeRevisionsGlob removes all revisions from a storage provider using globbing.
func PurgeRevisionsGlob(pattern string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) {
if verbose {
fmt.Println("Looking for nodes in", pattern)
}

// Glob uses globbing to find all revision nodes in a storage provider.
func Glob(pattern string) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
nodes, err := filepath.Glob(pattern)
nodes, err := filepath.Glob(filepath.Join(pattern))
if err != nil {
fmt.Println("error globbing", pattern, err)
return
Expand All @@ -52,11 +49,51 @@ func PurgeRevisionsGlob(pattern string, bs DelBlobstore, dryRun bool, verbose bo
}
}()

return purgeRevisions(ch, bs, dryRun, verbose)
return ch
}

// PurgeRevisionsWalk removes all revisions from a storage provider using walking.
func PurgeRevisionsWalk(base string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) {
// GlobWorkers uses multiple go routine to glob all revision nodes in a storage provider.
func GlobWorkers(pattern string, depth string, remainder string) <-chan string {
wg := sync.WaitGroup{}
ch := make(chan string)
go func() {
defer close(ch)
nodes, err := filepath.Glob(pattern + depth)
if err != nil {
fmt.Println("error globbing", pattern, err)
return
}

if len(nodes) == 0 {
fmt.Println("no nodes found. Double check storage path")
return
}

for _, node := range nodes {
wg.Add(1)
go func(node string) {
defer wg.Done()
nodes, err := filepath.Glob(node + remainder)
if err != nil {
fmt.Println("error globbing", node, err)
return
}
for _, n := range nodes {
if _versionRegex.MatchString(n) {
ch <- n
}
}
}(node)
}

wg.Wait()
}()

return ch
}

// Walk walks the storage provider to find all revision nodes.
func Walk(base string) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
Expand All @@ -79,58 +116,25 @@ func PurgeRevisionsWalk(base string, bs DelBlobstore, dryRun bool, verbose bool)
}

}()
return purgeRevisions(ch, bs, dryRun, verbose)
return ch
}

// PurgeRevisionsList removes all revisions from a storage provider using listing.
func PurgeRevisionsList(base string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) {
// List uses directory listing to find all revision nodes in a storage provider.
func List(base string, workers int) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
if err := listFolder(base, ch); err != nil {
if err := listFolder(base, ch, make(chan struct{}, workers)); err != nil {
fmt.Println("error listing", base, err)
return
}
}()

return purgeRevisions(ch, bs, dryRun, verbose)
return ch
}

func listFolder(path string, ch chan<- string) error {
children, err := os.ReadDir(path)
if err != nil {
return err
}

for _, child := range children {
if child.IsDir() {
if err := listFolder(filepath.Join(path, child.Name()), ch); err != nil {
return err
}
}

if _versionRegex.MatchString(child.Name()) {
ch <- filepath.Join(path, child.Name())
}

}
return nil
}

// PrintResults prints the results
func PrintResults(countFiles, countBlobs, countRevisions int, dryRun bool) error {
switch {
case countFiles == 0 && countRevisions == 0 && countBlobs == 0:
fmt.Println("❎ No revisions found. Storage provider is clean.")
case !dryRun:
fmt.Printf("✅ Deleted %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
default:
fmt.Printf("👉 Would delete %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs)
}
return nil
}

func purgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool) (int, int, int) {
// PurgeRevisions removes all revisions from a storage provider.
func PurgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool) (int, int, int) {
countFiles := 0
countBlobs := 0
countRevisions := 0
Expand Down Expand Up @@ -201,6 +205,37 @@ func purgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool)
return countFiles, countBlobs, countRevisions
}

func listFolder(path string, ch chan<- string, workers chan struct{}) error {
workers <- struct{}{}
wg := sync.WaitGroup{}

children, err := os.ReadDir(path)
if err != nil {
<-workers
return err
}

for _, child := range children {
if child.IsDir() {
wg.Add(1)
go func() {
defer wg.Done()
if err := listFolder(filepath.Join(path, child.Name()), ch, workers); err != nil {
fmt.Println("error listing", path, err)
}
}()
}

if _versionRegex.MatchString(child.Name()) {
ch <- filepath.Join(path, child.Name())
}

}
<-workers
wg.Wait()
return nil
}

func getBlobID(path string) (string, error) {
b, err := os.ReadFile(path)
if err != nil {
Expand Down
Loading

0 comments on commit a4ac290

Please sign in to comment.