From 6077c15a92df06c1f4298d357f380427904be026 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Wed, 7 Jun 2023 13:03:38 -0400 Subject: [PATCH] Adding optional revision bump and mark compacted to snapshot restore --- etcdutl/etcdutl/snapshot_command.go | 10 ++- etcdutl/snapshot/util.go | 11 ++++ etcdutl/snapshot/v3_snapshot.go | 95 +++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 1 deletion(-) diff --git a/etcdutl/etcdutl/snapshot_command.go b/etcdutl/etcdutl/snapshot_command.go index 28df31f8dd02..97908337c7d2 100644 --- a/etcdutl/etcdutl/snapshot_command.go +++ b/etcdutl/etcdutl/snapshot_command.go @@ -38,6 +38,8 @@ var ( restorePeerURLs string restoreName string skipHashCheck bool + markCompacted bool + revisionBump int64 ) // NewSnapshotCommand returns the cobra command for "snapshot". @@ -75,6 +77,8 @@ func NewSnapshotRestoreCommand() *cobra.Command { cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster") cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member") cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)") + cmd.Flags().Int64Var(&revisionBump, "bump-revision", 0, "How much to increase the latest revision after restore") + cmd.Flags().BoolVar(&markCompacted, "mark-compacted", false, "Mark the latest revision after restore as the point of scheduled compaction (to invalidate watcher caches)") cmd.MarkFlagDirname("data-dir") cmd.MarkFlagDirname("wal-dir") @@ -100,7 +104,7 @@ func SnapshotStatusCommandFunc(cmd *cobra.Command, args []string) { func snapshotRestoreCommandFunc(_ *cobra.Command, args []string) { SnapshotRestoreCommandFunc(restoreCluster, restoreClusterToken, restoreDataDir, restoreWalDir, - restorePeerURLs, restoreName, skipHashCheck, args) + restorePeerURLs, restoreName, 
skipHashCheck, revisionBump, markCompacted, args) } func SnapshotRestoreCommandFunc(restoreCluster string, @@ -110,6 +114,8 @@ func SnapshotRestoreCommandFunc(restoreCluster string, restorePeerURLs string, restoreName string, skipHashCheck bool, + revisionBump int64, + markCompacted bool, args []string) { if len(args) != 1 { err := fmt.Errorf("snapshot restore requires exactly one argument") @@ -138,6 +144,8 @@ func SnapshotRestoreCommandFunc(restoreCluster string, InitialCluster: restoreCluster, InitialClusterToken: restoreClusterToken, SkipHashCheck: skipHashCheck, + RevisionBump: revisionBump, + MarkCompacted: markCompacted, }); err != nil { cobrautl.ExitWithError(cobrautl.ExitError, err) } diff --git a/etcdutl/snapshot/util.go b/etcdutl/snapshot/util.go index 2c1fae21fa15..b2b562e84ce2 100644 --- a/etcdutl/snapshot/util.go +++ b/etcdutl/snapshot/util.go @@ -20,12 +20,23 @@ import ( type revision struct { main int64 + sep byte sub int64 } +// The binary representation of the revision is 17 bytes: +// +// [8 bytes: main] [1 byte: sep] [8 bytes: sub] func bytesToRev(bytes []byte) revision { return revision{ main: int64(binary.BigEndian.Uint64(bytes[0:8])), + sep: bytes[8], sub: int64(binary.BigEndian.Uint64(bytes[9:])), } } + +func revToBytes(bytes []byte, rev revision) { + binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main)) + bytes[8] = rev.sep + binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) +} diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 8958ba80da13..4977ab689f3b 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -41,6 +41,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/storage/wal" "go.etcd.io/etcd/server/v3/storage/wal/walpb" @@ -203,6 +204,16 @@ type 
RestoreConfig struct { // SkipHashCheck is "true" to ignore snapshot integrity hash value // (required if copied from data directory). SkipHashCheck bool + + // RevisionBump is the amount to increase the latest revision after restore, + // to ensure that the revisions never decrease from the client's perspective. + // If 0, revision bumping is skipped. + RevisionBump int64 + + // MarkCompacted is "true" to mark the latest revision as scheduled for compaction. + // This will cause the server to compact the restored revisions (at some point after startup), + // and correctly cancel watchers so they can invalidate their caches. + MarkCompacted bool } // Restore restores a new etcd data directory from given snapshot file. @@ -265,6 +276,17 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { if err = s.saveDB(); err != nil { return err } + + if err = s.revBump(cfg.RevisionBump); err != nil { + return err + } + + if cfg.MarkCompacted { + if err = s.markLatestRevCompacted(); err != nil { + return err + } + } + hardstate, err := s.saveWALAndSnap() if err != nil { return err @@ -311,6 +333,79 @@ func (s *v3Manager) saveDB() error { return nil } +// revBump re-stores the value of the latest revision under a new key whose main revision is increased by 'amount', to force watchers to resync. +// +// In order to ensure that watchers don't have a revision that's higher than the restored revision, +// 'amount' should be a very large number, > 1_000_000. +// +// Compaction should be run after this to further help invalidate watcher caches. 
+func (s *v3Manager) revBump(amount int64) error {
+	// A zero amount means no bump was requested.
+	if amount == 0 {
+		s.lg.Info("skipping revision bump")
+		return nil
+	}
+	// A negative amount would place the copied record at a revision below the
+	// current latest one (possibly on top of an existing record), which is the
+	// opposite of what a bump is for. Refuse it and leave the database untouched.
+	if amount < 0 {
+		s.lg.Warn("skipping revision bump: amount must not be negative", zap.Int64("bump-amount", amount))
+		return nil
+	}
+
+	// Deferred calls run in reverse order: the transaction is unlocked first,
+	// then the backend is closed.
+	be := backend.NewDefaultBackend(s.lg, s.outDbPath())
+	defer be.Close()
+
+	tx := be.BatchTx()
+	tx.LockInsideApply()
+	defer tx.Unlock()
+
+	// Walk every record in the key bucket and remember the first record seen
+	// with the highest main revision, together with its value.
+	var latest revision
+	var value []byte
+	if err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
+		rev := bytesToRev(k)
+
+		if latest.main < rev.main {
+			latest = rev
+			value = v
+		}
+
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	s.lg.Info(
+		"bumping latest revision to ensure revisions never decrease",
+		zap.Int64("latest-revision", latest.main),
+		zap.Int64("bump-amount", amount),
+		zap.Int64("new-latest-revision", latest.main+amount),
+	)
+
+	// Store the remembered value again under a key whose main revision is
+	// raised by amount; the separator byte and sub revision are carried over.
+	latest.main += amount
+	k := [17]byte{}
+	revToBytes(k[:], latest)
+	tx.UnsafePut(schema.Key, k[:], value)
+
+	return nil
+}
+
+// markLatestRevCompacted scans the key bucket of the restored database for the
+// highest main revision and passes it to mvcc.UnsafeSetScheduledCompact.
+// If the bucket holds no records, the revision passed is 0.
+func (s *v3Manager) markLatestRevCompacted() error {
+	// Deferred calls run in reverse order: unlock the transaction, then close
+	// the backend.
+	be := backend.NewDefaultBackend(s.lg, s.outDbPath())
+	defer be.Close()
+
+	tx := be.BatchTx()
+	tx.LockInsideApply()
+	defer tx.Unlock()
+
+	var latest int64
+	if err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
+		rev := bytesToRev(k)
+
+		if latest < rev.main {
+			latest = rev.main
+		}
+
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	s.lg.Info("marking revision compacted", zap.Int64("revision", latest))
+
+	mvcc.UnsafeSetScheduledCompact(tx, latest)
+
+	return nil
+}
+
 func (s *v3Manager) copyAndVerifyDB() error {
 	srcf, ferr := os.Open(s.srcDbPath)
 	if ferr != nil {