diff --git a/e2e/cluster_test.go b/e2e/cluster_test.go index 7d3fe6a1ece..69896dd48d5 100644 --- a/e2e/cluster_test.go +++ b/e2e/cluster_test.go @@ -112,10 +112,11 @@ type etcdProcessClusterConfig struct { isClientAutoTLS bool isClientCRL bool - forceNewCluster bool - initialToken string - quotaBackendBytes int64 - noStrictReconfig bool + forceNewCluster bool + initialToken string + quotaBackendBytes int64 + noStrictReconfig bool + initialCorruptCheck bool } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -224,6 +225,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro if cfg.noStrictReconfig { args = append(args, "--strict-reconfig-check=false") } + if cfg.initialCorruptCheck { + args = append(args, "--experimental-initial-corrupt-check") + } var murl string if cfg.metricsURLScheme != "" { murl = (&url.URL{ diff --git a/e2e/ctl_v3_test.go b/e2e/ctl_v3_test.go index 28b88b76221..f16a14fc241 100644 --- a/e2e/ctl_v3_test.go +++ b/e2e/ctl_v3_test.go @@ -55,6 +55,7 @@ type ctlCtx struct { t *testing.T cfg etcdProcessClusterConfig quotaBackendBytes int64 + corruptFunc func(string) error noStrictReconfig bool epc *etcdProcessCluster @@ -69,6 +70,8 @@ type ctlCtx struct { user string pass string + initialCorruptCheck bool + // for compaction compactPhysical bool } @@ -105,6 +108,14 @@ func withCompactPhysical() ctlOption { return func(cx *ctlCtx) { cx.compactPhysical = true } } +func withInitialCorruptCheck() ctlOption { + return func(cx *ctlCtx) { cx.initialCorruptCheck = true } +} + +func withCorruptFunc(f func(string) error) ctlOption { + return func(cx *ctlCtx) { cx.corruptFunc = f } +} + func withNoStrictReconfig() ctlOption { return func(cx *ctlCtx) { cx.noStrictReconfig = true } } @@ -131,6 +142,9 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { ret.cfg.quotaBackendBytes = ret.quotaBackendBytes } ret.cfg.noStrictReconfig = ret.noStrictReconfig + if ret.initialCorruptCheck { + ret.cfg.initialCorruptCheck = ret.initialCorruptCheck + } epc, err := newEtcdProcessCluster(&ret.cfg) if err != nil { diff --git a/e2e/etcd_corrupt_test.go b/e2e/etcd_corrupt_test.go new file mode 100644 index 00000000000..a2bbb4c42b2 --- /dev/null +++ b/e2e/etcd_corrupt_test.go @@ -0,0 +1,129 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package e2e
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+
+	bolt "github.com/coreos/bbolt"
+)
+
+// TODO: test with embedded etcd in integration package
+
+func TestEtcdCorruptHash(t *testing.T) {
+	oldenv := os.Getenv("EXPECT_DEBUG")
+	defer os.Setenv("EXPECT_DEBUG", oldenv)
+	os.Setenv("EXPECT_DEBUG", "1")
+
+	cfg := configNoTLS
+
+	// trigger a snapshot so that the restarted member can load peers from disk
+	cfg.snapCount = 3
+
+	testCtl(t, corruptTest, withQuorum(),
+		withCfg(cfg),
+		withInitialCorruptCheck(),
+		withCorruptFunc(corruptHash),
+	)
+}
+
+func corruptTest(cx ctlCtx) {
+	for i := 0; i < 10; i++ {
+		if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil {
+			if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
+				cx.t.Fatalf("corruptTest ctlV3Put error (%v)", err)
+			}
+		}
+	}
+	// give all nodes enough time to sync on the same data
+	time.Sleep(3 * time.Second)
+
+	eps := cx.epc.EndpointsV3()
+	cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
+	if err != nil {
+		cx.t.Fatal(err)
+	}
+	defer cli1.Close()
+
+	sresp, err := cli1.Status(context.TODO(), eps[0])
+	if err != nil {
+		cx.t.Fatal(err)
+	}
+	id0 := sresp.Header.GetMemberId()
+
+	cx.epc.procs[0].Stop()
+
+	// corrupt the first member by modifying its backend offline.
+	fp := filepath.Join(cx.epc.procs[0].Config().dataDirPath, "member", "snap", "db")
+	if err = cx.corruptFunc(fp); err != nil {
+		cx.t.Fatal(err)
+	}
+
+	ep := cx.epc.procs[0]
+	proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...))
+	if err != nil {
+		cx.t.Fatal(err)
+	}
+	defer proc.Stop()
+
+	// restarting the corrupted member should fail
+	waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
+}
+
+func corruptHash(fpath string) error {
+	db, derr := bolt.Open(fpath, os.ModePerm, &bolt.Options{})
+	if derr != nil {
+		return derr
+	}
+	defer db.Close()
+
+	return db.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket([]byte("key"))
+		if b == nil {
+			return errors.New("got nil bucket for 'key'")
+		}
+		keys, vals := [][]byte{}, [][]byte{}
+		c := b.Cursor()
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			keys = append(keys, k)
+			var kv mvccpb.KeyValue
+			if uerr := kv.Unmarshal(v); uerr != nil {
+				return uerr
+			}
+			kv.Key[0]++
+			kv.Value[0]++
+			v2, v2err := kv.Marshal()
+			if v2err != nil {
+				return v2err
+			}
+			vals = append(vals, v2)
+		}
+		for i := range keys {
+			if perr := b.Put(keys[i], vals[i]); perr != nil {
+				return perr
+			}
+		}
+		return nil
+	})
+}
diff --git a/embed/config.go b/embed/config.go
index 0aa3564ffa2..e2be12c9e29 100644
--- a/embed/config.go
+++ b/embed/config.go
@@ -171,8 +171,9 @@ type Config struct {
 
 	// Experimental flags
 
-	ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
-	ExperimentalEnableV2V3       string        `json:"experimental-enable-v2v3"`
+	ExperimentalInitialCorruptCheck bool          `json:"experimental-initial-corrupt-check"`
+	ExperimentalCorruptCheckTime    time.Duration `json:"experimental-corrupt-check-time"`
+	ExperimentalEnableV2V3          string        `json:"experimental-enable-v2v3"`
 }
 
 // configYAML holds the config suitable for yaml parsing
diff --git a/embed/etcd.go b/embed/etcd.go
index 94f3aca00ba..2b5cf537384 100644
--- a/embed/etcd.go
+++ b/embed/etcd.go
@@ -124,7 +124,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		token   string
 	)
 
+	memberInitialized := true
 	if !isMemberInitialized(cfg) {
+		memberInitialized = false
 		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
 		if err != nil {
 			return e, fmt.Errorf("error setting up initial cluster: %v", err)
@@ -175,6 +177,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		StrictReconfigCheck:   cfg.StrictReconfigCheck,
 		ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
 		AuthToken:             cfg.AuthToken,
+		InitialCorruptCheck:   cfg.ExperimentalInitialCorruptCheck,
 		CorruptCheckTime:      cfg.ExperimentalCorruptCheckTime,
 	}
 
@@ -185,6 +188,16 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 	// buffer channel so goroutines on closed connections won't wait forever
 	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
 
+	// a newly started member ("memberInitialized==false")
+	// does not need the initial corruption check
+	if memberInitialized {
+		if err = e.Server.CheckInitialHashKV(); err != nil {
+			// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
+			// (nothing to close since rafthttp transports have not been started)
+			e.Server = nil
+			return e, err
+		}
+	}
 	e.Server.Start()
 
 	if err = e.servePeers(); err != nil {
diff --git a/etcdmain/config.go b/etcdmain/config.go
index fd7b0d263c8..3ea2a6d5cd8 100644
--- a/etcdmain/config.go
+++ b/etcdmain/config.go
@@ -213,6 +213,7 @@ func newConfig() *config {
 	fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")
 
 	// experimental
+	fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable data corruption check before serving any client/peer traffic.")
 	fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
 
 	// ignored
diff --git a/etcdmain/help.go b/etcdmain/help.go
index 87b9db1f1bd..5efcc6bade1 100644
--- a/etcdmain/help.go
+++ b/etcdmain/help.go
@@ -187,8 +187,10 @@ auth flags:
 		Specify a v3 authentication token type and its options ('simple' or 'jwt').
 
 experimental flags:
+	--experimental-initial-corrupt-check 'false'
+		enable data corruption check before serving any client/peer traffic.
 	--experimental-corrupt-check-time '0s'
-		duration of time between cluster corruption check passes.
+		duration of time between cluster corruption check passes.
 	--experimental-enable-v2v3 ''
 		serve v2 requests through the v3 backend under a given prefix.
 `
diff --git a/etcdserver/config.go b/etcdserver/config.go
index 3f6ec6f2ae3..80d8219436e 100644
--- a/etcdserver/config.go
+++ b/etcdserver/config.go
@@ -66,7 +66,10 @@ type ServerConfig struct {
 
 	AuthToken string
 
-	CorruptCheckTime time.Duration
+	// InitialCorruptCheck is true to check data corruption on boot
+	// before serving any peer/client traffic.
+	InitialCorruptCheck bool
+	CorruptCheckTime    time.Duration
 }
 
 // VerifyBootstrap sanity-checks the initial config for bootstrap case
diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go
index 035c62f041c..d998ec59020 100644
--- a/etcdserver/corrupt.go
+++ b/etcdserver/corrupt.go
@@ -16,14 +16,61 @@ package etcdserver
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/pkg/types"
 )
 
+// CheckInitialHashKV compares initial hash values with its peers
+// before serving any peer/client traffic. Hashes mismatch only when
+// they differ at the requested revision with the same compact revision.
+func (s *EtcdServer) CheckInitialHashKV() error {
+	if !s.Cfg.InitialCorruptCheck {
+		return nil
+	}
+
+	plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
+	h, rev, crev, err := s.kv.HashByRev(0)
+	if err != nil {
+		return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
+	}
+	peers := s.getPeerHashKVs(rev)
+	mismatch := 0
+	for _, p := range peers {
+		if p.resp != nil {
+			peerID := types.ID(p.resp.Header.MemberId)
+			if h != p.resp.Hash {
+				if crev == p.resp.CompactRevision {
+					plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
+					mismatch++
+				} else {
+					plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision: %d)", s.ID(), peerID, p.resp.CompactRevision, rev)
+				}
+			}
+			continue
+		}
+		if p.err != nil {
+			switch p.err {
+			case rpctypes.ErrFutureRev:
+				plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind (%q)", s.ID(), p.eps, rev, p.err.Error())
+			case rpctypes.ErrCompacted:
+				plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind (%q)", s.ID(), p.eps, rev, p.err.Error())
+			}
+		}
+	}
+	if mismatch > 0 {
+		return fmt.Errorf("%s found data inconsistency with peers", s.ID())
+	}
+
+	plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
+	return nil
+}
+
 func (s *EtcdServer) monitorKVHash() {
 	t := s.Cfg.CorruptCheckTime
 	if t == 0 {
@@ -50,7 +97,7 @@ func (s *EtcdServer) checkHashKV() error {
 	if err != nil {
 		plog.Fatalf("failed to hash kv store (%v)", err)
 	}
-	resps := s.getPeerHashKVs(rev)
+	peers := s.getPeerHashKVs(rev)
 
 	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
 	err = s.linearizableReadNotify(ctx)
@@ -86,24 +133,27 @@ func (s *EtcdServer) checkHashKV() error {
 		mismatch(uint64(s.ID()))
 	}
 
-	for _, resp := range resps {
-		id := resp.Header.MemberId
+	for _, p := range peers {
+		if p.resp == nil {
+			continue
+		}
+		id := p.resp.Header.MemberId
 
 		// leader expects follower's latest revision less than or equal to leader's
-		if resp.Header.Revision > rev2 {
+		if p.resp.Header.Revision > rev2 {
 			plog.Warningf(
 				"revision %d from member %v, expected at most %d",
-				resp.Header.Revision,
+				p.resp.Header.Revision,
 				types.ID(id),
 				rev2)
 			mismatch(id)
 		}
 
 		// leader expects follower's latest compact revision less than or equal to leader's
-		if resp.CompactRevision > crev2 {
+		if p.resp.CompactRevision > crev2 {
 			plog.Warningf(
 				"compact revision %d from member %v, expected at most %d",
-				resp.CompactRevision,
+				p.resp.CompactRevision,
 				types.ID(id),
 				crev2,
 			)
@@ -111,10 +161,10 @@ func (s *EtcdServer) checkHashKV() error {
 		}
 
 		// follower's compact revision is leader's old one, then hashes must match
-		if resp.CompactRevision == crev && resp.Hash != h {
+		if p.resp.CompactRevision == crev && p.resp.Hash != h {
 			plog.Warningf(
 				"hash %d at revision %d from member %v, expected hash %d",
-				resp.Hash,
+				p.resp.Hash,
 				rev,
 				types.ID(id),
 				h,
@@ -125,36 +175,53 @@ func (s *EtcdServer) checkHashKV() error {
 	return nil
 }
 
-func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) {
-	for _, m := range s.cluster.Members() {
+type peerHashKVResp struct {
+	resp *clientv3.HashKVResponse
+	err  error
+	eps  []string
+}
+
+func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
+	// TODO: handle the case when "s.cluster.Members" have not
+	// been populated (e.g. no snapshot to load from disk)
+	mbs := s.cluster.Members()
+	pURLs := make([][]string, 0, len(mbs))
+	for _, m := range mbs {
 		if m.ID == s.ID() {
 			continue
 		}
+		pURLs = append(pURLs, m.PeerURLs)
+	}
+
+	for _, purls := range pURLs {
+		if len(purls) == 0 {
+			continue
+		}
 		cli, cerr := clientv3.New(clientv3.Config{
 			DialTimeout: s.Cfg.ReqTimeout(),
-			Endpoints:   m.PeerURLs,
+			Endpoints:   purls,
 		})
 		if cerr != nil {
-			plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error())
+			plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
 			continue
 		}
 
 		respsLen := len(resps)
 		for _, c := range cli.Endpoints() {
 			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-			resp, herr := cli.HashKV(ctx, c, rev)
+			var resp *clientv3.HashKVResponse
+			resp, cerr = cli.HashKV(ctx, c, rev)
 			cancel()
-			if herr == nil {
-				cerr = herr
-				resps = append(resps, resp)
+			if cerr == nil {
+				resps = append(resps, &peerHashKVResp{resp: resp})
 				break
 			}
+			plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
 		}
 		cli.Close()
 
 		if respsLen == len(resps) {
-			plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr)
+			resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
 		}
 	}
 	return resps
 }
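
For reference, a minimal sketch of how the new check surfaces through the embed API changed above. This is illustrative only: embed.NewConfig, embed.StartEtcd, Etcd.Close, and EtcdServer.ReadyNotify are existing etcd APIs; the data directory and error handling are placeholder choices, not part of this change.

package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data directory
	// field added in this change; equivalent to --experimental-initial-corrupt-check
	cfg.ExperimentalInitialCorruptCheck = true

	// for an already-initialized member, StartEtcd now runs
	// EtcdServer.CheckInitialHashKV before serving any peer/client
	// traffic, and fails here if the local KV hash disagrees with
	// peers at the same compact revision
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
}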