From 24376160da5b8de5601930da621d81ae5e37da36 Mon Sep 17 00:00:00 2001 From: Wilson Wang Date: Fri, 29 Oct 2021 16:44:56 -0700 Subject: [PATCH] server: remove KeyValue unmarshal failure panic and send corrupt alarm request instead --- server/config/config.go | 13 ++++++----- server/embed/config.go | 3 +++ server/embed/etcd.go | 1 + server/etcdmain/config.go | 1 + server/etcdserver/corrupt.go | 22 +++++++++++------- server/etcdserver/server.go | 5 +++-- server/storage/mvcc/kv.go | 3 +++ server/storage/mvcc/kvstore.go | 5 +++-- server/storage/mvcc/watchable_store.go | 31 ++++++++++++++++++++++---- 9 files changed, 62 insertions(+), 22 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index c9e7d3aa3f01..5b6c2019cf50 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -111,12 +111,13 @@ type ServerConfig struct { BootstrapTimeout time.Duration - AutoCompactionRetention time.Duration - AutoCompactionMode string - CompactionBatchLimit int - CompactionSleepInterval time.Duration - QuotaBackendBytes int64 - MaxTxnOps uint + AutoCompactionRetention time.Duration + AutoCompactionMode string + CompactionBatchLimit int + CompactionSleepInterval time.Duration + WatchableStoreErrorReporting bool + QuotaBackendBytes int64 + MaxTxnOps uint // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint diff --git a/server/embed/config.go b/server/embed/config.go index abcdead5a8ba..6b8d2a1aa7e3 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -329,6 +329,9 @@ type Config struct { // ExperimentalWarningUnaryRequestDuration is the time duration after which a warning is generated if applying // unary request takes more time than this value. ExperimentalWarningUnaryRequestDuration time.Duration `json:"experimental-warning-unary-request-duration"` + // ExperimentalWatchableStoreErrorReporting enables WatchableStore to report error in case of failure. Normally we + // treat these failures as panics. 
+ ExperimentalWatchableStoreErrorReporting bool `json:"experimental-watchable-store-error-reporting"` // ForceNewCluster starts a new cluster even if previously started; unsafe. ForceNewCluster bool `json:"force-new-cluster"` diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 418199037cd0..781bbf06c1c0 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -212,6 +212,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval, + WatchableStoreErrorReporting: cfg.ExperimentalWatchableStoreErrorReporting, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 2c3f135d88bc..3d77761cd310 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -291,6 +291,7 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.") fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.") fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. 
Needs to be set to non-zero value to take effect.") + fs.BoolVar(&cfg.ec.ExperimentalWatchableStoreErrorReporting, "experimental-watchable-store-error-reporting", false, "Enable the WatchableStore to report error instead of panic.") // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 81288d5cbaff..3a4f2134346e 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -137,6 +137,11 @@ func (s *EtcdServer) monitorKVHash() { select { case <-s.stopping: return + case _, ok := <-s.kv.ErrorC(): + if !ok { + return + } + s.sendCorruptionAlarmRequest(uint64(s.ID())) case <-time.After(t): } if !s.isLeader() { @@ -148,6 +153,17 @@ } } +func (s *EtcdServer) sendCorruptionAlarmRequest(id uint64) { + a := &pb.AlarmRequest{ + MemberID: id, + Action: pb.AlarmRequest_ACTIVATE, + Alarm: pb.AlarmType_CORRUPT, + } + s.GoAttach(func() { + s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) + }) +} + func (s *EtcdServer) checkHashKV() error { lg := s.Logger() @@ -175,14 +191,7 @@ return } alarmed = true - a := &pb.AlarmRequest{ - MemberID: id, - Action: pb.AlarmRequest_ACTIVATE, - Alarm: pb.AlarmType_CORRUPT, - } - s.GoAttach(func() { - s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) - }) + s.sendCorruptionAlarmRequest(id) } if h2 != h && rev2 == rev && crev == crev2 { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2de2477ec7f6..af81cdeb50bb 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -362,8 +362,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } mvccStoreConfig := mvcc.StoreConfig{ - CompactionBatchLimit: cfg.CompactionBatchLimit, - CompactionSleepInterval: cfg.CompactionSleepInterval, + CompactionBatchLimit: cfg.CompactionBatchLimit, + CompactionSleepInterval: 
cfg.CompactionSleepInterval, + WatchableStoreErrorReporting: cfg.WatchableStoreErrorReporting, } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) diff --git a/server/storage/mvcc/kv.go b/server/storage/mvcc/kv.go index 10c4821b1463..9210f02931d1 100644 --- a/server/storage/mvcc/kv.go +++ b/server/storage/mvcc/kv.go @@ -147,4 +147,7 @@ type Watchable interface { // NewWatchStream returns a WatchStream that can be used to // watch events happened or happening on the KV. NewWatchStream() WatchStream + // ErrorC returns an error channel which holds errors encountered + // during WatchableStore execution. + ErrorC() <-chan error } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 3c0a60bef680..9f990f026b33 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -52,8 +52,9 @@ var defaultCompactBatchLimit = 1000 var minimumBatchInterval = 10 * time.Millisecond type StoreConfig struct { - CompactionBatchLimit int - CompactionSleepInterval time.Duration + CompactionBatchLimit int + CompactionSleepInterval time.Duration + WatchableStoreErrorReporting bool } type store struct { diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index c2a8832db1d0..a21129c1be1e 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -15,6 +15,7 @@ package mvcc import ( + "fmt" "sync" "time" @@ -64,6 +65,8 @@ type watchableStore struct { stopc chan struct{} wg sync.WaitGroup + + errorc chan error } // cancelFunc updates unsynced and synced maps when running @@ -85,6 +88,9 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S synced: newWatcherGroup(), stopc: make(chan struct{}), } + if cfg.WatchableStoreErrorReporting { + s.errorc = make(chan error) + } s.store.ReadView = &readView{s} s.store.WriteView = &writeView{s} if s.le != nil { @@ -99,6 +105,9 @@ func newWatchableStore(lg *zap.Logger, b 
backend.Backend, le lease.Lessor, cfg S func (s *watchableStore) Close() error { close(s.stopc) s.wg.Wait() + if s.errorc != nil { + close(s.errorc) + } return s.store.Close() } @@ -113,6 +122,11 @@ func (s *watchableStore) NewWatchStream() WatchStream { } } +// ErrorC returns the store's error channel; it is nil unless WatchableStoreErrorReporting is enabled. +func (s *watchableStore) ErrorC() <-chan error { + return s.errorc +} + func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) { wa := &watcher{ key: key, @@ -356,7 +370,18 @@ func (s *watchableStore) syncWatchers() int { tx.RLock() revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0) tx.RUnlock() - evs := kvsToEvents(s.store.lg, wg, revs, vs) + evs, err := s.kvsToEvents(wg, revs, vs) + if err != nil { + if s.errorc != nil { + s.lg.Error("kvsToEvents failed", zap.Error(err)) + select { + case s.errorc <- err: + case <-s.stopc: + } + return 0 + } + s.lg.Panic("failed to run kvsToEvents", zap.Error(err)) + } victims := make(watcherBatch) wb := newWatcherBatch(wg, evs) @@ -404,11 +429,12 @@ func (s *watchableStore) syncWatchers() int { } // kvsToEvents gets all events for the watchers from all key-value pairs -func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { +func (s *watchableStore) kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event, err error) { for i, v := range vals { var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { - lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) + s.store.lg.Warn("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) + return nil, fmt.Errorf("failed to unmarshal mvccpb.KeyValue at rev %d: %w", revs[i], err) } if !wg.contains(string(kv.Key)) { @@ -423,7 +449,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m } evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty}) } - return evs + return evs, nil } // notify notifies the fact that given event at the given rev 
just happened to