From a3f8f47422fb4bfd93dedf4b661e92167bace901 Mon Sep 17 00:00:00 2001 From: Iwasaki Yudai Date: Fri, 16 Jun 2017 15:12:49 -0700 Subject: [PATCH] *: add Revision compactor --- compactor/compactor.go | 122 ++++++++---------------------------- compactor/compactor_test.go | 97 ---------------------------- compactor/periodic.go | 122 ++++++++++++++++++++++++++++++++++++ compactor/periodic_test.go | 117 ++++++++++++++++++++++++++++++++++ compactor/revision.go | 106 +++++++++++++++++++++++++++++++ compactor/revision_test.go | 117 ++++++++++++++++++++++++++++++++++ embed/config.go | 1 + embed/etcd.go | 1 + etcdmain/config.go | 3 +- etcdmain/help.go | 4 +- etcdserver/config.go | 1 + etcdserver/server.go | 9 ++- 12 files changed, 503 insertions(+), 197 deletions(-) create mode 100644 compactor/periodic.go create mode 100644 compactor/periodic_test.go create mode 100644 compactor/revision.go create mode 100644 compactor/revision_test.go diff --git a/compactor/compactor.go b/compactor/compactor.go index 270287072d8..e25fd7e4174 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -15,14 +15,13 @@ package compactor import ( - "sync" + "fmt" "time" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/mvcc" "github.com/coreos/pkg/capnslog" - "github.com/jonboulle/clockwork" "golang.org/x/net/context" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) var ( @@ -32,8 +31,24 @@ var ( const ( checkCompactionInterval = 5 * time.Minute executeCompactionInterval = time.Hour + + ModePeriodic = "periodic" + ModeRevision = "revision" ) +// Compactor purges old log from the storage periodically. +type Compactor interface { + // Run starts the main loop of the compactor in background. + // Use Stop() to halt the loop and release the resource. + Run() + // Stop halts the main loop of the compactor. + Stop() + // Pause temporally suspend the compactor not to run compaction. Resume() to unpose. + Pause() + // Resume restarts the compactor suspended by Pause(). + Resume() +} + type Compactable interface { Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) } @@ -42,96 +57,13 @@ type RevGetter interface { Rev() int64 } -// Periodic compacts the log by purging revisions older than -// the configured retention time. Compaction happens hourly. -type Periodic struct { - clock clockwork.Clock - periodInHour int - - rg RevGetter - c Compactable - - revs []int64 - ctx context.Context - cancel context.CancelFunc - - mu sync.Mutex - paused bool -} - -func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { - return &Periodic{ - clock: clockwork.NewRealClock(), - periodInHour: h, - rg: rg, - c: c, - } -} - -func (t *Periodic) Run() { - t.ctx, t.cancel = context.WithCancel(context.Background()) - t.revs = make([]int64, 0) - clock := t.clock - - go func() { - last := clock.Now() - for { - t.revs = append(t.revs, t.rg.Rev()) - select { - case <-t.ctx.Done(): - return - case <-clock.After(checkCompactionInterval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() - if p { - continue - } - } - - if clock.Now().Sub(last) < executeCompactionInterval { - continue - } - - rev, remaining := t.getRev(t.periodInHour) - if rev < 0 { - continue - } - - plog.Noticef("Starting auto-compaction at revision %d", rev) - _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) - if err == nil || err == mvcc.ErrCompacted { - t.revs = remaining - last = clock.Now() - plog.Noticef("Finished auto-compaction at revision %d", rev) - } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) - plog.Noticef("Retry after %v", checkCompactionInterval) - } - } - }() -} - -func (t *Periodic) Stop() { - t.cancel() -} - -func (t *Periodic) Pause() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = true -} - -func (t *Periodic) Resume() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = false -} - -func (t *Periodic) getRev(h int) (int64, []int64) { - i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) - if i < 0 { - return -1, t.revs +func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) { + switch mode { + case ModePeriodic: + return NewPeriodic(retention, rg, c), nil + case ModeRevision: + return NewRevision(int64(retention), rg, c), nil + default: + return nil, fmt.Errorf("unsupported compaction mode %s", mode) } - return t.revs[i], t.revs[i+1:] } diff --git a/compactor/compactor_test.go b/compactor/compactor_test.go index ccc3a8091b9..5d806e98ff0 100644 --- a/compactor/compactor_test.go +++ b/compactor/compactor_test.go @@ -15,108 +15,11 @@ package compactor import ( - "reflect" - "testing" - "time" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" - "github.com/jonboulle/clockwork" "golang.org/x/net/context" ) -func TestPeriodic(t *testing.T) { - retentionHours := 2 - - fc := clockwork.NewFakeClock() - rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} - compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := &Periodic{ - clock: fc, - periodInHour: retentionHours, - rg: rg, - c: compactable, - } - - tb.Run() - defer tb.Stop() - - n := int(time.Hour / checkCompactionInterval) - // collect 5 hours of revisions - for i := 0; i < 5; i++ { - // advance one hour, one revision for each interval - for j := 0; j < n; j++ { - rg.Wait(1) - fc.Advance(checkCompactionInterval) - } - - // compaction doesn't happen til 2 hours elapses - if i+1 < retentionHours { - continue - } - - a, err := compactable.Wait(1) - if err != nil { - t.Fatal(err) - } - expectedRevision := int64(1 + (i+1)*n - retentionHours*n) - if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { - t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) - } - } - - // unblock the rev getter, so we can stop the compactor routine. - _, err := rg.Wait(1) - if err != nil { - t.Fatal(err) - } -} - -func TestPeriodicPause(t *testing.T) { - fc := clockwork.NewFakeClock() - compactable := &fakeCompactable{testutil.NewRecorderStream()} - rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} - tb := &Periodic{ - clock: fc, - periodInHour: 1, - rg: rg, - c: compactable, - } - - tb.Run() - tb.Pause() - - // tb will collect 3 hours of revisions but not compact since paused - n := int(time.Hour / checkCompactionInterval) - for i := 0; i < 3*n; i++ { - rg.Wait(1) - fc.Advance(checkCompactionInterval) - } - // tb ends up waiting for the clock - - select { - case a := <-compactable.Chan(): - t.Fatalf("unexpected action %v", a) - case <-time.After(10 * time.Millisecond): - } - - // tb resumes to being blocked on the clock - tb.Resume() - - // unblock clock, will kick off a compaction at hour 3:05 - rg.Wait(1) - fc.Advance(checkCompactionInterval) - a, err := compactable.Wait(1) - if err != nil { - t.Fatal(err) - } - // compact the revision from hour 2:05 - wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} - if !reflect.DeepEqual(a[0].Params[0], wreq) { - t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) - } -} - type fakeCompactable struct { testutil.Recorder } diff --git a/compactor/periodic.go b/compactor/periodic.go new file mode 100644 index 00000000000..7eb7cf7a8c4 --- /dev/null +++ b/compactor/periodic.go @@ -0,0 +1,122 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor + +import ( + "sync" + "time" + + "github.com/jonboulle/clockwork" + "golang.org/x/net/context" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc" +) + +// Periodic compacts the log by purging revisions older than +// the configured retention time. Compaction happens hourly. +type Periodic struct { + clock clockwork.Clock + periodInHour int + + rg RevGetter + c Compactable + + revs []int64 + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + paused bool +} + +// NewPeriodic creates a new instance of Periodic compactor that purges +// the log older than h hours. +func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { + return &Periodic{ + clock: clockwork.NewRealClock(), + periodInHour: h, + rg: rg, + c: c, + } +} + +func (t *Periodic) Run() { + t.ctx, t.cancel = context.WithCancel(context.Background()) + t.revs = make([]int64, 0) + clock := t.clock + + go func() { + last := clock.Now() + for { + t.revs = append(t.revs, t.rg.Rev()) + select { + case <-t.ctx.Done(): + return + case <-clock.After(checkCompactionInterval): + t.mu.Lock() + p := t.paused + t.mu.Unlock() + if p { + continue + } + } + + if clock.Now().Sub(last) < executeCompactionInterval { + continue + } + + rev, remaining := t.getRev(t.periodInHour) + if rev < 0 { + continue + } + + plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour) + _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == mvcc.ErrCompacted { + t.revs = remaining + last = clock.Now() + plog.Noticef("Finished auto-compaction at revision %d", rev) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) + plog.Noticef("Retry after %v", checkCompactionInterval) + } + } + }() +} + +func (t *Periodic) Stop() { + t.cancel() +} + +func (t *Periodic) Pause() { + t.mu.Lock() + defer t.mu.Unlock() + t.paused = true +} + +func (t *Periodic) Resume() { + t.mu.Lock() + defer t.mu.Unlock() + t.paused = false +} + +func (t *Periodic) getRev(h int) (int64, []int64) { + i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) + if i < 0 { + return -1, t.revs + } + return t.revs[i], t.revs[i+1:] +} diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go new file mode 100644 index 00000000000..d0bb7f6eef3 --- /dev/null +++ b/compactor/periodic_test.go @@ -0,0 +1,117 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor + +import ( + "reflect" + "testing" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/testutil" + "github.com/jonboulle/clockwork" +) + +func TestPeriodic(t *testing.T) { + retentionHours := 2 + + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := &Periodic{ + clock: fc, + periodInHour: retentionHours, + rg: rg, + c: compactable, + } + + tb.Run() + defer tb.Stop() + + n := int(time.Hour / checkCompactionInterval) + // collect 5 hours of revisions + for i := 0; i < 5; i++ { + // advance one hour, one revision for each interval + for j := 0; j < n; j++ { + rg.Wait(1) + fc.Advance(checkCompactionInterval) + } + + // compaction doesn't happen til 2 hours elapses + if i+1 < retentionHours { + continue + } + + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(1 + (i+1)*n - retentionHours*n) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + } + + // unblock the rev getter, so we can stop the compactor routine. + _, err := rg.Wait(1) + if err != nil { + t.Fatal(err) + } +} + +func TestPeriodicPause(t *testing.T) { + fc := clockwork.NewFakeClock() + compactable := &fakeCompactable{testutil.NewRecorderStream()} + rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + tb := &Periodic{ + clock: fc, + periodInHour: 1, + rg: rg, + c: compactable, + } + + tb.Run() + tb.Pause() + + // tb will collect 3 hours of revisions but not compact since paused + n := int(time.Hour / checkCompactionInterval) + for i := 0; i < 3*n; i++ { + rg.Wait(1) + fc.Advance(checkCompactionInterval) + } + // tb ends up waiting for the clock + + select { + case a := <-compactable.Chan(): + t.Fatalf("unexpected action %v", a) + case <-time.After(10 * time.Millisecond): + } + + // tb resumes to being blocked on the clock + tb.Resume() + + // unblock clock, will kick off a compaction at hour 3:05 + rg.Wait(1) + fc.Advance(checkCompactionInterval) + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + // compact the revision from hour 2:05 + wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} + if !reflect.DeepEqual(a[0].Params[0], wreq) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) + } +} diff --git a/compactor/revision.go b/compactor/revision.go new file mode 100644 index 00000000000..fd80c278dd1 --- /dev/null +++ b/compactor/revision.go @@ -0,0 +1,106 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor + +import ( + "sync" + + "github.com/jonboulle/clockwork" + "golang.org/x/net/context" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc" +) + +// Revision compacts the log by purging revisions older than +// the configured reivison number. Compaction happens every 5 minutes. +type Revision struct { + clock clockwork.Clock + retention int64 + + rg RevGetter + c Compactable + + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + paused bool +} + +// NewRevision creates a new instance of Revisonal compactor that purges +// the log older than retention revisions from the current revision. +func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision { + return &Revision{ + clock: clockwork.NewRealClock(), + retention: retention, + rg: rg, + c: c, + } +} + +func (t *Revision) Run() { + t.ctx, t.cancel = context.WithCancel(context.Background()) + clock := t.clock + previous := int64(0) + + go func() { + for { + select { + case <-t.ctx.Done(): + return + case <-clock.After(checkCompactionInterval): + t.mu.Lock() + p := t.paused + t.mu.Unlock() + if p { + continue + } + } + + rev := t.rg.Rev() - t.retention + + if rev <= 0 || rev == previous { + continue + } + + plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention) + _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == mvcc.ErrCompacted { + previous = rev + plog.Noticef("Finished auto-compaction at revision %d", rev) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) + plog.Noticef("Retry after %v", checkCompactionInterval) + } + } + }() +} + +func (t *Revision) Stop() { + t.cancel() +} + +func (t *Revision) Pause() { + t.mu.Lock() + defer t.mu.Unlock() + t.paused = true +} + +func (t *Revision) Resume() { + t.mu.Lock() + defer t.mu.Unlock() + t.paused = false +} diff --git a/compactor/revision_test.go b/compactor/revision_test.go new file mode 100644 index 00000000000..39098397844 --- /dev/null +++ b/compactor/revision_test.go @@ -0,0 +1,117 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor + +import ( + "reflect" + "testing" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/testutil" + "github.com/jonboulle/clockwork" +) + +func TestRevision(t *testing.T) { + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := &Revision{ + clock: fc, + retention: 10, + rg: rg, + c: compactable, + } + + tb.Run() + defer tb.Stop() + + fc.Advance(checkCompactionInterval) + rg.Wait(1) + // nothing happens + + rg.rev = 99 // will be 100 + expectedRevision := int64(90) + fc.Advance(checkCompactionInterval) + rg.Wait(1) + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + // skip the same revision + rg.rev = 99 // will be 100 + expectedRevision = int64(90) + rg.Wait(1) + // nothing happens + + rg.rev = 199 // will be 200 + expectedRevision = int64(190) + fc.Advance(checkCompactionInterval) + rg.Wait(1) + a, err = compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } +} + +func TestRevisionPause(t *testing.T) { + fc := clockwork.NewFakeClock() + compactable := &fakeCompactable{testutil.NewRecorderStream()} + rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100 + tb := &Revision{ + clock: fc, + retention: 10, + rg: rg, + c: compactable, + } + + tb.Run() + tb.Pause() + + // tb will collect 3 hours of revisions but not compact since paused + n := int(time.Hour / checkCompactionInterval) + for i := 0; i < 3*n; i++ { + fc.Advance(checkCompactionInterval) + } + // tb ends up waiting for the clock + + select { + case a := <-compactable.Chan(): + t.Fatalf("unexpected action %v", a) + case <-time.After(10 * time.Millisecond): + } + + // tb resumes to being blocked on the clock + tb.Resume() + + // unblock clock, will kick off a compaction at hour 3:05 + fc.Advance(checkCompactionInterval) + rg.Wait(1) + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + wreq := &pb.CompactionRequest{Revision: int64(90)} + if !reflect.DeepEqual(a[0].Params[0], wreq) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) + } +} diff --git a/embed/config.go b/embed/config.go index 44acc2bf2c7..80410312230 100644 --- a/embed/config.go +++ b/embed/config.go @@ -80,6 +80,7 @@ type Config struct { Name string `json:"name"` SnapCount uint64 `json:"snapshot-count"` AutoCompactionRetention int `json:"auto-compaction-retention"` + AutoCompactionMode string `json:"auto-compaction-mode"` // TickMs is the number of milliseconds between heartbeat ticks. // TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1). diff --git a/embed/etcd.go b/embed/etcd.go index b8e170f0681..2105dc3debe 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -138,6 +138,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { TickMs: cfg.TickMs, ElectionTicks: cfg.ElectionTicks(), AutoCompactionRetention: cfg.AutoCompactionRetention, + AutoCompactionMode: cfg.AutoCompactionMode, QuotaBackendBytes: cfg.QuotaBackendBytes, MaxTxnOps: cfg.MaxTxnOps, MaxRequestBytes: cfg.MaxRequestBytes, diff --git a/etcdmain/config.go b/etcdmain/config.go index 8f257e0ef9e..2aacfdf68d8 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -199,7 +199,8 @@ func newConfig() *config { // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.") - fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.") + fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") + fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.") // pprof profiler via HTTP fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"") diff --git a/etcdmain/help.go b/etcdmain/help.go index cbead2e80cd..07f1932e6b1 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -97,7 +97,9 @@ clustering flags: --strict-reconfig-check reject reconfiguration requests that would cause quorum loss. --auto-compaction-retention '0' - auto compaction retention in hour. 0 means disable auto compaction. + auto compaction retention length. 0 means disable auto compaction. + --auto-compaction-mode 'periodic' + 'periodic' means hours, 'revision' means revision numbers to retain by auto compaction --enable-v2 Accept etcd V2 client requests. diff --git a/etcdserver/config.go b/etcdserver/config.go index 7ab77d986c3..f6ed1f1bae7 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -53,6 +53,7 @@ type ServerConfig struct { BootstrapTimeout time.Duration AutoCompactionRetention int + AutoCompactionMode string QuotaBackendBytes int64 MaxTxnOps uint diff --git a/etcdserver/server.go b/etcdserver/server.go index ac18b881ec2..100d92bf34e 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -221,7 +221,7 @@ type EtcdServer struct { SyncTicker *time.Ticker // compactor is used to auto-compact the KV. - compactor *compactor.Periodic + compactor compactor.Compactor // peerRt used to send requests (version, lease) to peers. peerRt http.RoundTripper @@ -469,8 +469,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { return nil, err } srv.authStore = auth.NewAuthStore(srv.be, tp) - if h := cfg.AutoCompactionRetention; h != 0 { - srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) + if num := cfg.AutoCompactionRetention; num != 0 { + srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv) + if err != nil { + return nil, err + } srv.compactor.Run() }