diff --git a/changelog/unreleased/improve-revisions-purge.md b/changelog/unreleased/improve-revisions-purge.md new file mode 100644 index 00000000000..ac4a5055e69 --- /dev/null +++ b/changelog/unreleased/improve-revisions-purge.md @@ -0,0 +1,5 @@ +Enhancement: Improve revisions purge + +The `revisions purge` command would time out on big spaces. We have improved performance by parallelizing the process. + +https://github.com/owncloud/ocis/pull/9891 diff --git a/ocis/pkg/command/revisions.go b/ocis/pkg/command/revisions.go index 174a9df3a6a..569b7023826 100644 --- a/ocis/pkg/command/revisions.go +++ b/ocis/pkg/command/revisions.go @@ -5,6 +5,7 @@ import ( "fmt" "path/filepath" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ocisbs "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" "github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup" s3bs "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/blobstore" @@ -19,7 +20,7 @@ import ( var ( // _nodesGlobPattern is the glob pattern to find all nodes - _nodesGlobPattern = "spaces/*/*/*/*/*/*/*/*" + _nodesGlobPattern = "spaces/*/*/nodes/" ) // RevisionsCommand is the entrypoint for the revisions command. @@ -30,7 +31,7 @@ func RevisionsCommand(cfg *config.Config) *cli.Command { Subcommands: []*cli.Command{ PurgeRevisionsCommand(cfg), }, - Before: func(c *cli.Context) error { + Before: func(_ *cli.Context) error { return configlog.ReturnError(parser.ParseConfig(cfg, true)) }, Action: func(_ *cli.Context) error { @@ -74,6 +75,11 @@ func PurgeRevisionsCommand(cfg *config.Config) *cli.Command { Aliases: []string{"r"}, Usage: "purge all revisions of this file/space. If not set, all revisions will be purged", }, + &cli.StringFlag{ + Name: "glob-mechanism", + Usage: "the glob mechanism to find all nodes. Can be 'glob', 'list' or 'workers'. 'glob' uses globbing with a single worker. 'workers' spawns multiple go routines, accelatering the command drastically but causing high cpu and ram usage. 'list' looks for references by listing directories with multiple workers. Default is 'glob'", + Value: "glob", + }, }, Action: func(c *cli.Context) error { basePath := c.String("basepath") @@ -108,43 +114,72 @@ func PurgeRevisionsCommand(cfg *config.Config) *cli.Command { return err } - p, err := generatePath(basePath, c.String("resource-id")) - if err != nil { - fmt.Printf("❌ Error parsing resourceID: %s", err) - return err + var rid *provider.ResourceId + resid, err := storagespace.ParseID(c.String("resource-id")) + if err == nil { + rid = &resid } - if err := revisions.PurgeRevisions(p, bs, c.Bool("dry-run"), c.Bool("verbose")); err != nil { - fmt.Printf("❌ Error purging revisions: %s", err) - return err + mechanism := c.String("glob-mechanism") + if rid.GetOpaqueId() != "" { + mechanism = "glob" } + var ch <-chan string + switch mechanism { + default: + fallthrough + case "glob": + p := generatePath(basePath, rid) + if rid.GetOpaqueId() == "" { + p = filepath.Join(p, "*/*/*/*/*") + } + ch = revisions.Glob(p) + case "workers": + p := generatePath(basePath, rid) + ch = revisions.GlobWorkers(p, "/*", "/*/*/*/*") + case "list": + p := filepath.Join(basePath, "spaces") + if rid != nil { + p = generatePath(basePath, rid) + } + ch = revisions.List(p, 10) + } + + files, blobs, revisions := revisions.PurgeRevisions(ch, bs, c.Bool("dry-run"), c.Bool("verbose")) + printResults(files, blobs, revisions, c.Bool("dry-run")) return nil }, } } -func generatePath(basePath string, resourceID string) (string, error) { - if resourceID == "" { - return filepath.Join(basePath, _nodesGlobPattern), nil +func printResults(countFiles, countBlobs, countRevisions int, dryRun bool) { + switch { + case countFiles == 0 && countRevisions == 0 && countBlobs == 0: + fmt.Println("❎ No revisions found. Storage provider is clean.") + case !dryRun: + fmt.Printf("✅ Deleted %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs) + default: + fmt.Printf("👉 Would delete %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs) } +} - rid, err := storagespace.ParseID(resourceID) - if err != nil { - return "", err +func generatePath(basePath string, rid *provider.ResourceId) string { + if rid == nil { + return filepath.Join(basePath, _nodesGlobPattern) } sid := lookup.Pathify(rid.GetSpaceId(), 1, 2) if sid == "" { - sid = "*/*" + return "" } nid := lookup.Pathify(rid.GetOpaqueId(), 4, 2) if nid == "" { - nid = "*/*/*/*/" + return filepath.Join(basePath, "spaces", sid, "nodes") } - return filepath.Join(basePath, "spaces", sid, "nodes", nid+"*"), nil + return filepath.Join(basePath, "spaces", sid, "nodes", nid+"*") } func init() { diff --git a/ocis/pkg/revisions/revisions.go b/ocis/pkg/revisions/revisions.go index d2d1eac686b..bc4dd00a3d0 100644 --- a/ocis/pkg/revisions/revisions.go +++ b/ocis/pkg/revisions/revisions.go @@ -7,6 +7,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/shamaton/msgpack/v2" @@ -25,16 +26,12 @@ type DelBlobstore interface { Delete(node *node.Node) error } -// PurgeRevisionsGlob removes all revisions from a storage provider using globbing. -func PurgeRevisionsGlob(pattern string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) { - if verbose { - fmt.Println("Looking for nodes in", pattern) - } - +// Glob uses globbing to find all revision nodes in a storage provider. +func Glob(pattern string) <-chan string { ch := make(chan string) go func() { defer close(ch) - nodes, err := filepath.Glob(pattern) + nodes, err := filepath.Glob(filepath.Join(pattern)) if err != nil { fmt.Println("error globbing", pattern, err) return @@ -52,11 +49,51 @@ func PurgeRevisionsGlob(pattern string, bs DelBlobstore, dryRun bool, verbose bo } }() - return purgeRevisions(ch, bs, dryRun, verbose) + return ch } -// PurgeRevisionsWalk removes all revisions from a storage provider using walking. -func PurgeRevisionsWalk(base string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) { +// GlobWorkers uses multiple go routine to glob all revision nodes in a storage provider. +func GlobWorkers(pattern string, depth string, remainder string) <-chan string { + wg := sync.WaitGroup{} + ch := make(chan string) + go func() { + defer close(ch) + nodes, err := filepath.Glob(pattern + depth) + if err != nil { + fmt.Println("error globbing", pattern, err) + return + } + + if len(nodes) == 0 { + fmt.Println("no nodes found. Double check storage path") + return + } + + for _, node := range nodes { + wg.Add(1) + go func(node string) { + defer wg.Done() + nodes, err := filepath.Glob(node + remainder) + if err != nil { + fmt.Println("error globbing", node, err) + return + } + for _, n := range nodes { + if _versionRegex.MatchString(n) { + ch <- n + } + } + }(node) + } + + wg.Wait() + }() + + return ch +} + +// Walk walks the storage provider to find all revision nodes. +func Walk(base string) <-chan string { ch := make(chan string) go func() { defer close(ch) @@ -79,58 +116,25 @@ func PurgeRevisionsWalk(base string, bs DelBlobstore, dryRun bool, verbose bool) } }() - return purgeRevisions(ch, bs, dryRun, verbose) + return ch } -// PurgeRevisionsList removes all revisions from a storage provider using listing. -func PurgeRevisionsList(base string, bs DelBlobstore, dryRun bool, verbose bool) (int, int, int) { +// List uses directory listing to find all revision nodes in a storage provider. +func List(base string, workers int) <-chan string { ch := make(chan string) go func() { defer close(ch) - if err := listFolder(base, ch); err != nil { + if err := listFolder(base, ch, make(chan struct{}, workers)); err != nil { fmt.Println("error listing", base, err) return } }() - return purgeRevisions(ch, bs, dryRun, verbose) + return ch } -func listFolder(path string, ch chan<- string) error { - children, err := os.ReadDir(path) - if err != nil { - return err - } - - for _, child := range children { - if child.IsDir() { - if err := listFolder(filepath.Join(path, child.Name()), ch); err != nil { - return err - } - } - - if _versionRegex.MatchString(child.Name()) { - ch <- filepath.Join(path, child.Name()) - } - - } - return nil -} - -// PrintResults prints the results -func PrintResults(countFiles, countBlobs, countRevisions int, dryRun bool) error { - switch { - case countFiles == 0 && countRevisions == 0 && countBlobs == 0: - fmt.Println("❎ No revisions found. Storage provider is clean.") - case !dryRun: - fmt.Printf("✅ Deleted %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs) - default: - fmt.Printf("👉 Would delete %d revisions (%d files / %d blobs)\n", countRevisions, countFiles, countBlobs) - } - return nil -} - -func purgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool) (int, int, int) { +// PurgeRevisions removes all revisions from a storage provider. +func PurgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool) (int, int, int) { countFiles := 0 countBlobs := 0 countRevisions := 0 @@ -201,6 +205,37 @@ func purgeRevisions(nodes <-chan string, bs DelBlobstore, dryRun, verbose bool) return countFiles, countBlobs, countRevisions } +func listFolder(path string, ch chan<- string, workers chan struct{}) error { + workers <- struct{}{} + wg := sync.WaitGroup{} + + children, err := os.ReadDir(path) + if err != nil { + <-workers + return err + } + + for _, child := range children { + if child.IsDir() { + wg.Add(1) + go func() { + defer wg.Done() + if err := listFolder(filepath.Join(path, child.Name()), ch, workers); err != nil { + fmt.Println("error listing", path, err) + } + }() + } + + if _versionRegex.MatchString(child.Name()) { + ch <- filepath.Join(path, child.Name()) + } + + } + <-workers + wg.Wait() + return nil +} + func getBlobID(path string) (string, error) { b, err := os.ReadFile(path) if err != nil { diff --git a/ocis/pkg/revisions/revisions_test.go b/ocis/pkg/revisions/revisions_test.go index 477ccbebd90..6f43cf737ea 100644 --- a/ocis/pkg/revisions/revisions_test.go +++ b/ocis/pkg/revisions/revisions_test.go @@ -14,7 +14,7 @@ import ( ) var ( - _basePath = "test_temp/spaces/8f/638374-6ea8-4f0d-80c4-66d9b49830a5/nodes/" + _basePath = "/spaces/8f/638374-6ea8-4f0d-80c4-66d9b49830a5/nodes/" ) // func TestInit(t *testing.T) { @@ -22,117 +22,147 @@ var ( // defer os.RemoveAll("test_temp") // } -func TestGlob30(t *testing.T) { testGlob(t, 10, 2) } -func TestGlob80(t *testing.T) { testGlob(t, 20, 3) } -func TestGlob250(t *testing.T) { testGlob(t, 50, 4) } -func TestGlob600(t *testing.T) { testGlob(t, 100, 5) } - -func TestWalk30(t *testing.T) { testWalk(t, 10, 2) } -func TestWalk80(t *testing.T) { testWalk(t, 20, 3) } -func TestWalk250(t *testing.T) { testWalk(t, 50, 4) } -func TestWalk600(t *testing.T) { testWalk(t, 100, 5) } - -func TestList30(t *testing.T) { testList(t, 10, 2) } -func TestList80(t *testing.T) { testList(t, 20, 3) } -func TestList250(t *testing.T) { testList(t, 50, 4) } -func TestList600(t *testing.T) { testList(t, 100, 5) } - -func BenchmarkGlob30(b *testing.B) { benchmarkGlob(b, 10, 2) } -func BenchmarkWalk30(b *testing.B) { benchmarkWalk(b, 10, 2) } -func BenchmarkList30(b *testing.B) { benchmarkList(b, 10, 2) } - -func BenchmarkGlob80(b *testing.B) { benchmarkGlob(b, 20, 3) } -func BenchmarkWalk80(b *testing.B) { benchmarkWalk(b, 20, 3) } -func BenchmarkList80(b *testing.B) { benchmarkList(b, 20, 3) } - -func BenchmarkGlob250(b *testing.B) { benchmarkGlob(b, 50, 4) } -func BenchmarkWalk250(b *testing.B) { benchmarkWalk(b, 50, 4) } -func BenchmarkList250(b *testing.B) { benchmarkList(b, 50, 4) } - -func BenchmarkGlob600(b *testing.B) { benchmarkGlob(b, 100, 5) } -func BenchmarkWalk600(b *testing.B) { benchmarkWalk(b, 100, 5) } -func BenchmarkList600(b *testing.B) { benchmarkList(b, 100, 5) } - -func BenchmarkGlob11000(b *testing.B) { benchmarkGlob(b, 1000, 10) } -func BenchmarkWalk11000(b *testing.B) { benchmarkWalk(b, 1000, 10) } -func BenchmarkList11000(b *testing.B) { benchmarkList(b, 1000, 10) } - -func BenchmarkGlob110000(b *testing.B) { benchmarkGlob(b, 10000, 10) } -func BenchmarkWalk110000(b *testing.B) { benchmarkWalk(b, 10000, 10) } -func BenchmarkList110000(b *testing.B) { benchmarkList(b, 10000, 10) } - -func benchmarkGlob(b *testing.B, numNodes int, numRevisions int) { - initialize(numNodes, numRevisions) - defer os.RemoveAll("test_temp") +func TestGlob30(t *testing.T) { test(t, 10, 2, glob) } +func TestGlob80(t *testing.T) { test(t, 20, 3, glob) } +func TestGlob250(t *testing.T) { test(t, 50, 4, glob) } +func TestGlob600(t *testing.T) { test(t, 100, 5, glob) } + +func TestWalk30(t *testing.T) { test(t, 10, 2, walk) } +func TestWalk80(t *testing.T) { test(t, 20, 3, walk) } +func TestWalk250(t *testing.T) { test(t, 50, 4, walk) } +func TestWalk600(t *testing.T) { test(t, 100, 5, walk) } + +func TestList30(t *testing.T) { test(t, 10, 2, list2) } +func TestList80(t *testing.T) { test(t, 20, 3, list10) } +func TestList250(t *testing.T) { test(t, 50, 4, list20) } +func TestList600(t *testing.T) { test(t, 100, 5, list2) } + +func TestGlobWorkers30(t *testing.T) { test(t, 10, 2, globWorkersD1) } +func TestGlobWorkers80(t *testing.T) { test(t, 20, 3, globWorkersD2) } +func TestGlobWorkers250(t *testing.T) { test(t, 50, 4, globWorkersD4) } +func TestGlobWorkers600(t *testing.T) { test(t, 100, 5, globWorkersD2) } + +func BenchmarkGlob30(b *testing.B) { benchmark(b, 10, 2, glob) } +func BenchmarkWalk30(b *testing.B) { benchmark(b, 10, 2, walk) } +func BenchmarkList30(b *testing.B) { benchmark(b, 10, 2, list2) } +func BenchmarkGlobWorkers30(b *testing.B) { benchmark(b, 10, 2, globWorkersD2) } + +func BenchmarkGlob80(b *testing.B) { benchmark(b, 20, 3, glob) } +func BenchmarkWalk80(b *testing.B) { benchmark(b, 20, 3, walk) } +func BenchmarkList80(b *testing.B) { benchmark(b, 20, 3, list2) } +func BenchmarkGlobWorkers80(b *testing.B) { benchmark(b, 20, 3, globWorkersD2) } + +func BenchmarkGlob250(b *testing.B) { benchmark(b, 50, 4, glob) } +func BenchmarkWalk250(b *testing.B) { benchmark(b, 50, 4, walk) } +func BenchmarkList250(b *testing.B) { benchmark(b, 50, 4, list2) } +func BenchmarkGlobWorkers250(b *testing.B) { benchmark(b, 50, 4, globWorkersD2) } + +func BenchmarkGlobAT600(b *testing.B) { benchmark(b, 100, 5, glob) } +func BenchmarkWalkAT600(b *testing.B) { benchmark(b, 100, 5, walk) } +func BenchmarkList2AT600(b *testing.B) { benchmark(b, 100, 5, list2) } +func BenchmarkList10AT600(b *testing.B) { benchmark(b, 100, 5, list10) } +func BenchmarkList20AT600(b *testing.B) { benchmark(b, 100, 5, list20) } +func BenchmarkGlobWorkersD1AT600(b *testing.B) { benchmark(b, 100, 5, globWorkersD1) } +func BenchmarkGlobWorkersD2AT600(b *testing.B) { benchmark(b, 100, 5, globWorkersD2) } +func BenchmarkGlobWorkersD4AT600(b *testing.B) { benchmark(b, 100, 5, globWorkersD4) } + +func BenchmarkGlobAT22000(b *testing.B) { benchmark(b, 2000, 10, glob) } +func BenchmarkWalkAT22000(b *testing.B) { benchmark(b, 2000, 10, walk) } +func BenchmarkList2AT22000(b *testing.B) { benchmark(b, 2000, 10, list2) } +func BenchmarkList10AT22000(b *testing.B) { benchmark(b, 2000, 10, list10) } +func BenchmarkList20AT22000(b *testing.B) { benchmark(b, 2000, 10, list20) } +func BenchmarkGlobWorkersD1AT22000(b *testing.B) { benchmark(b, 2000, 10, globWorkersD1) } +func BenchmarkGlobWorkersD2AT22000(b *testing.B) { benchmark(b, 2000, 10, globWorkersD2) } +func BenchmarkGlobWorkersD4AT22000(b *testing.B) { benchmark(b, 2000, 10, globWorkersD4) } + +func BenchmarkGlob110000(b *testing.B) { benchmark(b, 10000, 10, glob) } +func BenchmarkWalk110000(b *testing.B) { benchmark(b, 10000, 10, walk) } +func BenchmarkList110000(b *testing.B) { benchmark(b, 10000, 10, list2) } +func BenchmarkGlobWorkers110000(b *testing.B) { benchmark(b, 10000, 10, globWorkersD2) } + +func benchmark(b *testing.B, numNodes int, numRevisions int, f func(string) <-chan string) { + base := initialize(numNodes, numRevisions) + defer os.RemoveAll(base) for i := 0; i < b.N; i++ { - PurgeRevisionsGlob(_basePath+"*/*/*/*/*", nil, false, false) + ch := f(base) + PurgeRevisions(ch, nil, false, false) } } -func benchmarkWalk(b *testing.B, numNodes int, numRevisions int) { - initialize(numNodes, numRevisions) - defer os.RemoveAll("test_temp") +func test(t *testing.T, numNodes int, numRevisions int, f func(string) <-chan string) { + base := initialize(numNodes, numRevisions) + defer os.RemoveAll(base) - for i := 0; i < b.N; i++ { - PurgeRevisionsWalk(_basePath, nil, false, false) - } + ch := f(base) + _, _, revisions := PurgeRevisions(ch, nil, false, false) + require.Equal(t, numNodes*numRevisions, revisions, "Deleted Revisions") } -func benchmarkList(b *testing.B, numNodes int, numRevisions int) { - initialize(numNodes, numRevisions) - defer os.RemoveAll("test_temp") +func glob(base string) <-chan string { + return Glob(base + _basePath + "*/*/*/*/*") +} - for i := 0; i < b.N; i++ { - PurgeRevisionsList(_basePath, nil, false, false) - } +func walk(base string) <-chan string { + return Walk(base + _basePath) } -func testGlob(t *testing.T, numNodes int, numRevisions int) { - initialize(numNodes, numRevisions) - defer os.RemoveAll("test_temp") +func list2(base string) <-chan string { + return List(base+_basePath, 2) +} - _, _, revisions := PurgeRevisionsGlob(_basePath+"*/*/*/*/*", nil, false, false) - require.Equal(t, numNodes*numRevisions, revisions, "Deleted Revisions") +func list10(base string) <-chan string { + return List(base+_basePath, 10) } -func testWalk(t *testing.T, numNodes int, numRevisions int) { - initialize(numNodes, numRevisions) - defer os.RemoveAll("test_temp") +func list20(base string) <-chan string { + return List(base+_basePath, 20) +} - _, _, revisions := PurgeRevisionsWalk(_basePath, nil, false, false) - require.Equal(t, numNodes*numRevisions, revisions, "Deleted Revisions") +func globWorkersD1(base string) <-chan string { + return GlobWorkers(base+_basePath, "*", "/*/*/*/*") } -func testList(t *testing.T, numNodes int, numRevisions int) { - initialize(numNodes, numRevisions) - defer os.RemoveAll("test_temp") +func globWorkersD2(base string) <-chan string { + return GlobWorkers(base+_basePath, "*/*", "/*/*/*") +} - _, _, revisions := PurgeRevisionsList(_basePath, nil, false, false) - require.Equal(t, numNodes*numRevisions, revisions, "Deleted Revisions") +func globWorkersD4(base string) <-chan string { + return GlobWorkers(base+_basePath, "*/*/*/*", "/*") } -func initialize(numNodes int, numRevisions int) { - // create base path - if err := os.MkdirAll(_basePath, fs.ModePerm); err != nil { +func initialize(numNodes int, numRevisions int) string { + base := "test_temp_" + uuid.New().String() + if err := os.Mkdir(base, os.ModePerm); err != nil { fmt.Println("Error creating test_temp directory", err) + os.RemoveAll(base) + os.Exit(1) + } + + // create base path + if err := os.MkdirAll(base+_basePath, fs.ModePerm); err != nil { + fmt.Println("Error creating base path", err) + os.RemoveAll(base) os.Exit(1) } for i := 0; i < numNodes; i++ { path := lookup.Pathify(uuid.New().String(), 4, 2) dir := filepath.Dir(path) - if err := os.MkdirAll(_basePath+dir, fs.ModePerm); err != nil { + if err := os.MkdirAll(base+_basePath+dir, fs.ModePerm); err != nil { fmt.Println("Error creating test_temp directory", err) + os.RemoveAll(base) os.Exit(1) } - if _, err := os.Create(_basePath + path); err != nil { + if _, err := os.Create(base + _basePath + path); err != nil { fmt.Println("Error creating file", err) + os.RemoveAll(base) os.Exit(1) } for i := 0; i < numRevisions; i++ { - os.Create(_basePath + path + ".REV.2024-05-22T07:32:53.89969" + strconv.Itoa(i) + "Z") + os.Create(base + _basePath + path + ".REV.2024-05-22T07:32:53.89969" + strconv.Itoa(i) + "Z") } } + return base }