server: remove KeyValue unmarshal failure panic and send corrupt alarm request instead
wilsonwang371 committed Oct 29, 2021
1 parent 6656181 commit 2437616
Showing 9 changed files with 62 additions and 22 deletions.
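
In short: a corrupt mvccpb.KeyValue now surfaces as an error on a channel that the corruption monitor turns into a CORRUPT alarm, instead of panicking the server. Before reading the diff, here is a minimal, self-contained sketch of that pattern; the types below are simplified stand-ins for illustration, not etcd's actual ones:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// store is a simplified stand-in for etcd's watchableStore. When error
// reporting is enabled it owns an error channel; otherwise errorc is nil
// and failures fall back to the legacy panic.
type store struct {
	errorc chan error
}

// ErrorC mirrors the new Watchable.ErrorC method added by this commit.
func (s *store) ErrorC() <-chan error { return s.errorc }

// sync simulates syncWatchers hitting a corrupt mvccpb.KeyValue.
func (s *store) sync() {
	err := errors.New("failed to unmarshal mvccpb.KeyValue")
	if s.errorc != nil {
		s.errorc <- err // report instead of panicking
		return
	}
	panic(err) // legacy behavior when reporting is disabled
}

func main() {
	s := &store{errorc: make(chan error)}
	go s.sync()

	// Monitor loop in the style of monitorKVHash: a receive from a nil
	// channel blocks forever, so this select is safe either way.
	select {
	case err := <-s.ErrorC():
		fmt.Println("raising CORRUPT alarm:", err) // stand-in for sendCorruptionAlarmRequest
	case <-time.After(time.Second):
		fmt.Println("no corruption observed")
	}
}
```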
13 changes: 7 additions & 6 deletions server/config/config.go
@@ -111,12 +111,13 @@ type ServerConfig struct {
 
 	BootstrapTimeout time.Duration
 
-	AutoCompactionRetention time.Duration
-	AutoCompactionMode      string
-	CompactionBatchLimit    int
-	CompactionSleepInterval time.Duration
-	QuotaBackendBytes       int64
-	MaxTxnOps               uint
+	AutoCompactionRetention      time.Duration
+	AutoCompactionMode           string
+	CompactionBatchLimit         int
+	CompactionSleepInterval      time.Duration
+	WatchableStoreErrorReporting bool
+	QuotaBackendBytes            int64
+	MaxTxnOps                    uint
 
 	// MaxRequestBytes is the maximum request size to send over raft.
 	MaxRequestBytes uint
3 changes: 3 additions & 0 deletions server/embed/config.go
@@ -329,6 +329,9 @@ type Config struct {
 	// ExperimentalWarningUnaryRequestDuration is the time duration after which a warning is generated if applying
 	// unary request takes more time than this value.
 	ExperimentalWarningUnaryRequestDuration time.Duration `json:"experimental-warning-unary-request-duration"`
+	// ExperimentalWatchableStoreErrorReporting enables WatchableStore to report errors on failure instead of
+	// panicking, which is the default behavior.
+	ExperimentalWatchableStoreErrorReporting bool `json:"experimental-watchable-store-error-reporting"`
 
 	// ForceNewCluster starts a new cluster even if previously started; unsafe.
 	ForceNewCluster bool `json:"force-new-cluster"`
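
For embedded etcd users, the new option is set through embed.Config before StartEtcd (shown in the next file). A minimal sketch; the data directory is illustrative, and the field is the one added above:

```go
package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data directory
	// Opt in: a corrupt KeyValue now raises a CORRUPT alarm instead of
	// panicking the embedded server.
	cfg.ExperimentalWatchableStoreErrorReporting = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("embedded etcd is ready")
}
```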
1 change: 1 addition & 0 deletions server/embed/etcd.go
@@ -212,6 +212,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		EnableLeaseCheckpoint:        cfg.ExperimentalEnableLeaseCheckpoint,
 		CompactionBatchLimit:         cfg.ExperimentalCompactionBatchLimit,
 		CompactionSleepInterval:      cfg.ExperimentalCompactionSleepInterval,
+		WatchableStoreErrorReporting: cfg.ExperimentalWatchableStoreErrorReporting,
 		WatchProgressNotifyInterval:  cfg.ExperimentalWatchProgressNotifyInterval,
 		DowngradeCheckTime:           cfg.ExperimentalDowngradeCheckTime,
 		WarningApplyDuration:         cfg.ExperimentalWarningApplyDuration,
1 change: 1 addition & 0 deletions server/etcdmain/config.go
@@ -291,6 +291,7 @@ func newConfig() *config {
 	fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
 	fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
 	fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
+	fs.BoolVar(&cfg.ec.ExperimentalWatchableStoreErrorReporting, "experimental-watchable-store-error-reporting", false, "Enable WatchableStore to report errors instead of panicking.")
 
 	// unsafe
 	fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
22 changes: 14 additions & 8 deletions server/etcdserver/corrupt.go
@@ -137,6 +137,8 @@ func (s *EtcdServer) monitorKVHash() {
 		select {
 		case <-s.stopping:
 			return
+		case <-s.kv.ErrorC():
+			s.sendCorruptionAlarmRequest(uint64(s.ID()))
 		case <-time.After(t):
 		}
 		if !s.isLeader() {
@@ -148,6 +150,17 @@ func (s *EtcdServer) monitorKVHash() {
 	}
 }
 
+func (s *EtcdServer) sendCorruptionAlarmRequest(id uint64) {
+	a := &pb.AlarmRequest{
+		MemberID: id,
+		Action:   pb.AlarmRequest_ACTIVATE,
+		Alarm:    pb.AlarmType_CORRUPT,
+	}
+	s.GoAttach(func() {
+		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
+	})
+}
+
 func (s *EtcdServer) checkHashKV() error {
 	lg := s.Logger()
 
@@ -175,14 +188,7 @@ func (s *EtcdServer) checkHashKV() error {
 			return
 		}
 		alarmed = true
-		a := &pb.AlarmRequest{
-			MemberID: id,
-			Action:   pb.AlarmRequest_ACTIVATE,
-			Alarm:    pb.AlarmType_CORRUPT,
-		}
-		s.GoAttach(func() {
-			s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
-		})
+		s.sendCorruptionAlarmRequest(id)
 	}
 
 	if h2 != h && rev2 == rev && crev == crev2 {
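
The factored-out helper is now shared by both callers: checkHashKV and the new ErrorC branch in monitorKVHash. A simplified, runnable sketch of the fire-and-forget alarm pattern, using stand-in types for pb.AlarmRequest and EtcdServer:

```go
package main

import (
	"fmt"
	"sync"
)

// alarmRequest stands in for pb.AlarmRequest with Action ACTIVATE and
// Alarm CORRUPT; the real request is proposed through raft.
type alarmRequest struct {
	memberID uint64
	alarm    string
}

type server struct{ wg sync.WaitGroup }

// goAttach mirrors EtcdServer.GoAttach: run f on a tracked goroutine so
// shutdown can wait for in-flight work.
func (s *server) goAttach(f func()) {
	s.wg.Add(1)
	go func() { defer s.wg.Done(); f() }()
}

// sendCorruptionAlarmRequest raises the alarm asynchronously so the caller
// (the hash checker or the new ErrorC branch) never blocks on consensus.
func (s *server) sendCorruptionAlarmRequest(id uint64) {
	a := alarmRequest{memberID: id, alarm: "CORRUPT"}
	s.goAttach(func() {
		fmt.Printf("proposing alarm request: %+v\n", a) // stand-in for raftRequest
	})
}

func main() {
	s := &server{}
	s.sendCorruptionAlarmRequest(1)
	s.wg.Wait()
}
```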
5 changes: 3 additions & 2 deletions server/etcdserver/server.go
@@ -362,8 +362,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	}
 
 	mvccStoreConfig := mvcc.StoreConfig{
-		CompactionBatchLimit:    cfg.CompactionBatchLimit,
-		CompactionSleepInterval: cfg.CompactionSleepInterval,
+		CompactionBatchLimit:         cfg.CompactionBatchLimit,
+		CompactionSleepInterval:      cfg.CompactionSleepInterval,
+		WatchableStoreErrorReporting: cfg.WatchableStoreErrorReporting,
 	}
 	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
 
3 changes: 3 additions & 0 deletions server/storage/mvcc/kv.go
@@ -147,4 +147,7 @@ type Watchable interface {
 	// NewWatchStream returns a WatchStream that can be used to
 	// watch events happened or happening on the KV.
 	NewWatchStream() WatchStream
+	// ErrorC returns an error channel that carries errors encountered
+	// while the WatchableStore is running.
+	ErrorC() <-chan error
 }
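
Note that ErrorC returns nil when the feature is disabled. That is safe for the select in monitorKVHash because a receive from a nil channel blocks forever, so that arm simply never fires. A tiny demonstration:

```go
package main

import (
	"fmt"
	"time"
)

// errorC returns nil, as watchableStore.ErrorC does when the feature is off.
func errorC() <-chan error { return nil }

func main() {
	select {
	case err := <-errorC():
		fmt.Println("never reached:", err) // a nil channel never becomes ready
	case <-time.After(100 * time.Millisecond):
		fmt.Println("timed out; the nil-channel arm stayed silent")
	}
}
```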
5 changes: 3 additions & 2 deletions server/storage/mvcc/kvstore.go
@@ -52,8 +52,9 @@
 var defaultCompactBatchLimit = 1000
 var minimumBatchInterval = 10 * time.Millisecond
 
 type StoreConfig struct {
-	CompactionBatchLimit    int
-	CompactionSleepInterval time.Duration
+	CompactionBatchLimit         int
+	CompactionSleepInterval      time.Duration
+	WatchableStoreErrorReporting bool
 }
 
 type store struct {
31 changes: 27 additions & 4 deletions server/storage/mvcc/watchable_store.go
@@ -15,6 +15,7 @@
 package mvcc
 
 import (
+	"fmt"
 	"sync"
 	"time"
 
@@ -64,6 +65,8 @@ type watchableStore struct {
 
 	stopc chan struct{}
 	wg    sync.WaitGroup
+
+	errorc chan error
 }
 
 // cancelFunc updates unsynced and synced maps when running
@@ -85,6 +88,9 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
 		synced:   newWatcherGroup(),
 		stopc:    make(chan struct{}),
 	}
+	if cfg.WatchableStoreErrorReporting {
+		s.errorc = make(chan error)
+	}
 	s.store.ReadView = &readView{s}
 	s.store.WriteView = &writeView{s}
 	if s.le != nil {
@@ -99,6 +105,9 @@
 
 func (s *watchableStore) Close() error {
 	close(s.stopc)
+	if s.errorc != nil {
+		close(s.errorc)
+	}
 	s.wg.Wait()
 	return s.store.Close()
 }
@@ -113,6 +122,11 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 	}
 }
 
+// ErrorC returns nil when WatchableStoreErrorReporting is not enabled.
+func (s *watchableStore) ErrorC() <-chan error {
+	return s.errorc
+}
+
 func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
 	wa := &watcher{
 		key: key,
@@ -356,7 +370,15 @@ func (s *watchableStore) syncWatchers() int {
 	tx.RLock()
 	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
 	tx.RUnlock()
-	evs := kvsToEvents(s.store.lg, wg, revs, vs)
+	evs, err := s.kvsToEvents(wg, revs, vs)
+	if err != nil {
+		if s.errorc != nil {
+			s.lg.Error("kvsToEvents failed", zap.Error(err))
+			s.errorc <- err
+			return 0
+		}
+		s.lg.Panic("failed to run kvsToEvents", zap.Error(err))
+	}
 
 	victims := make(watcherBatch)
 	wb := newWatcherBatch(wg, evs)
@@ -404,11 +426,12 @@
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
-func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+func (s *watchableStore) kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event, err error) {
 	for i, v := range vals {
 		var kv mvccpb.KeyValue
 		if err := kv.Unmarshal(v); err != nil {
-			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+			s.store.lg.Error("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+			return nil, fmt.Errorf("failed to unmarshal mvccpb.KeyValue, rev: %d, value: %#v: %w", revs[i], v, err)
 		}
 
 		if !wg.contains(string(kv.Key)) {
@@ -423,7 +446,7 @@ func (s *watchableStore) kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event, err error) {
 		}
 		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
 	}
-	return evs
+	return evs, nil
 }
 
 // notify notifies the fact that given event at the given rev just happened to
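
The kvsToEvents change above converts the old panic into a descriptive error carrying the offending revision and raw bytes. A standalone sketch of just that decode step; it uses the real mvccpb package, but the helper name is ours:

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/api/v3/mvccpb"
)

// decodeKV mirrors the decode step in kvsToEvents: a corrupt value yields a
// descriptive error with the revision and raw bytes instead of a panic.
func decodeKV(rev int64, val []byte) (*mvccpb.KeyValue, error) {
	var kv mvccpb.KeyValue
	if err := kv.Unmarshal(val); err != nil {
		return nil, fmt.Errorf("failed to unmarshal mvccpb.KeyValue, rev: %d, value: %#v: %w", rev, val, err)
	}
	return &kv, nil
}

func main() {
	if _, err := decodeKV(42, []byte{0xff}); err != nil {
		fmt.Println(err) // surfaces instead of crashing the server
	}
}
```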
