From ce4d6575ed8077fa11c83936419400f73a836cc6 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Thu, 14 Sep 2017 15:28:32 -0700 Subject: [PATCH] *: support auto-compaction with finer granularity --- compactor/compactor.go | 5 ++- compactor/periodic.go | 74 ++++++++++++++------------------------ compactor/periodic_test.go | 10 +++--- embed/config.go | 2 +- embed/etcd.go | 17 ++++++++- etcdmain/config.go | 4 +-- etcdmain/help.go | 2 +- etcdserver/config.go | 2 +- 8 files changed, 55 insertions(+), 61 deletions(-) diff --git a/compactor/compactor.go b/compactor/compactor.go index 5a83d13f8333..c057225174cc 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -29,8 +29,7 @@ var ( ) const ( - checkCompactionInterval = 5 * time.Minute - executeCompactionInterval = time.Hour + checkCompactionInterval = 5 * time.Minute ModePeriodic = "periodic" ModeRevision = "revision" @@ -57,7 +56,7 @@ type RevGetter interface { Rev() int64 } -func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) { +func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) { switch mode { case ModePeriodic: return NewPeriodic(retention, rg, c), nil diff --git a/compactor/periodic.go b/compactor/periodic.go index 784cef7c1663..8ebb4ff153e4 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -26,72 +26,60 @@ import ( ) // Periodic compacts the log by purging revisions older than -// the configured retention time. Compaction happens hourly. +// the configured retention time. type Periodic struct { - clock clockwork.Clock - periodInHour int + clock clockwork.Clock + period time.Duration rg RevGetter c Compactable - revs []int64 ctx context.Context cancel context.CancelFunc - mu sync.Mutex + // mu protects paused + mu sync.RWMutex paused bool } // NewPeriodic creates a new instance of Periodic compactor that purges -// the log older than h hours. -func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { +// the log older than h Duration. +func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { return &Periodic{ - clock: clockwork.NewRealClock(), - periodInHour: h, - rg: rg, - c: c, + clock: clockwork.NewRealClock(), + period: h, + rg: rg, + c: c, } } func (t *Periodic) Run() { t.ctx, t.cancel = context.WithCancel(context.Background()) - t.revs = make([]int64, 0) clock := t.clock + timer := clock.After(t.period) go func() { - last := clock.Now() for { - t.revs = append(t.revs, t.rg.Rev()) select { case <-t.ctx.Done(): return - case <-clock.After(checkCompactionInterval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() - if p { + case <-timer: + timer = clock.After(t.period) + t.mu.RLock() + if t.paused { + t.mu.RUnlock() continue } - } - - if clock.Now().Sub(last) < executeCompactionInterval { - continue - } - - rev, remaining := t.getRev(t.periodInHour) - if rev < 0 { - continue - } - - plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour) - _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) - if err == nil || err == mvcc.ErrCompacted { - t.revs = remaining - last = clock.Now() - plog.Noticef("Finished auto-compaction at revision %d", rev) - } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) - plog.Noticef("Retry after %v", checkCompactionInterval) + t.mu.RUnlock() + rev := t.rg.Rev() + plog.Noticef("Starting auto-compaction at revision %d (retention: %v )", rev, t.period) + _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == mvcc.ErrCompacted { + plog.Noticef("Finished auto-compaction at revision %d", rev) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", t.period) + } } } }() @@ -112,11 +100,3 @@ func (t *Periodic) Resume() { defer t.mu.Unlock() t.paused = false } - -func (t *Periodic) getRev(h int) (int64, []int64) { - i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) - if i < 0 { - return -1, t.revs - } - return t.revs[i], t.revs[i+1:] -} diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index d0bb7f6eef3c..14b751d41c52 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -25,16 +25,16 @@ import ( ) func TestPeriodic(t *testing.T) { - retentionHours := 2 + retentionHours := time.Duration(2 * time.Hour) fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} tb := &Periodic{ - clock: fc, - periodInHour: retentionHours, - rg: rg, - c: compactable, + clock: fc, + period: retentionHours, + rg: rg, + c: compactable, } tb.Run() diff --git a/embed/config.go b/embed/config.go index 8d429cb0a5c1..deae3229a2f5 100644 --- a/embed/config.go +++ b/embed/config.go @@ -81,7 +81,7 @@ type Config struct { MaxWalFiles uint `json:"max-wals"` Name string `json:"name"` SnapCount uint64 `json:"snapshot-count"` - AutoCompactionRetention int `json:"auto-compaction-retention"` + AutoCompactionRetention string `json:"auto-compaction-retention"` AutoCompactionMode string `json:"auto-compaction-mode"` // TickMs is the number of milliseconds between heartbeat ticks. diff --git a/embed/etcd.go b/embed/etcd.go index 6ceb55b79307..bae8c443a8d7 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "net/url" + "strconv" "sync" "time" @@ -127,6 +128,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { } } + var ( + autoCompactionRetention time.Duration + h int + ) + h, err = strconv.Atoi(cfg.AutoCompactionRetention) + if err == nil { + autoCompactionRetention = time.Duration(int64(h)) * time.Hour + } else { + autoCompactionRetention, err = time.ParseDuration(cfg.AutoCompactionRetention) + if err != nil { + return nil, fmt.Errorf("error parsing AutoCompactionRetention: %v", err) + } + } + srvcfg := etcdserver.ServerConfig{ Name: cfg.Name, ClientURLs: cfg.ACUrls, @@ -145,7 +160,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { PeerTLSInfo: cfg.PeerTLSInfo, TickMs: cfg.TickMs, ElectionTicks: cfg.ElectionTicks(), - AutoCompactionRetention: cfg.AutoCompactionRetention, + AutoCompactionRetention: autoCompactionRetention, AutoCompactionMode: cfg.AutoCompactionMode, QuotaBackendBytes: cfg.QuotaBackendBytes, MaxTxnOps: cfg.MaxTxnOps, diff --git a/etcdmain/config.go b/etcdmain/config.go index 614112923632..73b059d2f801 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -196,8 +196,8 @@ func newConfig() *config { // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.") - fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") - fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.") + fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") + fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "'periodic' means hours if an integer or a duration string otherwise, 'revision' means revision numbers to retain by auto compaction") // pprof profiler via HTTP fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"") diff --git a/etcdmain/help.go b/etcdmain/help.go index 37a670abdd5a..58cd9cab9fde 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -99,7 +99,7 @@ clustering flags: --auto-compaction-retention '0' auto compaction retention length. 0 means disable auto compaction. --auto-compaction-mode 'periodic' - 'periodic' means hours, 'revision' means revision numbers to retain by auto compaction + 'periodic' means hours if an integer or a duration string otherwise, 'revision' means revision numbers to retain by auto compaction. --enable-v2 Accept etcd V2 client requests. diff --git a/etcdserver/config.go b/etcdserver/config.go index cac4fb010602..e35095493c13 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -51,7 +51,7 @@ type ServerConfig struct { ElectionTicks int BootstrapTimeout time.Duration - AutoCompactionRetention int + AutoCompactionRetention time.Duration AutoCompactionMode string QuotaBackendBytes int64 MaxTxnOps uint