diff --git a/changelog/unreleased/decomposedfs-rollback-migrations.md b/changelog/unreleased/decomposedfs-rollback-migrations.md new file mode 100644 index 0000000000..a80e0bd0bd --- /dev/null +++ b/changelog/unreleased/decomposedfs-rollback-migrations.md @@ -0,0 +1,7 @@ +Enhancement: Allow for rolling back migrations + +The decomposedfs now supports rolling back migrations (starting with 0004). It +also got a Migrations() method which returns the list of migrations incl. their +states. + +https://github.com/cs3org/reva/pull/4083 diff --git a/pkg/storage/utils/decomposedfs/migrator/0001_create_spaces_directory_structure.go b/pkg/storage/utils/decomposedfs/migrator/0001_create_spaces_directory_structure.go index 99ee44d99d..f915023e8f 100644 --- a/pkg/storage/utils/decomposedfs/migrator/0001_create_spaces_directory_structure.go +++ b/pkg/storage/utils/decomposedfs/migrator/0001_create_spaces_directory_structure.go @@ -29,44 +29,55 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" ) -// Migration0001 creates the spaces directory structure -func (m *Migrator) Migration0001() (Result, error) { - m.log.Info().Msg("Migrating spaces directory structure...") +func init() { + registerMigration("0001", Migration0001{}) +} + +type Migration0001 struct{} + +// Migrate creates the spaces directory structure +func (m Migration0001) Migrate(migrator *Migrator) (Result, error) { + migrator.log.Info().Msg("Migrating spaces directory structure...") // create spaces folder and iterate over existing nodes to populate it - nodesPath := filepath.Join(m.lu.InternalRoot(), "nodes") + nodesPath := filepath.Join(migrator.lu.InternalRoot(), "nodes") fi, err := os.Stat(nodesPath) if err == nil && fi.IsDir() { f, err := os.Open(nodesPath) if err != nil { - return resultFailed, err + return stateFailed, err } nodes, err := f.Readdir(0) if err != nil { - return resultFailed, err + return stateFailed, err } for _, n := range nodes { nodePath := filepath.Join(nodesPath, n.Name()) - attr, err := m.lu.MetadataBackend().Get(context.Background(), nodePath, prefixes.ParentidAttr) + attr, err := migrator.lu.MetadataBackend().Get(context.Background(), nodePath, prefixes.ParentidAttr) if err == nil && string(attr) == node.RootID { - if err := m.moveNode(n.Name(), n.Name()); err != nil { - m.log.Error().Err(err). + if err := m.moveNode(migrator, n.Name(), n.Name()); err != nil { + migrator.log.Error().Err(err). Str("space", n.Name()). Msg("could not move space") continue } - m.linkSpaceNode("personal", n.Name()) + m.linkSpaceNode(migrator, "personal", n.Name()) } } // TODO delete nodesPath if empty } - return resultSucceeded, nil + return stateSucceeded, nil +} + +// Rollback is not implemented +func (Migration0001) Rollback(_ *Migrator) (Result, error) { + return stateFailed, errors.New("rollback not implemented") } -func (m *Migrator) moveNode(spaceID, nodeID string) error { - dirPath := filepath.Join(m.lu.InternalRoot(), "nodes", nodeID) +func (m Migration0001) moveNode(migrator *Migrator, spaceID, nodeID string) error { + dirPath := filepath.Join(migrator.lu.InternalRoot(), "nodes", nodeID) f, err := os.Open(dirPath) if err != nil { return err @@ -76,10 +87,10 @@ func (m *Migrator) moveNode(spaceID, nodeID string) error { return err } for _, child := range children { - old := filepath.Join(m.lu.InternalRoot(), "nodes", child.Name()) - new := filepath.Join(m.lu.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(child.Name(), 4, 2)) + old := filepath.Join(migrator.lu.InternalRoot(), "nodes", child.Name()) + new := filepath.Join(migrator.lu.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(child.Name(), 4, 2)) if err := os.Rename(old, new); err != nil { - m.log.Error().Err(err). + migrator.log.Error().Err(err). Str("space", spaceID). Str("nodes", child.Name()). Str("oldpath", old). @@ -87,7 +98,7 @@ func (m *Migrator) moveNode(spaceID, nodeID string) error { Msg("could not rename node") } if child.IsDir() { - if err := m.moveNode(spaceID, child.Name()); err != nil { + if err := m.moveNode(migrator, spaceID, child.Name()); err != nil { return err } } @@ -96,27 +107,27 @@ func (m *Migrator) moveNode(spaceID, nodeID string) error { } // linkSpace creates a new symbolic link for a space with the given type st, and node id -func (m *Migrator) linkSpaceNode(spaceType, spaceID string) { - spaceTypesPath := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType, spaceID) +func (m Migration0001) linkSpaceNode(migrator *Migrator, spaceType, spaceID string) { + spaceTypesPath := filepath.Join(migrator.lu.InternalRoot(), "spacetypes", spaceType, spaceID) expectedTarget := "../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2) linkTarget, err := os.Readlink(spaceTypesPath) if errors.Is(err, os.ErrNotExist) { err = os.Symlink(expectedTarget, spaceTypesPath) if err != nil { - m.log.Error().Err(err). + migrator.log.Error().Err(err). Str("space_type", spaceType). Str("space", spaceID). Msg("could not create symlink") } } else { if err != nil { - m.log.Error().Err(err). + migrator.log.Error().Err(err). Str("space_type", spaceType). Str("space", spaceID). Msg("could not read symlink") } if linkTarget != expectedTarget { - m.log.Warn(). + migrator.log.Warn(). Str("space_type", spaceType). Str("space", spaceID). Str("expected", expectedTarget). diff --git a/pkg/storage/utils/decomposedfs/migrator/0002_move_spacetypes_to_indexes.go b/pkg/storage/utils/decomposedfs/migrator/0002_move_spacetypes_to_indexes.go index 689f736f35..1250c69cfe 100644 --- a/pkg/storage/utils/decomposedfs/migrator/0002_move_spacetypes_to_indexes.go +++ b/pkg/storage/utils/decomposedfs/migrator/0002_move_spacetypes_to_indexes.go @@ -19,6 +19,7 @@ package migrator import ( + "errors" "io" "os" "path/filepath" @@ -26,25 +27,31 @@ import ( "github.com/cs3org/reva/v2/pkg/logger" ) -// Migration0002 migrates spacetypes to indexes -func (m *Migrator) Migration0002() (Result, error) { - m.log.Info().Msg("Migrating space types indexes...") +func init() { + registerMigration("0002", Migration0002{}) +} + +type Migration0002 struct{} + +// Migrate migrates spacetypes to indexes +func (m Migration0002) Migrate(migrator *Migrator) (Result, error) { + migrator.log.Info().Msg("Migrating space types indexes...") - spaceTypesPath := filepath.Join(m.lu.InternalRoot(), "spacetypes") + spaceTypesPath := filepath.Join(migrator.lu.InternalRoot(), "spacetypes") fi, err := os.Stat(spaceTypesPath) if err == nil && fi.IsDir() { f, err := os.Open(spaceTypesPath) if err != nil { - return resultFailed, err + return stateFailed, err } spaceTypes, err := f.Readdir(0) if err != nil { - return resultFailed, err + return stateFailed, err } for _, st := range spaceTypes { - err := m.moveSpaceType(st.Name()) + err := m.moveSpaceType(migrator, st.Name()) if err != nil { logger.New().Error().Err(err). Str("space", st.Name()). @@ -59,7 +66,7 @@ func (m *Migrator) Migration0002() (Result, error) { logger.New().Error().Err(err). Str("spacetypesdir", spaceTypesPath). Msg("could not open spacetypesdir") - return resultFailed, nil + return stateFailed, nil } defer d.Close() _, err = d.Readdirnames(1) // Or f.Readdir(1) @@ -77,11 +84,16 @@ func (m *Migrator) Migration0002() (Result, error) { Msg("could not delete, not empty") } } - return resultSucceeded, nil + return stateSucceeded, nil +} + +// Rollback is not implemented +func (Migration0002) Rollback(_ *Migrator) (Result, error) { + return stateFailed, errors.New("rollback not implemented") } -func (m *Migrator) moveSpaceType(spaceType string) error { - dirPath := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType) +func (m Migration0002) moveSpaceType(migrator *Migrator, spaceType string) error { + dirPath := filepath.Join(migrator.lu.InternalRoot(), "spacetypes", spaceType) f, err := os.Open(dirPath) if err != nil { return err @@ -91,7 +103,7 @@ func (m *Migrator) moveSpaceType(spaceType string) error { return err } for _, child := range children { - old := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType, child.Name()) + old := filepath.Join(migrator.lu.InternalRoot(), "spacetypes", spaceType, child.Name()) target, err := os.Readlink(old) if err != nil { logger.New().Error().Err(err). @@ -101,7 +113,7 @@ func (m *Migrator) moveSpaceType(spaceType string) error { Msg("could not read old symlink") continue } - newDir := filepath.Join(m.lu.InternalRoot(), "indexes", "by-type", spaceType) + newDir := filepath.Join(migrator.lu.InternalRoot(), "indexes", "by-type", spaceType) if err := os.MkdirAll(newDir, 0700); err != nil { logger.New().Error().Err(err). Str("space", spaceType). diff --git a/pkg/storage/utils/decomposedfs/migrator/0003_switch_to_messagepack_metadata.go b/pkg/storage/utils/decomposedfs/migrator/0003_switch_to_messagepack_metadata.go index 9aa7069b10..73c919d0fa 100644 --- a/pkg/storage/utils/decomposedfs/migrator/0003_switch_to_messagepack_metadata.go +++ b/pkg/storage/utils/decomposedfs/migrator/0003_switch_to_messagepack_metadata.go @@ -31,23 +31,29 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" ) -// Migration0003 migrates the file metadata to the current backend. +func init() { + registerMigration("0003", Migration0003{}) +} + +type Migration0003 struct{} + +// Migrate migrates the file metadata to the current backend. // Only the xattrs -> messagepack path is supported. -func (m *Migrator) Migration0003() (Result, error) { - bod := lookup.DetectBackendOnDisk(m.lu.InternalRoot()) +func (m Migration0003) Migrate(migrator *Migrator) (Result, error) { + bod := lookup.DetectBackendOnDisk(migrator.lu.InternalRoot()) if bod == "" { - return resultFailed, errors.New("could not detect metadata backend on disk") + return stateFailed, errors.New("could not detect metadata backend on disk") } - if bod != "xattrs" || m.lu.MetadataBackend().Name() != "messagepack" { - return resultSucceededRunAgain, nil + if bod != "xattrs" || migrator.lu.MetadataBackend().Name() != "messagepack" { + return stateSucceededRunAgain, nil } - m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating to messagepack metadata backend...") + migrator.log.Info().Str("root", migrator.lu.InternalRoot()).Msg("Migrating to messagepack metadata backend...") xattrs := metadata.XattrsBackend{} - mpk := metadata.NewMessagePackBackend(m.lu.InternalRoot(), cache.Config{}) + mpk := metadata.NewMessagePackBackend(migrator.lu.InternalRoot(), cache.Config{}) - spaces, _ := filepath.Glob(filepath.Join(m.lu.InternalRoot(), "spaces", "*", "*")) + spaces, _ := filepath.Glob(filepath.Join(migrator.lu.InternalRoot(), "spaces", "*", "*")) for _, space := range spaces { err := filepath.WalkDir(filepath.Join(space, "nodes"), func(path string, _ fs.DirEntry, err error) error { // Do not continue on error @@ -77,7 +83,7 @@ func (m *Migrator) Migration0003() (Result, error) { attribs, err := xattrs.All(context.Background(), path) if err != nil { - m.log.Error().Err(err).Str("path", path).Msg("error converting file") + migrator.log.Error().Err(err).Str("path", path).Msg("error converting file") return err } if len(attribs) == 0 { @@ -86,24 +92,29 @@ func (m *Migrator) Migration0003() (Result, error) { err = mpk.SetMultiple(context.Background(), path, attribs, false) if err != nil { - m.log.Error().Err(err).Str("path", path).Msg("error setting attributes") + migrator.log.Error().Err(err).Str("path", path).Msg("error setting attributes") return err } for k := range attribs { err = xattrs.Remove(context.Background(), path, k) if err != nil { - m.log.Debug().Err(err).Str("path", path).Msg("error removing xattr") + migrator.log.Debug().Err(err).Str("path", path).Msg("error removing xattr") } } return nil }) if err != nil { - m.log.Error().Err(err).Msg("error migrating nodes to messagepack metadata backend") + migrator.log.Error().Err(err).Msg("error migrating nodes to messagepack metadata backend") } } - m.log.Info().Msg("done.") - return resultSucceeded, nil + migrator.log.Info().Msg("done.") + return stateSucceeded, nil +} + +// Rollback is not implemented +func (Migration0003) Rollback(_ *Migrator) (Result, error) { + return stateFailed, errors.New("rollback not implemented") } diff --git a/pkg/storage/utils/decomposedfs/migrator/0004_switch_to_messagepack_space_index.go b/pkg/storage/utils/decomposedfs/migrator/0004_switch_to_messagepack_space_index.go index 43f36244e4..87dd226c0c 100644 --- a/pkg/storage/utils/decomposedfs/migrator/0004_switch_to_messagepack_space_index.go +++ b/pkg/storage/utils/decomposedfs/migrator/0004_switch_to_messagepack_space_index.go @@ -21,18 +21,25 @@ package migrator import ( "os" "path/filepath" + "strings" "github.com/shamaton/msgpack/v2" ) -// Migration0004 migrates the directory tree based space indexes to messagepack -func (m *Migrator) Migration0004() (Result, error) { - root := m.lu.InternalRoot() +func init() { + registerMigration("0004", Migration0004{}) +} + +type Migration0004 struct{} + +// Migrate migrates the directory tree based space indexes to messagepack +func (Migration0004) Migrate(migrator *Migrator) (Result, error) { + root := migrator.lu.InternalRoot() // migrate user indexes users, err := os.ReadDir(filepath.Join(root, "indexes", "by-user-id")) if err != nil { - m.log.Warn().Err(err).Msg("error listing user indexes") + migrator.log.Warn().Err(err).Msg("error listing user indexes") } for _, user := range users { if !user.IsDir() { @@ -41,19 +48,18 @@ func (m *Migrator) Migration0004() (Result, error) { id := user.Name() indexPath := filepath.Join(root, "indexes", "by-user-id", id+".mpk") dirIndexPath := filepath.Join(root, "indexes", "by-user-id", id) - cacheKey := "by-user-id:" + id - m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating " + indexPath + " to messagepack index format...") - err := migrateSpaceIndex(indexPath, dirIndexPath, cacheKey) + migrator.log.Info().Str("root", migrator.lu.InternalRoot()).Msg("Migrating " + indexPath + " to messagepack index format...") + err := migrateSpaceIndex(indexPath, dirIndexPath) if err != nil { - m.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") + migrator.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") } } // migrate group indexes groups, err := os.ReadDir(filepath.Join(root, "indexes", "by-group-id")) if err != nil { - m.log.Warn().Err(err).Msg("error listing group indexes") + migrator.log.Warn().Err(err).Msg("error listing group indexes") } for _, group := range groups { if !group.IsDir() { @@ -62,12 +68,11 @@ func (m *Migrator) Migration0004() (Result, error) { id := group.Name() indexPath := filepath.Join(root, "indexes", "by-group-id", id+".mpk") dirIndexPath := filepath.Join(root, "indexes", "by-group-id", id) - cacheKey := "by-group-id:" + id - m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating " + indexPath + " to messagepack index format...") - err := migrateSpaceIndex(indexPath, dirIndexPath, cacheKey) + migrator.log.Info().Str("root", migrator.lu.InternalRoot()).Msg("Migrating " + indexPath + " to messagepack index format...") + err := migrateSpaceIndex(indexPath, dirIndexPath) if err != nil { - m.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") + migrator.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") } } @@ -75,25 +80,24 @@ func (m *Migrator) Migration0004() (Result, error) { for _, spaceType := range []string{"personal", "project", "share"} { indexPath := filepath.Join(root, "indexes", "by-type", spaceType+".mpk") dirIndexPath := filepath.Join(root, "indexes", "by-type", spaceType) - cacheKey := "by-type:" + spaceType _, err := os.Stat(dirIndexPath) if err != nil { continue } - m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating " + indexPath + " to messagepack index format...") - err = migrateSpaceIndex(indexPath, dirIndexPath, cacheKey) + migrator.log.Info().Str("root", migrator.lu.InternalRoot()).Msg("Migrating " + indexPath + " to messagepack index format...") + err = migrateSpaceIndex(indexPath, dirIndexPath) if err != nil { - m.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") + migrator.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") } } - m.log.Info().Msg("done.") - return resultSucceeded, nil + migrator.log.Info().Msg("done.") + return stateSucceeded, nil } -func migrateSpaceIndex(indexPath, dirIndexPath, cacheKey string) error { +func migrateSpaceIndex(indexPath, dirIndexPath string) error { links := map[string][]byte{} m, err := filepath.Glob(dirIndexPath + "/*") if err != nil { @@ -118,3 +122,82 @@ func migrateSpaceIndex(indexPath, dirIndexPath, cacheKey string) error { } return os.RemoveAll(dirIndexPath) } + +// Rollback migrates the directory messagepack indexes to symlinks +func (Migration0004) Rollback(m *Migrator) (Result, error) { + root := m.lu.InternalRoot() + + // migrate user indexes + users, err := filepath.Glob(filepath.Join(root, "indexes", "by-user-id", "*.mpk")) + if err != nil { + m.log.Warn().Err(err).Msg("error listing user indexes") + } + for _, indexPath := range users { + dirIndexPath := strings.TrimSuffix(indexPath, ".mpk") + + m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating " + indexPath + " to symlinks index format...") + err := downSpaceIndex(indexPath, dirIndexPath) + if err != nil { + m.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") + } + } + + // migrate group indexes + groups, err := filepath.Glob(filepath.Join(root, "indexes", "by-group-id", "*.mpk")) + if err != nil { + m.log.Warn().Err(err).Msg("error listing group indexes") + } + for _, indexPath := range groups { + dirIndexPath := strings.TrimSuffix(indexPath, ".mpk") + + m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating " + indexPath + " to symlinks index format...") + err := downSpaceIndex(indexPath, dirIndexPath) + if err != nil { + m.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") + } + } + + // migrate project indexes + for _, spaceType := range []string{"personal", "project", "share"} { + indexPath := filepath.Join(root, "indexes", "by-type", spaceType+".mpk") + dirIndexPath := filepath.Join(root, "indexes", "by-type", spaceType) + + _, err := os.Stat(indexPath) + if err != nil || os.IsNotExist(err) { + continue + } + + m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Migrating " + indexPath + " to symlinks index format...") + err = downSpaceIndex(indexPath, dirIndexPath) + if err != nil { + m.log.Error().Err(err).Str("path", dirIndexPath).Msg("error migrating index") + } + } + return stateDown, nil +} + +func downSpaceIndex(indexPath, dirIndexPath string) error { + d, err := os.ReadFile(indexPath) + if err != nil { + return err + } + + links := map[string][]byte{} + err = msgpack.Unmarshal(d, &links) + if err != nil { + return err + } + + err = os.MkdirAll(dirIndexPath, 0700) + if err != nil { + return err + } + for link, target := range links { + err = os.Symlink(string(target), filepath.Join(dirIndexPath, link)) + if err != nil { + return err + } + } + + return os.Remove(indexPath) +} diff --git a/pkg/storage/utils/decomposedfs/migrator/0005_fix_messagepack_space_index_format.go b/pkg/storage/utils/decomposedfs/migrator/0005_fix_messagepack_space_index_format.go index ca69fd869c..d3a7bded47 100644 --- a/pkg/storage/utils/decomposedfs/migrator/0005_fix_messagepack_space_index_format.go +++ b/pkg/storage/utils/decomposedfs/migrator/0005_fix_messagepack_space_index_format.go @@ -25,27 +25,33 @@ import ( "github.com/shamaton/msgpack/v2" ) -// Migration0005 fixes the messagepack space index data structure -func (m *Migrator) Migration0005() (Result, error) { - root := m.lu.InternalRoot() +func init() { + registerMigration("0005", Migration0005{}) +} + +type Migration0005 struct{} + +// Migrate fixes the messagepack space index data structure +func (Migration0005) Migrate(migrator *Migrator) (Result, error) { + root := migrator.lu.InternalRoot() indexes, err := filepath.Glob(filepath.Join(root, "indexes", "**", "*.mpk")) if err != nil { - return resultFailed, err + return stateFailed, err } for _, i := range indexes { - m.log.Info().Str("root", m.lu.InternalRoot()).Msg("Fixing index format of " + i) + migrator.log.Info().Str("root", migrator.lu.InternalRoot()).Msg("Fixing index format of " + i) // Read old-format index oldData, err := os.ReadFile(i) if err != nil { - return resultFailed, err + return stateFailed, err } oldIndex := map[string][]byte{} err = msgpack.Unmarshal(oldData, &oldIndex) if err != nil { // likely already migrated -> skip - m.log.Warn().Str("root", m.lu.InternalRoot()).Msg("Invalid index format found in " + i) + migrator.log.Warn().Str("root", migrator.lu.InternalRoot()).Msg("Invalid index format found in " + i) continue } @@ -56,13 +62,53 @@ func (m *Migrator) Migration0005() (Result, error) { } newData, err := msgpack.Marshal(newIndex) if err != nil { - return resultFailed, err + return stateFailed, err + } + err = os.WriteFile(i, newData, 0600) + if err != nil { + return stateFailed, err + } + } + migrator.log.Info().Msg("done.") + return stateSucceeded, nil +} + +// Rollback rolls back the migration +func (Migration0005) Rollback(migrator *Migrator) (Result, error) { + root := migrator.lu.InternalRoot() + + indexes, err := filepath.Glob(filepath.Join(root, "indexes", "**", "*.mpk")) + if err != nil { + return stateFailed, err + } + for _, i := range indexes { + migrator.log.Info().Str("root", migrator.lu.InternalRoot()).Msg("Fixing index format of " + i) + + oldData, err := os.ReadFile(i) + if err != nil { + return stateFailed, err + } + oldIndex := map[string]string{} + err = msgpack.Unmarshal(oldData, &oldIndex) + if err != nil { + migrator.log.Warn().Str("root", migrator.lu.InternalRoot()).Msg("Invalid index format found in " + i) + continue + } + + // Write new-format index + newIndex := map[string][]byte{} + for k, v := range oldIndex { + newIndex[k] = []byte(v) + } + newData, err := msgpack.Marshal(newIndex) + if err != nil { + return stateFailed, err } err = os.WriteFile(i, newData, 0600) if err != nil { - return resultFailed, err + return stateFailed, err } } - m.log.Info().Msg("done.") - return resultSucceeded, nil + migrator.log.Info().Msg("done.") + return stateDown, nil } diff --git a/pkg/storage/utils/decomposedfs/migrator/migrator.go b/pkg/storage/utils/decomposedfs/migrator/migrator.go index 17001e8024..c164d6fa59 100644 --- a/pkg/storage/utils/decomposedfs/migrator/migrator.go +++ b/pkg/storage/utils/decomposedfs/migrator/migrator.go @@ -20,28 +20,53 @@ package migrator import ( "encoding/json" + "fmt" "os" "path/filepath" - "reflect" + "sort" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/rogpeppe/go-internal/lockedfile" "github.com/rs/zerolog" ) -var allMigrations = []string{"0001", "0002", "0003", "0004", "0005"} - const ( - resultFailed = "failed" - resultSucceeded = "succeeded" - resultSucceededRunAgain = "runagain" + statePending = "pending" + stateFailed = "failed" + stateSucceeded = "succeeded" + stateDown = "down" + stateSucceededRunAgain = "runagain" ) -type migrationState struct { +type migration interface { + Migrate(*Migrator) (Result, error) + Rollback(*Migrator) (Result, error) +} + +var migrations = map[string]migration{} + +type migrationStates map[string]MigrationState + +func registerMigration(name string, migration migration) { + migrations[name] = migration +} + +func allMigrations() []string { + ms := []string{} + + for k := range migrations { + ms = append(ms, k) + } + + sort.Strings(ms) + return ms +} + +// MigrationState holds the state of a migration +type MigrationState struct { State string Message string } -type migrationStates map[string]migrationState // Result represents the result of a migration run type Result string @@ -61,6 +86,71 @@ func New(lu *lookup.Lookup, log *zerolog.Logger) Migrator { } } +// Migrations returns the list of migrations and their states +func (m *Migrator) Migrations() (map[string]MigrationState, error) { + err := m.readStates() + if err != nil { + return nil, err + } + + states := map[string]MigrationState{} + for _, migration := range allMigrations() { + if s, ok := m.states[migration]; ok { + states[migration] = s + } else { + states[migration] = MigrationState{ + State: statePending, + } + } + } + + return states, nil +} + +// RunMigration runs or rolls back a migration +func (m *Migrator) RunMigration(id string, rollback bool) error { + if _, ok := migrations[id]; !ok { + return fmt.Errorf("invalid migration '%s'", id) + } + + lock, err := lockedfile.OpenFile(filepath.Join(m.lu.InternalRoot(), ".migrations.lock"), os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return err + } + defer lock.Close() + + err = m.readStates() + if err != nil { + return err + } + + var res Result + if !rollback { + m.log.Info().Msg("Running migration " + id + "...") + res, err = migrations[id].Migrate(m) + } else { + m.log.Info().Msg("Rolling back migration " + id + "...") + res, err = migrations[id].Rollback(m) + } + + // write back state + s := m.states[id] + s.State = string(res) + + if err != nil { + m.log.Error().Err(err).Msg("migration " + id + " failed") + s.Message = err.Error() + } + + m.states[id] = s + err = m.writeStates() + if err != nil { + return err + } + m.log.Info().Msg("done") + return nil +} + // RunMigrations runs all migrations in sequence. Note this sequence must not be changed or it might // damage existing decomposed fs. func (m *Migrator) RunMigrations() error { @@ -75,17 +165,15 @@ func (m *Migrator) RunMigrations() error { return err } - for _, migration := range allMigrations { + for _, migration := range allMigrations() { s := m.states[migration] - if s.State == "succeeded" { + if s.State == stateSucceeded || s.State == stateDown { continue } - migrateMethod := reflect.ValueOf(m).MethodByName("Migration" + migration) - v := migrateMethod.Call(nil) - s.State = string(v[0].Interface().(Result)) - if v[1].Interface() != nil { - err := v[1].Interface().(error) + res, err := migrations[migration].Migrate(m) + s.State = string(res) + if err != nil { m.log.Error().Err(err).Msg("migration " + migration + " failed") s.Message = err.Error() }