Skip to content

Commit

Permalink
Merge pull request #4083 from aduffeck/rollback-migrations
Browse files Browse the repository at this point in the history
Rollback migrations
  • Loading branch information
aduffeck authored Jul 31, 2023
2 parents c444e4c + e0f456c commit db79e9b
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 96 deletions.
7 changes: 7 additions & 0 deletions changelog/unreleased/decomposedfs-rollback-migrations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: Allow for rolling back migrations

The decomposedfs now supports rolling back migrations (starting with 0004). It
also got a Migrations() method which returns the list of migrations incl. their
states.

https://github.com/cs3org/reva/pull/4083
Original file line number Diff line number Diff line change
Expand Up @@ -29,44 +29,55 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)

// Migration0001 creates the spaces directory structure
func (m *Migrator) Migration0001() (Result, error) {
m.log.Info().Msg("Migrating spaces directory structure...")
func init() {
registerMigration("0001", Migration0001{})
}

type Migration0001 struct{}

// Migrate creates the spaces directory structure
func (m Migration0001) Migrate(migrator *Migrator) (Result, error) {
migrator.log.Info().Msg("Migrating spaces directory structure...")

// create spaces folder and iterate over existing nodes to populate it
nodesPath := filepath.Join(m.lu.InternalRoot(), "nodes")
nodesPath := filepath.Join(migrator.lu.InternalRoot(), "nodes")
fi, err := os.Stat(nodesPath)
if err == nil && fi.IsDir() {
f, err := os.Open(nodesPath)
if err != nil {
return resultFailed, err
return stateFailed, err
}
nodes, err := f.Readdir(0)
if err != nil {
return resultFailed, err
return stateFailed, err
}

for _, n := range nodes {
nodePath := filepath.Join(nodesPath, n.Name())

attr, err := m.lu.MetadataBackend().Get(context.Background(), nodePath, prefixes.ParentidAttr)
attr, err := migrator.lu.MetadataBackend().Get(context.Background(), nodePath, prefixes.ParentidAttr)
if err == nil && string(attr) == node.RootID {
if err := m.moveNode(n.Name(), n.Name()); err != nil {
m.log.Error().Err(err).
if err := m.moveNode(migrator, n.Name(), n.Name()); err != nil {
migrator.log.Error().Err(err).
Str("space", n.Name()).
Msg("could not move space")
continue
}
m.linkSpaceNode("personal", n.Name())
m.linkSpaceNode(migrator, "personal", n.Name())
}
}
// TODO delete nodesPath if empty
}
return resultSucceeded, nil
return stateSucceeded, nil
}

// Rollback is not implemented
func (Migration0001) Rollback(_ *Migrator) (Result, error) {
return stateFailed, errors.New("rollback not implemented")
}

func (m *Migrator) moveNode(spaceID, nodeID string) error {
dirPath := filepath.Join(m.lu.InternalRoot(), "nodes", nodeID)
func (m Migration0001) moveNode(migrator *Migrator, spaceID, nodeID string) error {
dirPath := filepath.Join(migrator.lu.InternalRoot(), "nodes", nodeID)
f, err := os.Open(dirPath)
if err != nil {
return err
Expand All @@ -76,18 +87,18 @@ func (m *Migrator) moveNode(spaceID, nodeID string) error {
return err
}
for _, child := range children {
old := filepath.Join(m.lu.InternalRoot(), "nodes", child.Name())
new := filepath.Join(m.lu.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(child.Name(), 4, 2))
old := filepath.Join(migrator.lu.InternalRoot(), "nodes", child.Name())
new := filepath.Join(migrator.lu.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(child.Name(), 4, 2))
if err := os.Rename(old, new); err != nil {
m.log.Error().Err(err).
migrator.log.Error().Err(err).
Str("space", spaceID).
Str("nodes", child.Name()).
Str("oldpath", old).
Str("newpath", new).
Msg("could not rename node")
}
if child.IsDir() {
if err := m.moveNode(spaceID, child.Name()); err != nil {
if err := m.moveNode(migrator, spaceID, child.Name()); err != nil {
return err
}
}
Expand All @@ -96,27 +107,27 @@ func (m *Migrator) moveNode(spaceID, nodeID string) error {
}

// linkSpace creates a new symbolic link for a space with the given type st, and node id
func (m *Migrator) linkSpaceNode(spaceType, spaceID string) {
spaceTypesPath := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType, spaceID)
func (m Migration0001) linkSpaceNode(migrator *Migrator, spaceType, spaceID string) {
spaceTypesPath := filepath.Join(migrator.lu.InternalRoot(), "spacetypes", spaceType, spaceID)
expectedTarget := "../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
linkTarget, err := os.Readlink(spaceTypesPath)
if errors.Is(err, os.ErrNotExist) {
err = os.Symlink(expectedTarget, spaceTypesPath)
if err != nil {
m.log.Error().Err(err).
migrator.log.Error().Err(err).
Str("space_type", spaceType).
Str("space", spaceID).
Msg("could not create symlink")
}
} else {
if err != nil {
m.log.Error().Err(err).
migrator.log.Error().Err(err).
Str("space_type", spaceType).
Str("space", spaceID).
Msg("could not read symlink")
}
if linkTarget != expectedTarget {
m.log.Warn().
migrator.log.Warn().
Str("space_type", spaceType).
Str("space", spaceID).
Str("expected", expectedTarget).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,39 @@
package migrator

import (
"errors"
"io"
"os"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/logger"
)

// Migration0002 migrates spacetypes to indexes
func (m *Migrator) Migration0002() (Result, error) {
m.log.Info().Msg("Migrating space types indexes...")
func init() {
registerMigration("0002", Migration0002{})
}

type Migration0002 struct{}

// Migrate migrates spacetypes to indexes
func (m Migration0002) Migrate(migrator *Migrator) (Result, error) {
migrator.log.Info().Msg("Migrating space types indexes...")

spaceTypesPath := filepath.Join(m.lu.InternalRoot(), "spacetypes")
spaceTypesPath := filepath.Join(migrator.lu.InternalRoot(), "spacetypes")
fi, err := os.Stat(spaceTypesPath)
if err == nil && fi.IsDir() {

f, err := os.Open(spaceTypesPath)
if err != nil {
return resultFailed, err
return stateFailed, err
}
spaceTypes, err := f.Readdir(0)
if err != nil {
return resultFailed, err
return stateFailed, err
}

for _, st := range spaceTypes {
err := m.moveSpaceType(st.Name())
err := m.moveSpaceType(migrator, st.Name())
if err != nil {
logger.New().Error().Err(err).
Str("space", st.Name()).
Expand All @@ -59,7 +66,7 @@ func (m *Migrator) Migration0002() (Result, error) {
logger.New().Error().Err(err).
Str("spacetypesdir", spaceTypesPath).
Msg("could not open spacetypesdir")
return resultFailed, nil
return stateFailed, nil
}
defer d.Close()
_, err = d.Readdirnames(1) // Or f.Readdir(1)
Expand All @@ -77,11 +84,16 @@ func (m *Migrator) Migration0002() (Result, error) {
Msg("could not delete, not empty")
}
}
return resultSucceeded, nil
return stateSucceeded, nil
}

// Rollback is not implemented
func (Migration0002) Rollback(_ *Migrator) (Result, error) {
return stateFailed, errors.New("rollback not implemented")
}

func (m *Migrator) moveSpaceType(spaceType string) error {
dirPath := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType)
func (m Migration0002) moveSpaceType(migrator *Migrator, spaceType string) error {
dirPath := filepath.Join(migrator.lu.InternalRoot(), "spacetypes", spaceType)
f, err := os.Open(dirPath)
if err != nil {
return err
Expand All @@ -91,7 +103,7 @@ func (m *Migrator) moveSpaceType(spaceType string) error {
return err
}
for _, child := range children {
old := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType, child.Name())
old := filepath.Join(migrator.lu.InternalRoot(), "spacetypes", spaceType, child.Name())
target, err := os.Readlink(old)
if err != nil {
logger.New().Error().Err(err).
Expand All @@ -101,7 +113,7 @@ func (m *Migrator) moveSpaceType(spaceType string) error {
Msg("could not read old symlink")
continue
}
newDir := filepath.Join(m.lu.InternalRoot(), "indexes", "by-type", spaceType)
newDir := filepath.Join(migrator.lu.InternalRoot(), "indexes", "by-type", spaceType)
if err := os.MkdirAll(newDir, 0700); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,29 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
)

// Migration0003 migrates the file metadata to the current backend.
func init() {
registerMigration("0003", Migration0003{})
}

type Migration0003 struct{}

// Migrate migrates the file metadata to the current backend.
// Only the xattrs -> messagepack path is supported.
func (m *Migrator) Migration0003() (Result, error) {
bod := lookup.DetectBackendOnDisk(m.lu.InternalRoot())
func (m Migration0003) Migrate(migrator *Migrator) (Result, error) {
bod := lookup.DetectBackendOnDisk(migrator.lu.InternalRoot())
if bod == "" {
return resultFailed, errors.New("could not detect metadata backend on disk")
return stateFailed, errors.New("could not detect metadata backend on disk")
}

if bod != "xattrs" || m.lu.MetadataBackend().Name() != "messagepack" {
return resultSucceededRunAgain, nil
if bod != "xattrs" || migrator.lu.MetadataBackend().Name() != "messagepack" {
return stateSucceededRunAgain, nil
}

m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating to messagepack metadata backend...")
migrator.log.Info().Str("root", migrator.lu.InternalRoot()).Msg("Migrating to messagepack metadata backend...")
xattrs := metadata.XattrsBackend{}
mpk := metadata.NewMessagePackBackend(m.lu.InternalRoot(), cache.Config{})
mpk := metadata.NewMessagePackBackend(migrator.lu.InternalRoot(), cache.Config{})

spaces, _ := filepath.Glob(filepath.Join(m.lu.InternalRoot(), "spaces", "*", "*"))
spaces, _ := filepath.Glob(filepath.Join(migrator.lu.InternalRoot(), "spaces", "*", "*"))
for _, space := range spaces {
err := filepath.WalkDir(filepath.Join(space, "nodes"), func(path string, _ fs.DirEntry, err error) error {
// Do not continue on error
Expand Down Expand Up @@ -77,7 +83,7 @@ func (m *Migrator) Migration0003() (Result, error) {

attribs, err := xattrs.All(context.Background(), path)
if err != nil {
m.log.Error().Err(err).Str("path", path).Msg("error converting file")
migrator.log.Error().Err(err).Str("path", path).Msg("error converting file")
return err
}
if len(attribs) == 0 {
Expand All @@ -86,24 +92,29 @@ func (m *Migrator) Migration0003() (Result, error) {

err = mpk.SetMultiple(context.Background(), path, attribs, false)
if err != nil {
m.log.Error().Err(err).Str("path", path).Msg("error setting attributes")
migrator.log.Error().Err(err).Str("path", path).Msg("error setting attributes")
return err
}

for k := range attribs {
err = xattrs.Remove(context.Background(), path, k)
if err != nil {
m.log.Debug().Err(err).Str("path", path).Msg("error removing xattr")
migrator.log.Debug().Err(err).Str("path", path).Msg("error removing xattr")
}
}

return nil
})
if err != nil {
m.log.Error().Err(err).Msg("error migrating nodes to messagepack metadata backend")
migrator.log.Error().Err(err).Msg("error migrating nodes to messagepack metadata backend")
}
}

m.log.Info().Msg("done.")
return resultSucceeded, nil
migrator.log.Info().Msg("done.")
return stateSucceeded, nil
}

// Rollback is not implemented
func (Migration0003) Rollback(_ *Migrator) (Result, error) {
return stateFailed, errors.New("rollback not implemented")
}
Loading

0 comments on commit db79e9b

Please sign in to comment.