Multi-cursor: fix scheduled queue look-ahead (#3364)
yycptt authored Sep 12, 2022
1 parent f84e4c9 commit 642d035
Showing 9 changed files with 91 additions and 81 deletions.
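The heart of the change: the scheduled queue previously relied on a shared poll timer in queueBase to guarantee periodic task loading; this commit deletes that timer and instead arms the queue's timer gate for the end of a jittered look-ahead window whenever look-ahead finds no task. Below is a minimal, self-contained sketch of that idea — jitDuration is a hypothetical stand-in for backoff.JitDuration, assumed (as the updated tests below bound it) to return a value in [d, d·(1+coefficient)).

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitDuration is a stand-in for the backoff.JitDuration used in this commit;
// the updated tests bound its result to [d, d*(1+coefficient)), which this mirrors.
func jitDuration(d time.Duration, coefficient float64) time.Duration {
	return d + time.Duration(rand.Float64()*coefficient*float64(d))
}

func main() {
	maxPollInterval := 5 * time.Minute
	jitterCoefficient := 0.15

	// The scheduled queue now computes a jittered look-ahead window starting at
	// the low watermark of its not-yet-readable scope...
	lookAheadMinTime := time.Now()
	lookAheadMaxTime := lookAheadMinTime.Add(jitDuration(maxPollInterval, jitterCoefficient))

	// ...and arms its timer gate for the end of that window when no look-ahead
	// task is found, so task loading still happens every maxPollInterval + jitter
	// without a dedicated poll timer.
	fmt.Println("next load no later than:", lookAheadMaxTime.Format(time.RFC3339))
}
```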
12 changes: 7 additions & 5 deletions service/history/queues/action_pending_task_count.go
@@ -86,8 +86,7 @@ func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
 	a.init()
 	a.gatherStatistics(readers)
 	a.findSliceToClear(
-		a.monitor.GetTotalPendingTaskCount(),
-		int(float64(a.attributes.CiriticalPendingTaskCount)*targetLoadFactor),
+		int(float64(a.attributes.CiriticalPendingTaskCount) * targetLoadFactor),
 	)
 	a.splitAndClearSlice(readers, readerGroup)
 }
@@ -135,12 +134,13 @@ func (a *actionQueuePendingTask) gatherStatistics(
 }
 
 func (a *actionQueuePendingTask) findSliceToClear(
-	currentPendingTasks int,
 	targetPendingTasks int,
 ) {
+	currentPendingTasks := 0
 	// order namespace by # of pending tasks
 	namespaceIDs := make([]namespace.ID, 0, len(a.tasksPerNamespace))
-	for namespaceID := range a.tasksPerNamespace {
+	for namespaceID, namespacePendingTasks := range a.tasksPerNamespace {
+		currentPendingTasks += namespacePendingTasks
 		namespaceIDs = append(namespaceIDs, namespaceID)
 	}
 	pq := collection.NewPriorityQueueWithItems(
@@ -163,7 +163,9 @@ func (a *actionQueuePendingTask) findSliceToClear(
 		sliceList = sliceList[1:]
 		a.slicesPerNamespace[namespaceID] = sliceList
 
-		a.tasksPerNamespace[namespaceID] -= a.pendingTaskPerNamespacePerSlice[sliceToClear][namespaceID]
+		tasksCleared := a.pendingTaskPerNamespacePerSlice[sliceToClear][namespaceID]
+		a.tasksPerNamespace[namespaceID] -= tasksCleared
+		currentPendingTasks -= tasksCleared
 		if a.tasksPerNamespace[namespaceID] > 0 {
 			pq.Add(namespaceID)
 		}
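The three hunks above change findSliceToClear to derive currentPendingTasks from the per-namespace statistics it already iterates, and to keep that running total in sync as slices are cleared, rather than taking a one-time monitor snapshot as an argument. A simplified sketch of the pattern — whole namespaces instead of slices, and a plain sort in place of the repository's priority queue:

```go
package main

import (
	"fmt"
	"sort"
)

// findTasksToClear sketches the new shape of findSliceToClear: compute the
// total while gathering per-namespace counts, then decrement it as work is
// cleared, so the loop's stop condition always reflects the current state.
func findTasksToClear(tasksPerNamespace map[string]int, targetPendingTasks int) []string {
	currentPendingTasks := 0
	namespaces := make([]string, 0, len(tasksPerNamespace))
	for ns, pending := range tasksPerNamespace {
		currentPendingTasks += pending
		namespaces = append(namespaces, ns)
	}

	// stand-in for the priority queue: namespace with most pending tasks first
	sort.Slice(namespaces, func(i, j int) bool {
		return tasksPerNamespace[namespaces[i]] > tasksPerNamespace[namespaces[j]]
	})

	var cleared []string
	for _, ns := range namespaces {
		if currentPendingTasks <= targetPendingTasks {
			break
		}
		currentPendingTasks -= tasksPerNamespace[ns] // keep the running total in sync
		cleared = append(cleared, ns)
	}
	return cleared
}

func main() {
	fmt.Println(findTasksToClear(map[string]int{"ns1": 500, "ns2": 200, "ns3": 50}, 300)) // [ns1]
}
```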
7 changes: 7 additions & 0 deletions service/history/queues/action_reader_stuck.go
@@ -88,6 +88,13 @@ func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) {
 			s = left
 		}
 
+		if len(remaining) == 0 {
+			// s can't split by both min and max of stuck range,
+			// and stuck range does not contain the range of s,
+			// the only possible case is s is not overlapping with stuck range at all.
+			return nil, false
+		}
+
 		splitSlices = append(splitSlices, s)
 		return remaining, true
 	})
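The added guard encodes a small case analysis: if a slice's range can be split by neither bound of the stuck range, and the stuck range does not contain it, the two ranges must be disjoint. A toy model over half-open integer ranges that checks this reasoning (hypothetical types, not the repository's Range API):

```go
package main

import "fmt"

// r models a half-open range [min, max).
type r struct{ min, max int }

func (a r) canSplitBy(p int) bool { return a.min < p && p < a.max }
func (a r) containedIn(b r) bool  { return b.min <= a.min && a.max <= b.max }
func (a r) overlaps(b r) bool     { return a.min < b.max && b.min < a.max }

func main() {
	stuck := r{10, 20}
	// Only ranges that survive the guard's three negative conditions are printed;
	// each one is indeed disjoint from the stuck range.
	for _, s := range []r{{0, 5}, {0, 15}, {12, 18}, {15, 30}, {25, 40}} {
		if !s.canSplitBy(stuck.min) && !s.canSplitBy(stuck.max) && !s.containedIn(stuck) {
			fmt.Printf("%v: disjoint from stuck range? %v\n", s, !s.overlaps(stuck))
		}
	}
}
```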
2 changes: 1 addition & 1 deletion service/history/queues/action_slice_count.go
@@ -202,7 +202,7 @@ func (a *actionSliceCount) pickCompactCandidates(
 	numSliceToCompact int,
 ) map[Slice]struct{} {
 	slices.SortFunc(candidates, func(this, that compactCandidate) bool {
-		return this.distance.CompareTo(this.distance) < 0
+		return this.distance.CompareTo(that.distance) < 0
 	})
 
 	sliceToCompact := make(map[Slice]struct{}, numSliceToCompact)
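This one-character fix corrects a copy-paste comparator bug: comparing this.distance against itself reports every pair as "not less", so the sort never actually orders candidates and compaction effectively picks them in input order rather than by distance. A small demonstration using sort.Slice in place of the x/exp slices.SortFunc the file uses:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Buggy comparator: the value is compared against itself, which is always
	// false, so every element looks equal and nothing is actually ordered.
	buggy := []int{3, 1, 2}
	sort.Slice(buggy, func(i, j int) bool { return buggy[i] < buggy[i] })
	fmt.Println(buggy) // order is unspecified; the comparator never discriminates

	// Fixed comparator: compare against the other element.
	fixed := []int{3, 1, 2}
	sort.Slice(fixed, func(i, j int) bool { return fixed[i] < fixed[j] })
	fmt.Println(fixed) // [1 2 3]
}
```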
22 changes: 1 addition & 21 deletions service/history/queues/queue_base.go
@@ -96,12 +96,10 @@ type (
 		exclusiveDeletionHighWatermark tasks.Key
 		nonReadableScope               Scope
 		readerGroup                    *ReaderGroup
-		lastPollTime                   time.Time
 		nextForceNewSliceTime          time.Time
 
 		checkpointRetrier backoff.Retrier
 		checkpointTimer   *time.Timer
-		pollTimer         *time.Timer
 
 		alertCh <-chan *Alert
 	}
@@ -192,7 +190,7 @@ func newQueueBase(
 			return NewReader(
 				readerID,
 				slices,
-				&options.ReaderOptions,
+				&readerOptions,
 				scheduler,
 				rescheduler,
 				timeSource,
@@ -263,10 +261,6 @@ func (p *queueBase) Start() {
 	p.rescheduler.Start()
 	p.readerGroup.Start()
 
-	p.pollTimer = time.NewTimer(backoff.JitDuration(
-		p.options.MaxPollInterval(),
-		p.options.MaxPollIntervalJitterCoefficient(),
-	))
 	p.checkpointTimer = time.NewTimer(backoff.JitDuration(
 		p.options.CheckpointInterval(),
 		p.options.CheckpointIntervalJitterCoefficient(),
@@ -277,7 +271,6 @@ func (p *queueBase) Stop() {
 	p.monitor.Close()
 	p.readerGroup.Stop()
 	p.rescheduler.Stop()
-	p.pollTimer.Stop()
 	p.checkpointTimer.Stop()
 }
 
@@ -299,17 +292,6 @@ func (p *queueBase) UnlockTaskProcessing() {
 	// no-op
 }
 
-func (p *queueBase) processPollTimer() {
-	if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(p.timeSource.Now()) {
-		p.processNewRange()
-	}
-
-	p.pollTimer.Reset(backoff.JitDuration(
-		p.options.MaxPollInterval(),
-		p.options.MaxPollIntervalJitterCoefficient(),
-	))
-}
-
 func (p *queueBase) processNewRange() {
 	newMaxKey := p.shard.GetQueueExclusiveHighReadWatermark(
 		p.category,
@@ -320,8 +302,6 @@ func (p *queueBase) processNewRange() {
 		return
 	}
 
-	p.lastPollTime = p.timeSource.Now()
-
 	var newReadScope Scope
 	newReadScope, p.nonReadableScope = p.nonReadableScope.SplitByRange(newMaxKey)
 	newSlice := NewSlice(
20 changes: 18 additions & 2 deletions service/history/queues/queue_immediate.go
@@ -29,6 +29,7 @@ import (
 	"time"
 
 	"go.temporal.io/server/common"
+	"go.temporal.io/server/common/backoff"
 	"go.temporal.io/server/common/collection"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
@@ -145,14 +146,20 @@ func (p *immediateQueue) NotifyNewTasks(_ string, tasks []tasks.Task) {
 func (p *immediateQueue) processEventLoop() {
 	defer p.shutdownWG.Done()
 
+	pollTimer := time.NewTimer(backoff.JitDuration(
+		p.options.MaxPollInterval(),
+		p.options.MaxPollIntervalJitterCoefficient(),
+	))
+	defer pollTimer.Stop()
+
 	for {
 		select {
 		case <-p.shutdownCh:
 			return
 		case <-p.notifyCh:
 			p.processNewRange()
-		case <-p.pollTimer.C:
-			p.processPollTimer()
+		case <-pollTimer.C:
+			p.processPollTimer(pollTimer)
 		case <-p.checkpointTimer.C:
 			p.checkpoint()
 		case alert := <-p.alertCh:
@@ -161,6 +168,15 @@ func (p *immediateQueue) processEventLoop() {
 	}
 }
 
+func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) {
+	p.processNewRange()
+
+	pollTimer.Reset(backoff.JitDuration(
+		p.options.MaxPollInterval(),
+		p.options.MaxPollIntervalJitterCoefficient(),
+	))
+}
+
 func (p *immediateQueue) notify() {
 	select {
 	case p.notifyCh <- struct{}{}:
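The immediate queue keeps its poll-driven reloads, but the timer now lives inside processEventLoop — created and stopped by the goroutine that owns it, and re-armed with jitter after each fire — instead of being a shared struct field on queueBase. A minimal sketch of that event-loop-owned timer pattern (hypothetical names, not the repository's types):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// eventLoop owns its poll timer: it is created locally, stopped via defer when
// the loop exits, and reset with fresh jitter after every fire. Keeping it off
// the shared struct means no other goroutine can race on it or leak it.
func eventLoop(shutdownCh <-chan struct{}, maxPollInterval time.Duration, jitterCoefficient float64) {
	jit := func() time.Duration {
		return maxPollInterval + time.Duration(rand.Float64()*jitterCoefficient*float64(maxPollInterval))
	}
	pollTimer := time.NewTimer(jit())
	defer pollTimer.Stop()

	for {
		select {
		case <-shutdownCh:
			return
		case <-pollTimer.C:
			fmt.Println("poll: load new task range")
			pollTimer.Reset(jit())
		}
	}
}

func main() {
	shutdownCh := make(chan struct{})
	go eventLoop(shutdownCh, 100*time.Millisecond, 0.15)
	time.Sleep(350 * time.Millisecond)
	close(shutdownCh)
}
```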
19 changes: 10 additions & 9 deletions service/history/queues/queue_scheduled.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	"go.temporal.io/server/common"
+	"go.temporal.io/server/common/backoff"
 	"go.temporal.io/server/common/collection"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
@@ -193,8 +194,6 @@ func (p *scheduledQueue) processEventLoop() {
 			p.processNewTime()
 		case <-p.timerGate.FireChan():
 			p.processNewRange()
-		case <-p.pollTimer.C:
-			p.processPollTimer()
 		case <-p.checkpointTimer.C:
 			p.checkpoint()
 		case alert := <-p.alertCh:
@@ -232,14 +231,12 @@ func (p *scheduledQueue) processNewRange() {
 	p.lookAheadTask()
 }
 
-func (p *scheduledQueue) processPollTimer() {
-	p.queueBase.processPollTimer()
-	p.lookAheadTask()
-}
-
 func (p *scheduledQueue) lookAheadTask() {
 	lookAheadMinTime := p.nonReadableScope.Range.InclusiveMin.FireTime
-	lookAheadMaxTime := lookAheadMinTime.Add(p.options.MaxPollInterval())
+	lookAheadMaxTime := lookAheadMinTime.Add(backoff.JitDuration(
+		p.options.MaxPollInterval(),
+		p.options.MaxPollIntervalJitterCoefficient(),
+	))
 
 	ctx, cancel := newQueueIOContext()
 	defer cancel()
@@ -264,7 +261,11 @@ func (p *scheduledQueue) lookAheadTask() {
 		p.timerGate.Update(response.Tasks[0].GetKey().FireTime)
 	}
 
-	// no look ahead task, wait for max poll interval or new task notification
+	// no look ahead task, next loading will be triggerred at the end of the current
+	// look ahead window or when new task notification comes
+	// NOTE: with this we don't need a separate max poll timer, loading will be triggerred
+	// every maxPollInterval + jitter.
+	p.timerGate.Update(lookAheadMaxTime)
 }
 
 // IsTimeExpired checks if the testing time is equal or before
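This is the look-ahead fix the commit title refers to: with the poll timer gone, the scheduled queue must guarantee its own reload cadence, so lookAheadTask now always arms the timer gate — either for the first future task's fire time or, when no task is found, for the end of the jittered look-ahead window. A sketch of that wake-up rule (hypothetical helper, not the repository's API):

```go
package main

import (
	"fmt"
	"time"
)

// nextWakeUpTime models the decision lookAheadTask now makes after each load:
// a nil fire time means the look-ahead query found no task, so the queue must
// still wake at the end of the window to load the next range.
func nextWakeUpTime(lookAheadTaskFireTime *time.Time, lookAheadMaxTime time.Time) time.Time {
	if lookAheadTaskFireTime != nil {
		return *lookAheadTaskFireTime // a real task is coming; fire exactly then
	}
	return lookAheadMaxTime // otherwise reload at the end of the jittered window
}

func main() {
	now := time.Now()
	windowEnd := now.Add(5 * time.Minute)

	fireTime := now.Add(30 * time.Second)
	fmt.Println(nextWakeUpTime(&fireTime, windowEnd)) // wake for the task
	fmt.Println(nextWakeUpTime(nil, windowEnd))       // wake to load the next range
}
```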
52 changes: 25 additions & 27 deletions service/history/queues/queue_scheduled_test.go
@@ -25,6 +25,7 @@
 package queues
 
 import (
+	"context"
 	"errors"
 	"math/rand"
 	"testing"
@@ -196,11 +197,13 @@ func (s *scheduledQueueSuite) TestLookAheadTask_NoLookAheadTask() {
 	lookAheadRange, _ := s.setupLookAheadMock(false)
 	s.scheduledQueue.lookAheadTask()
 
-	timerGate.SetCurrentTime(lookAheadRange.InclusiveMin.FireTime.Add(testQueueOptions.MaxPollInterval()))
+	timerGate.SetCurrentTime(lookAheadRange.InclusiveMin.FireTime.Add(time.Duration(
+		(1 + testQueueOptions.MaxPollIntervalJitterCoefficient()) * float64(testQueueOptions.MaxPollInterval()),
+	)))
 	select {
 	case <-s.scheduledQueue.timerGate.FireChan():
-		s.Fail("timer gate should not fire")
 	default:
+		s.Fail("timer gate should fire at the end of look ahead window")
 	}
 }
 
@@ -238,20 +241,6 @@ func (s *scheduledQueueSuite) TestProcessNewRange_LookAheadPerformed() {
 	s.scheduledQueue.processNewRange()
 }
 
-func (s *scheduledQueueSuite) TestProcessPollTimer_LookAheadPerformed() {
-	timerGate := timer.NewRemoteGate()
-	s.scheduledQueue.timerGate = timerGate
-	s.scheduledQueue.pollTimer = time.NewTimer(time.Second)
-
-	// test if look ahead if performed after processing poll timer
-	s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetHistoryTasksResponse{
-		Tasks:         []tasks.Task{},
-		NextPageToken: nil,
-	}, nil).Times(1)
-
-	s.scheduledQueue.processPollTimer()
-}
-
 func (s *scheduledQueueSuite) setupLookAheadMock(
 	hasLookAheadTask bool,
 ) (lookAheadRange Range, lookAheadTask *tasks.MockTask) {
@@ -269,17 +258,26 @@ func (s *scheduledQueueSuite) setupLookAheadMock(
 		loadedTasks = append(loadedTasks, lookAheadTask)
 	}
 
-	s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
-		ShardID:             s.mockShard.GetShardID(),
-		TaskCategory:        tasks.CategoryTimer,
-		InclusiveMinTaskKey: lookAheadRange.InclusiveMin,
-		ExclusiveMaxTaskKey: lookAheadRange.ExclusiveMax,
-		BatchSize:           1,
-		NextPageToken:       nil,
-	}).Return(&persistence.GetHistoryTasksResponse{
-		Tasks:         loadedTasks,
-		NextPageToken: nil,
-	}, nil).Times(1)
+	s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error) {
+		s.Equal(s.mockShard.GetShardID(), request.ShardID)
+		s.Equal(tasks.CategoryTimer, request.TaskCategory)
+		s.Equal(lookAheadRange.InclusiveMin, request.InclusiveMinTaskKey)
+		s.Equal(1, request.BatchSize)
+		s.Nil(request.NextPageToken)
+
+		s.Equal(lookAheadRange.ExclusiveMax.TaskID, request.ExclusiveMaxTaskKey.TaskID)
+		fireTimeDifference := request.ExclusiveMaxTaskKey.FireTime.Sub(lookAheadRange.ExclusiveMax.FireTime)
+		if fireTimeDifference < 0 {
+			fireTimeDifference = -fireTimeDifference
+		}
+		maxAllowedFireTimeDifference := time.Duration(float64(testQueueOptions.MaxPollInterval()) * testQueueOptions.MaxPollIntervalJitterCoefficient())
+		s.LessOrEqual(fireTimeDifference, maxAllowedFireTimeDifference)
+
+		return &persistence.GetHistoryTasksResponse{
+			Tasks:         loadedTasks,
+			NextPageToken: nil,
+		}, nil
+	}).Times(1)
 
 	return lookAheadRange, lookAheadTask
 }
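A note on the test rewrite above: once the look-ahead window end is jittered, the exact ExclusiveMaxTaskKey passed to GetHistoryTasks is no longer predictable, so the mock switches from exact request matching to gomock.Any() plus a DoAndReturn callback that asserts each field individually, allowing the fire time a tolerance of maxPollInterval × jitterCoefficient. A self-contained sketch of that tolerance check (hypothetical values, mirroring the assertions above):

```go
package main

import (
	"fmt"
	"time"
)

// withinJitterTolerance reports whether an observed time is within the jitter
// bound of the expected time — the same |got - want| <= d*c check the test
// applies to the request's ExclusiveMaxTaskKey fire time.
func withinJitterTolerance(got, want time.Time, maxPollInterval time.Duration, jitterCoefficient float64) bool {
	diff := got.Sub(want)
	if diff < 0 {
		diff = -diff
	}
	return diff <= time.Duration(float64(maxPollInterval)*jitterCoefficient)
}

func main() {
	base := time.Now()
	fmt.Println(withinJitterTolerance(base.Add(30*time.Second), base, 5*time.Minute, 0.15)) // true: 30s <= 45s
	fmt.Println(withinJitterTolerance(base.Add(time.Minute), base, 5*time.Minute, 0.15))    // false: 60s > 45s
}
```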
1 change: 1 addition & 0 deletions service/history/queues/reader.go
@@ -354,6 +354,7 @@ func (r *ReaderImpl) ShrinkSlices() {
 		slice := element.Value.(Slice)
 		slice.ShrinkScope()
 		if scope := slice.Scope(); scope.IsEmpty() {
+			r.monitor.RemoveSlice(slice)
 			r.slices.Remove(element)
 		}
 	}
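Without this one-line addition, a slice whose scope shrinks to empty is removed from the reader but stays registered in the monitor, so its last reported statistics are counted in every future aggregate. A toy version of the bookkeeping (hypothetical monitor type, not the repository's Monitor interface):

```go
package main

import "fmt"

// monitor keeps per-slice statistics; readers must tell it when a slice goes away.
type monitor struct{ pendingPerSlice map[string]int }

func (m *monitor) SetSlicePendingTaskCount(slice string, count int) {
	m.pendingPerSlice[slice] = count
}

func (m *monitor) RemoveSlice(slice string) {
	delete(m.pendingPerSlice, slice)
}

func (m *monitor) TotalPendingTaskCount() int {
	total := 0
	for _, c := range m.pendingPerSlice {
		total += c
	}
	return total
}

func main() {
	m := &monitor{pendingPerSlice: map[string]int{}}
	m.SetSlicePendingTaskCount("slice-1", 10)
	m.SetSlicePendingTaskCount("slice-2", 3)

	// The reader drops slice-2 once its scope shrinks to empty; without the
	// added RemoveSlice call, the orphaned entry would be counted forever.
	m.RemoveSlice("slice-2")
	fmt.Println(m.TotalPendingTaskCount()) // 10, not 13
}
```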
37 changes: 21 additions & 16 deletions service/history/queues/slice.go
@@ -188,28 +188,18 @@ func (s *SliceImpl) MergeWithSlice(slice Slice) []Slice {
 
 	mergedSlices := make([]Slice, 0, 3)
 	currentLeftSlice, currentRightSlice := s.splitByRange(incomingSlice.Scope().Range.InclusiveMin)
-	if !currentLeftSlice.scope.IsEmpty() {
-		mergedSlices = append(mergedSlices, currentLeftSlice)
-	}
+	mergedSlices = appendMergedSlice(mergedSlices, currentLeftSlice)
 
 	if currentRightMax := currentRightSlice.Scope().Range.ExclusiveMax; incomingSlice.CanSplitByRange(currentRightMax) {
 		leftIncomingSlice, rightIncomingSlice := incomingSlice.splitByRange(currentRightMax)
 		mergedMidSlice := currentRightSlice.mergeByPredicate(leftIncomingSlice)
-		if !mergedMidSlice.scope.IsEmpty() {
-			mergedSlices = append(mergedSlices, mergedMidSlice)
-		}
-		if !rightIncomingSlice.scope.IsEmpty() {
-			mergedSlices = append(mergedSlices, rightIncomingSlice)
-		}
+		mergedSlices = appendMergedSlice(mergedSlices, mergedMidSlice)
+		mergedSlices = appendMergedSlice(mergedSlices, rightIncomingSlice)
 	} else {
 		currentMidSlice, currentRightSlice := currentRightSlice.splitByRange(incomingSlice.Scope().Range.ExclusiveMax)
 		mergedMidSlice := currentMidSlice.mergeByPredicate(incomingSlice)
-		if !mergedMidSlice.scope.IsEmpty() {
-			mergedSlices = append(mergedSlices, mergedMidSlice)
-		}
-		if !currentRightSlice.scope.IsEmpty() {
-			mergedSlices = append(mergedSlices, currentRightSlice)
-		}
+		mergedSlices = appendMergedSlice(mergedSlices, mergedMidSlice)
+		mergedSlices = appendMergedSlice(mergedSlices, currentRightSlice)
 	}
 
 	return mergedSlices
@@ -458,14 +448,29 @@ func (s *SliceImpl) newSlice(
 	iterators []Iterator,
 	tracker *executableTracker,
 ) *SliceImpl {
-	return &SliceImpl{
+	slice := &SliceImpl{
 		paginationFnProvider:  s.paginationFnProvider,
 		executableInitializer: s.executableInitializer,
 		scope:                 scope,
 		iterators:             iterators,
 		executableTracker:     tracker,
 		monitor:               s.monitor,
 	}
+	slice.monitor.SetSlicePendingTaskCount(slice, len(slice.executableTracker.pendingExecutables))
+
+	return slice
+}
+
+func appendMergedSlice(
+	mergedSlices []Slice,
+	s *SliceImpl,
+) []Slice {
+	if s.scope.IsEmpty() {
+		s.destroy()
+		return mergedSlices
+	}
+
+	return append(mergedSlices, s)
+}
 
 func validateIteratorsOrderedDisjoint(
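Two related bookkeeping fixes close out the commit: newSlice now reports the new slice's pending-task count to the monitor, and the four repeated append-if-non-empty blocks in MergeWithSlice collapse into appendMergedSlice, which destroys empty slices instead of silently dropping them — presumably so the monitor forgets them as well, mirroring the reader.go fix above. A toy sketch of the helper's contract (simplified types, with destroy standing in for the real cleanup):

```go
package main

import "fmt"

// slice is a stand-in for *SliceImpl; only the property the helper cares
// about — whether its scope is empty — is modeled here.
type slice struct {
	name  string
	empty bool
}

// destroy stands in for the real cleanup, assumed to include unregistering
// the slice from the monitor so no per-slice statistics are leaked.
func (s *slice) destroy() {
	fmt.Println("destroy (and unregister from monitor):", s.name)
}

// appendMergedSlice keeps non-empty slices and destroys empty ones, instead
// of merely skipping them as the old inline checks did.
func appendMergedSlice(merged []*slice, s *slice) []*slice {
	if s.empty {
		s.destroy()
		return merged
	}
	return append(merged, s)
}

func main() {
	merged := appendMergedSlice(nil, &slice{name: "left", empty: false})
	merged = appendMergedSlice(merged, &slice{name: "mid", empty: true})
	fmt.Println(len(merged)) // 1: only the non-empty slice survives
}
```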
