Skip to content

Commit

Permalink
Adding optional revision bump and mark compacted to snapshot restore
Browse files Browse the repository at this point in the history
  • Loading branch information
dusk125 committed Jun 12, 2023
1 parent caee532 commit 6077c15
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 1 deletion.
10 changes: 9 additions & 1 deletion etcdutl/etcdutl/snapshot_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
restorePeerURLs string
restoreName string
skipHashCheck bool
markCompacted bool
revisionBump int64
)

// NewSnapshotCommand returns the cobra command for "snapshot".
Expand Down Expand Up @@ -75,6 +77,8 @@ func NewSnapshotRestoreCommand() *cobra.Command {
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member")
cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)")
cmd.Flags().Int64Var(&revisionBump, "bump-revision", 0, "How much to increase the latest revision after restore")
cmd.Flags().BoolVar(&markCompacted, "mark-compacted", false, "Mark the latest revision after restore as the point of scheduled compaction (to invalidate watcher caches)")

cmd.MarkFlagDirname("data-dir")
cmd.MarkFlagDirname("wal-dir")
Expand All @@ -100,7 +104,7 @@ func SnapshotStatusCommandFunc(cmd *cobra.Command, args []string) {

// snapshotRestoreCommandFunc adapts a (command, args) style callback to
// SnapshotRestoreCommandFunc: the command argument is ignored, and the restore
// flag variables are forwarded together with the positional args.
func snapshotRestoreCommandFunc(_ *cobra.Command, args []string) {
SnapshotRestoreCommandFunc(restoreCluster, restoreClusterToken, restoreDataDir, restoreWalDir,
restorePeerURLs, restoreName, skipHashCheck, args)
restorePeerURLs, restoreName, skipHashCheck, revisionBump, markCompacted, args)
}

