Skip to content

Commit

Permalink
Adding optional revision bump to snapshot restore
Browse files Browse the repository at this point in the history
  • Loading branch information
dusk125 committed Jun 7, 2023
1 parent caee532 commit 9cd9656
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
6 changes: 5 additions & 1 deletion etcdutl/etcdutl/snapshot_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
restorePeerURLs string
restoreName string
skipHashCheck bool
revisionBump int64
)

// NewSnapshotCommand returns the cobra command for "snapshot".
Expand Down Expand Up @@ -75,6 +76,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member")
cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)")
cmd.Flags().Int64Var(&revisionBump, "bump-revision", 0, "How much to increase the revision of each key, after restore, to force a resync of watchers")

cmd.MarkFlagDirname("data-dir")
cmd.MarkFlagDirname("wal-dir")
Expand All @@ -100,7 +102,7 @@ func SnapshotStatusCommandFunc(cmd *cobra.Command, args []string) {

func snapshotRestoreCommandFunc(_ *cobra.Command, args []string) {
SnapshotRestoreCommandFunc(restoreCluster, restoreClusterToken, restoreDataDir, restoreWalDir,
restorePeerURLs, restoreName, skipHashCheck, args)
restorePeerURLs, restoreName, skipHashCheck, revisionBump, args)
}

func SnapshotRestoreCommandFunc(restoreCluster string,
Expand All @@ -110,6 +112,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string,
restorePeerURLs string,
restoreName string,
skipHashCheck bool,
revisionBump int64,
args []string) {
if len(args) != 1 {
err := fmt.Errorf("snapshot restore requires exactly one argument")
Expand Down Expand Up @@ -138,6 +141,7 @@ func SnapshotRestoreCommandFunc(restoreCluster string,
InitialCluster: restoreCluster,
InitialClusterToken: restoreClusterToken,
SkipHashCheck: skipHashCheck,
RevisionBump: revisionBump,
}); err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
Expand Down
11 changes: 11 additions & 0 deletions etcdutl/snapshot/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@ import (

type revision struct {
main int64
sep byte
sub int64
}

// The binary representation of the revision is 17 bytes:
//
// [8 bytes: main] [1 byte: sep] [8 bytes: sub]
func bytesToRev(bytes []byte) revision {
return revision{
main: int64(binary.BigEndian.Uint64(bytes[0:8])),
sep: bytes[8],
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}

func revToBytes(bytes []byte, rev revision) {
binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main))
bytes[8] = rev.sep
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}
54 changes: 54 additions & 0 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -203,6 +204,8 @@ type RestoreConfig struct {
// SkipHashCheck is "true" to ignore snapshot integrity hash value
// (required if copied from data directory).
SkipHashCheck bool

RevisionBump int64
}

// Restore restores a new etcd data directory from given snapshot file.
Expand Down Expand Up @@ -265,6 +268,11 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
if err = s.saveDB(); err != nil {
return err
}

if err = s.revBump(cfg.RevisionBump); err != nil {
return err
}

hardstate, err := s.saveWALAndSnap()
if err != nil {
return err
Expand Down Expand Up @@ -311,6 +319,52 @@ func (s *v3Manager) saveDB() error {
return nil
}

// revBump increases the revision, modrevision, and version of every key to force watchers to resync.
//
// In order to ensure that watchers don't have a revision that's higher than the restored revision,
// 'amount' should be a very large number, > 1_000_000.
//
// Compaction should be run after this to further help invalidate watcher caches.
func (s *v3Manager) revBump(amount int64) error {
// Don't do anything if there's nothing to do
if amount == 0 {
s.lg.Info("skipping revision bump")
return nil
}

s.lg.Info("starting revision bump", zap.Int64("bump-amount", amount))
defer s.lg.Info("finished revision bump")

be := backend.NewDefaultBackend(s.lg, s.outDbPath())
defer be.Close()

tx := be.BatchTx()
tx.LockInsideApply()
defer tx.Unlock()

return tx.UnsafeForEach(schema.Key, func(k, v []byte) (err error) {
rev := bytesToRev(k)
rev.main += amount
revToBytes(k, rev)

val := mvccpb.KeyValue{}
if err = val.Unmarshal(v); err != nil {
return err
}

val.ModRevision += amount
val.Version += amount

v, err = val.Marshal()
if err != nil {
return err
}

tx.UnsafeSeqPut(schema.Key, k, v)
return nil
})
}

func (s *v3Manager) copyAndVerifyDB() error {
srcf, ferr := os.Open(s.srcDbPath)
if ferr != nil {
Expand Down

0 comments on commit 9cd9656

Please sign in to comment.