From 9cd96567e2e348966a8f468cf10409c26b252121 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Wed, 7 Jun 2023 13:03:38 -0400 Subject: [PATCH] Adding optional revision bump to snapshot restore --- etcdutl/etcdutl/snapshot_command.go | 6 +++- etcdutl/snapshot/util.go | 11 ++++++ etcdutl/snapshot/v3_snapshot.go | 54 +++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/etcdutl/etcdutl/snapshot_command.go b/etcdutl/etcdutl/snapshot_command.go index 28df31f8dd02..3355e48c6fbe 100644 --- a/etcdutl/etcdutl/snapshot_command.go +++ b/etcdutl/etcdutl/snapshot_command.go @@ -38,6 +38,7 @@ var ( restorePeerURLs string restoreName string skipHashCheck bool + revisionBump int64 ) // NewSnapshotCommand returns the cobra command for "snapshot". @@ -75,6 +76,7 @@ func NewSnapshotRestoreCommand() *cobra.Command { cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster") cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member") cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)") + cmd.Flags().Int64Var(&revisionBump, "bump-revision", 0, "How much to increase the revision of each key, after restore, to force a resync of watchers") cmd.MarkFlagDirname("data-dir") cmd.MarkFlagDirname("wal-dir") @@ -100,7 +102,7 @@ func SnapshotStatusCommandFunc(cmd *cobra.Command, args []string) { func snapshotRestoreCommandFunc(_ *cobra.Command, args []string) { SnapshotRestoreCommandFunc(restoreCluster, restoreClusterToken, restoreDataDir, restoreWalDir, - restorePeerURLs, restoreName, skipHashCheck, args) + restorePeerURLs, restoreName, skipHashCheck, revisionBump, args) } func SnapshotRestoreCommandFunc(restoreCluster string, @@ -110,6 +112,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string, restorePeerURLs string, restoreName string, skipHashCheck bool, + revisionBump int64, args []string) { if len(args) != 1 { err := fmt.Errorf("snapshot restore requires exactly one argument") @@ -138,6 +141,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string, InitialCluster: restoreCluster, InitialClusterToken: restoreClusterToken, SkipHashCheck: skipHashCheck, + RevisionBump: revisionBump, }); err != nil { cobrautl.ExitWithError(cobrautl.ExitError, err) } diff --git a/etcdutl/snapshot/util.go b/etcdutl/snapshot/util.go index 2c1fae21fa15..b2b562e84ce2 100644 --- a/etcdutl/snapshot/util.go +++ b/etcdutl/snapshot/util.go @@ -20,12 +20,23 @@ import ( type revision struct { main int64 + sep byte sub int64 } +// The binary representation of the revision is 17 bytes: +// +// [8 bytes: main] [1 byte: sep] [8 bytes: sub] func bytesToRev(bytes []byte) revision { return revision{ main: int64(binary.BigEndian.Uint64(bytes[0:8])), + sep: bytes[8], sub: int64(binary.BigEndian.Uint64(bytes[9:])), } } + +func revToBytes(bytes []byte, rev revision) { + binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main)) + bytes[8] = rev.sep + binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) +} diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 8958ba80da13..02a0b95e4365 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -30,6 +30,7 @@ import ( bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/types" clientv3 "go.etcd.io/etcd/client/v3" @@ -203,6 +204,8 @@ type RestoreConfig struct { // SkipHashCheck is "true" to ignore snapshot integrity hash value // (required if copied from data directory). SkipHashCheck bool + + RevisionBump int64 } // Restore restores a new etcd data directory from given snapshot file. @@ -265,6 +268,11 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { if err = s.saveDB(); err != nil { return err } + + if err = s.revBump(cfg.RevisionBump); err != nil { + return err + } + hardstate, err := s.saveWALAndSnap() if err != nil { return err @@ -311,6 +319,52 @@ func (s *v3Manager) saveDB() error { return nil } +// revBump increases the revision, modrevision, and version of every key to force watchers to resync. +// +// In order to ensure that watchers don't have a revision that's higher than the restored revision, +// 'amount' should be a very large number, > 1_000_000. +// +// Compaction should be run after this to further help invalidate watcher caches. +func (s *v3Manager) revBump(amount int64) error { + // Don't do anything if there's nothing to do + if amount == 0 { + s.lg.Info("skipping revision bump") + return nil + } + + s.lg.Info("starting revision bump", zap.Int64("bump-amount", amount)) + defer s.lg.Info("finished revision bump") + + be := backend.NewDefaultBackend(s.lg, s.outDbPath()) + defer be.Close() + + tx := be.BatchTx() + tx.LockInsideApply() + defer tx.Unlock() + + return tx.UnsafeForEach(schema.Key, func(k, v []byte) (err error) { + rev := bytesToRev(k) + rev.main += amount + revToBytes(k, rev) + + val := mvccpb.KeyValue{} + if err = val.Unmarshal(v); err != nil { + return err + } + + val.ModRevision += amount + val.Version += amount + + v, err = val.Marshal() + if err != nil { + return err + } + + tx.UnsafeSeqPut(schema.Key, k, v) + return nil + }) +} + func (s *v3Manager) copyAndVerifyDB() error { srcf, ferr := os.Open(s.srcDbPath) if ferr != nil {