func SnapshotRestoreCommandFunc(restoreCluster string,
Expand All @@ -110,6 +114,8 @@ func SnapshotRestoreCommandFunc(restoreCluster string,
restorePeerURLs string,
restoreName string,
skipHashCheck bool,
revisionBump int64,
markCompacted bool,
args []string) {
if len(args) != 1 {
err := fmt.Errorf("snapshot restore requires exactly one argument")
Expand Down Expand Up @@ -138,6 +144,8 @@ func SnapshotRestoreCommandFunc(restoreCluster string,
InitialCluster: restoreCluster,
InitialClusterToken: restoreClusterToken,
SkipHashCheck: skipHashCheck,
RevisionBump: revisionBump,
MarkCompacted: markCompacted,
}); err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
Expand Down
11 changes: 11 additions & 0 deletions etcdutl/snapshot/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@ import (

// revision is the decoded form of a 17-byte key: two big-endian 64-bit
// integers with a single separator byte between them.
type revision struct {
	// main is decoded from the first 8 bytes of the key.
	main int64
	// sep is the byte found at offset 8. It is carried along so that
	// re-encoding with revToBytes reproduces the same separator.
	sep byte
	// sub is decoded from the 8 bytes starting at offset 9.
	sub int64
}

// bytesToRev decodes the binary representation of a revision:
//
//	[8 bytes: main] [1 byte: sep] [8 bytes: sub]
//
// Only the first 17 bytes of b are read; anything after them is ignored.
// It panics when b holds fewer than 17 bytes.
func bytesToRev(b []byte) revision {
	var rev revision
	rev.main = int64(binary.BigEndian.Uint64(b[:8]))
	rev.sep = b[8]
	rev.sub = int64(binary.BigEndian.Uint64(b[9:]))
	return rev
}

// revToBytes encodes rev into the first 17 bytes of b using the layout
// described on bytesToRev. It panics when b holds fewer than 17 bytes.
func revToBytes(b []byte, rev revision) {
	order := binary.BigEndian
	order.PutUint64(b[:8], uint64(rev.main))
	b[8] = rev.sep
	order.PutUint64(b[9:], uint64(rev.sub))
}
95 changes: 95 additions & 0 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
Expand Down Expand Up @@ -203,6 +204,16 @@ type RestoreConfig struct {
// SkipHashCheck is "true" to ignore snapshot integrity hash value
// (required if copied from data directory).
SkipHashCheck bool

// RevisionBump is the amount to increase the latest revision after restore,
// to ensure that the revisions never decrease from the client's perspective.
// If 0, revision bumping is skipped.
RevisionBump int64

// MarkCompacted is "true" to mark the latest revision as scheduled for compaction.
// This will cause the server to compact the restored revisions (at some point after startup),
// and correctly cancel watchers so they can invalidate their caches.
MarkCompacted bool
}

// Restore restores a new etcd data directory from given snapshot file.
Expand Down Expand Up @@ -265,6 +276,17 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
if err = s.saveDB(); err != nil {
return err
}

if err = s.revBump(cfg.RevisionBump); err != nil {
return err
}

if cfg.MarkCompacted {
if err = s.markLatestRevCompacted(); err != nil {
return err
}
}

hardstate, err := s.saveWALAndSnap()
if err != nil {
return err
Expand Down Expand Up @@ -311,6 +333,79 @@ func (s *v3Manager) saveDB() error {
return nil
}

// revBump raises the latest revision stored in the key bucket by 'amount'.
//
// It scans schema.Key for the record with the highest main revision and writes
// that record's value again under a key whose main revision is increased by
// 'amount'; the separator byte and sub revision of the key are carried over
// unchanged. Existing records are not rewritten.
//
// In order to ensure that watchers don't have a revision that's higher than the restored revision,
// 'amount' should be a very large number, > 1_000_000.
//
// Compaction should be run after this to further help invalidate watcher caches.
//
// An amount of 0 skips the bump. A negative amount, or one that would push the
// revision past the int64 range, is rejected with an error: both would store
// the new record at a revision lower than the current latest, where it could
// replace an existing record.
func (s *v3Manager) revBump(amount int64) error {
	// Don't do anything if there's nothing to do
	if amount == 0 {
		s.lg.Info("skipping revision bump")
		return nil
	}
	if amount < 0 {
		return revBumpError("revision bump amount must not be negative")
	}

	// NOTE(review): no explicit commit is issued below; presumably closing the
	// backend flushes the pending batch transaction — confirm.
	be := backend.NewDefaultBackend(s.lg, s.outDbPath())
	defer be.Close()

	tx := be.BatchTx()
	tx.LockInsideApply()
	defer tx.Unlock()

	// Find the record with the highest main revision. The comparison is
	// strict, so among records sharing that main revision the first one
	// visited is kept.
	var latest revision
	var value []byte
	if err := tx.UnsafeForEach(schema.Key, func(k, v []byte) (err error) {
		rev := bytesToRev(k)

		if latest.main < rev.main {
			latest = rev
			value = v
		}

		return nil
	}); err != nil {
		return err
	}

	// amount is positive here, so the subtraction itself cannot overflow.
	const maxRevision = 1<<63 - 1
	if latest.main > maxRevision-amount {
		return revBumpError("revision bump overflows the int64 revision range")
	}

	s.lg.Info("bumping latest revision to ensure revisions never decrease", zap.Int64("latest-revision", latest.main), zap.Int64("bump-amount", amount), zap.Int64("new-latest-revision", latest.main+amount))

	// NOTE(review): when the bucket holds no records, latest is still the zero
	// revision and value is nil, so a record with a nil value is written at
	// revision 'amount' — confirm this is intended.
	latest.main += amount
	k := [17]byte{}
	revToBytes(k[:], latest)
	tx.UnsafePut(schema.Key, k[:], value)

	return nil
}

// revBumpError is the error type revBump returns when the requested amount
// cannot be applied.
type revBumpError string

// Error implements the error interface.
func (e revBumpError) Error() string { return string(e) }

// markLatestRevCompacted opens the restored database, finds the highest main
// revision among the keys of the schema.Key bucket, logs it, and records it
// through mvcc.UnsafeSetScheduledCompact. When the bucket holds no keys the
// recorded revision is 0.
//
// It returns the error reported while iterating the bucket, if any.
//
// NOTE(review): no explicit commit is issued here; presumably closing the
// backend flushes the pending batch transaction — confirm.
func (s *v3Manager) markLatestRevCompacted() error {
	db := backend.NewDefaultBackend(s.lg, s.outDbPath())
	defer db.Close()

	batch := db.BatchTx()
	batch.LockInsideApply()
	defer batch.Unlock()

	// Track the largest main revision seen while walking every key.
	var highest int64
	visit := func(key, _ []byte) error {
		if main := bytesToRev(key).main; main > highest {
			highest = main
		}
		return nil
	}

	err := batch.UnsafeForEach(schema.Key, visit)
	if err != nil {
		return err
	}

	s.lg.Info("marking revision compacted", zap.Int64("revision", highest))
	mvcc.UnsafeSetScheduledCompact(batch, highest)
	return nil
}

func (s *v3Manager) copyAndVerifyDB() error {
srcf, ferr := os.Open(s.srcDbPath)
if ferr != nil {
Expand Down

0 comments on commit 6077c15

Please sign in to comment.