Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: adjust interval for period <1-hour #9482

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 86 additions & 32 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,75 +61,129 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact
return t
}

// periodDivisor divides Periodic.period in into checkCompactInterval duration
const periodDivisor = 10

// Run runs periodic compactor.
func (t *Periodic) Run() {
interval := t.period / time.Duration(periodDivisor)
fetchInterval := t.getFetchInterval()
retryInterval := t.getRetryInterval()
retentions := int(t.period/fetchInterval) + 1 // number of revs to keep for t.period
notify := make(chan struct{}, 1)

// periodically updates t.revs and notify to the other goroutine
go func() {
initialWait := t.clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
rev := t.rg.Rev()
t.mu.Lock()
t.revs = append(t.revs, rev)
if len(t.revs) > retentions {
t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
}
t.mu.Unlock()

select {
case notify <- struct{}{}:
default:
// compaction can take time more than interval
}

select {
case <-t.ctx.Done():
return
case <-t.clock.After(interval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
case <-t.clock.After(fetchInterval):
}
}
}()

// wait up to initial given period
if t.clock.Now().Sub(initialWait) < t.period {
// run compaction triggered by the other goroutine thorough the notify channel
// or internal periodic retry
go func() {
var lastCompactedRev int64
for {
select {
case <-t.ctx.Done():
return
case <-notify:
// from the other goroutine
case <-t.clock.After(retryInterval):
// for retry
// when t.rev is not updated, this event will be ignored later,
// so we don't need to think about race with <-notify.
}

t.mu.Lock()
p := t.paused
rev := t.revs[0]
len := len(t.revs)
t.mu.Unlock()
if p {
continue
}

// it's too early to start working
if len != retentions {
continue
}

rev, remaining := t.getRev()
if rev < 0 {
// if t.revs is not updated, we can ignore the event.
// it's not the first time to try comapction in this interval.
if rev == lastCompactedRev {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
// move to next sliding window
t.revs = remaining
plog.Noticef("Finished auto-compaction at revision %d", rev)
lastCompactedRev = rev
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", interval)
plog.Noticef("Retry after %s", retryInterval)
}
}
}()
}

// Stop stops periodic compactor.
// if given compaction period x is <1-hour, compact every x duration.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
// if given compaction period x is >1-hour, compact every hour.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
func (t *Periodic) getFetchInterval() time.Duration {
itv := t.period
if itv > time.Hour {
itv = time.Hour
}
return itv
}

const retryDivisor = 10

func (t *Periodic) getRetryInterval() time.Duration {
itv := t.period / retryDivisor
// we don't want to too aggressive retries
// and also jump between 6-minute through 60-minute
if itv < (6 * time.Minute) { // t.period is less than hour
// if t.period is less than 6-minute,
// retry interval is t.period.
// if we divide byretryDivisor, it's too aggressive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/byretryDivisor/by retryDivisor/?

if t.period < 6*time.Minute {
itv = t.period
} else {
itv = 6 * time.Minute
}
}
return itv
}

func (t *Periodic) Stop() {
t.cancel()
}

// Pause pauses periodic compactor.
func (t *Periodic) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

// Resume resumes periodic compactor.
func (t *Periodic) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = false
}

func (t *Periodic) getRev() (int64, []int64) {
i := len(t.revs) - periodDivisor
if i < 0 {
return -1, t.revs
}
return t.revs[i], t.revs[i+1:]
}
82 changes: 57 additions & 25 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/jonboulle/clockwork"
)

func TestPeriodic(t *testing.T) {
func TestPeriodicHourly(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour

Expand All @@ -36,31 +36,59 @@ func TestPeriodic(t *testing.T) {

tb.Run()
defer tb.Stop()
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
// simulate 5 hours worth of intervals.
for i := 0; i < n/retentionHours*5; i++ {
// simulate 5 hours

for i := 0; i < 5; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
fc.Advance(time.Hour)
// compaction doesn't happen til 2 hours elapses.
if i < n {
if i < retentionHours {
continue
}
// after 2 hours, compaction happens at every checkCompactInterval.
// after 2 hours, compaction happens at every interval.
// at i = 3, t.revs = [1(2h-ago,T=0h), 2(1h-ago,T=1h), 3(now,T=2h)] (len=3) (rev starts from 1)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(i + 1 - n)
expectedRevision := int64(i - 1)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}
}

func TestPeriodicMinutes(t *testing.T) {
retentionMinutes := 23
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

// simulate 115 (23 * 5) minutes
for i := 0; i < 5; i++ {
rg.Wait(1)
fc.Advance(retentionDuration)

// notting happens at T=0
if i == 0 {
continue
}
// from T=23m (i=1), compaction happens at every interval
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(i)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}

// unblock the rev getter, so we can stop the compactor routine.
_, err := rg.Wait(1)
if err != nil {
t.Fatal(err)
}
}

Expand All @@ -75,13 +103,17 @@ func TestPeriodicPause(t *testing.T) {
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
for i := 0; i < 3*n; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
}
// tb ends up waiting for the clock
// T=0
rg.Wait(1) // t.revs = [1]
fc.Advance(time.Hour)
// T=1h
rg.Wait(1) // t.revs = [1, 2]
fc.Advance(time.Hour)
// T=2h
rg.Wait(1) // t.revs = [2, 3]
fc.Advance(time.Hour)
// T=3h
rg.Wait(1) // t.revs = [3, 4]

select {
case a := <-compactable.Chan():
Expand All @@ -92,15 +124,15 @@ func TestPeriodicPause(t *testing.T) {
// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:06
rg.Wait(1)
fc.Advance(checkCompactInterval)
// unblock clock, will kick off a compaction at T=3h6m by retry
fc.Advance(time.Minute * 6)
// T=3h6m
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
// compact the revision from T=3h
wreq := &pb.CompactionRequest{Revision: int64(3)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
}
Expand Down