Multi-cursor: pending task count monitoring & mitigation (#3275)
yycptt authored Aug 31, 2022
1 parent d1afec2 commit 5ab8ab8
Showing 21 changed files with 539 additions and 42 deletions.
21 changes: 20 additions & 1 deletion common/collection/priorityQueue.go
@@ -36,12 +36,31 @@ type (
)

// NewPriorityQueue create a new priority queue
func NewPriorityQueue[T any](compareLess func(this T, other T) bool) Queue[T] {
func NewPriorityQueue[T any](
compareLess func(this T, other T) bool,
) Queue[T] {
return &priorityQueueImpl[T]{
compareLess: compareLess,
}
}

// NewPriorityQueueWithItems creates a new priority queue
// with the provided list of items.
// PriorityQueue will take ownership of the passed in items,
// so the caller should stop modifying it.
// The complexity is O(n) where n is the number of items.
func NewPriorityQueueWithItems[T any](
compareLess func(this T, other T) bool,
items []T,
) Queue[T] {
pq := &priorityQueueImpl[T]{
compareLess: compareLess,
items: items,
}
heap.Init(pq)
return pq
}

// Peek returns the top item of the priority queue
func (pq *priorityQueueImpl[T]) Peek() T {
if pq.IsEmpty() {
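For reference, a minimal usage sketch of the new constructor (illustrative only; the int comparator and values are made up, and Remove/IsEmpty come from the existing Queue interface, as exercised in the test below):

import (
	"fmt"

	"go.temporal.io/server/common/collection"
)

func minHeapExample() {
	pq := collection.NewPriorityQueueWithItems(
		func(this, other int) bool { return this < other }, // min-heap ordering
		[]int{5, 1, 3}, // heapified in O(n) by heap.Init
	)
	for !pq.IsEmpty() {
		fmt.Println(pq.Remove()) // prints 1, 3, 5
	}
}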
24 changes: 24 additions & 0 deletions common/collection/priorityQueue_test.go
@@ -55,6 +55,30 @@ func (s *PriorityQueueSuite) SetupTest() {
s.pq = NewPriorityQueue(testPriorityQueueItemCompareLess)
}

func (s *PriorityQueueSuite) TestNewPriorityQueueWithItems() {
items := []*testPriorityQueueItem{
{value: 10},
{value: 3},
{value: 5},
{value: 4},
{value: 1},
{value: 16},
{value: -10},
}
s.pq = NewPriorityQueueWithItems(
testPriorityQueueItemCompareLess,
items,
)

expected := []int{-10, 1, 3, 4, 5, 10, 16}
result := []int{}

for !s.pq.IsEmpty() {
result = append(result, s.pq.Remove().value)
}
s.Equal(expected, result)
}

func (s *PriorityQueueSuite) TestInsertAndPop() {
s.pq.Add(&testPriorityQueueItem{10})
s.pq.Add(&testPriorityQueueItem{3})
9 changes: 9 additions & 0 deletions common/dynamicconfig/constants.go
@@ -316,13 +316,22 @@ const (
// StandbyTaskMissingEventsDiscardDelay is the amount of time standby clusters will wait (if events are missing)
// before discarding the task
StandbyTaskMissingEventsDiscardDelay = "history.standbyTaskMissingEventsDiscardDelay"
// QueuePendingTaskCriticalCount is the max number of pending tasks in one queue
// before triggering queue slice splitting and unloading
QueuePendingTaskCriticalCount = "history.queuePendingTaskCriticalCount"
// QueueReaderStuckCriticalAttempts is the max number of task loading attempts for a certain task range
// before that task range is split into a separate slice to unblock loading for later ranges.
// Currently this only works for scheduled queues, and the task range is 1s.
QueueReaderStuckCriticalAttempts = "history.queueReaderStuckCriticalAttempts"
// QueueCriticalSlicesCount is the max number of slices in one queue
// before force compacting slices
QueueCriticalSlicesCount = "history.queueCriticalSlicesCount"
// QueuePendingTaskMaxCount is the max number of pending tasks in one queue before the queue stops
// loading new tasks into memory. Unlike QueuePendingTaskCriticalCount, which doesn't stop task loading
// for the entire queue but only triggers a queue action to unload tasks, this is a hard limit. Ideally
// the limit should never be hit, since task unloading should kick in once the critical count is exceeded;
// but because the queue action is async, the hard limit is still needed.
QueuePendingTaskMaxCount = "history.queuePendingTasksMaxCount"
// QueueMaxReaderCount is the max number of readers in one multi-cursor queue
QueueMaxReaderCount = "history.queueMaxReaderCount"
// TimerTaskBatchSize is batch size for timer processor to process tasks
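To make the distinction between the two pending-task thresholds concrete, here is a hedged sketch of the intended semantics (the function and helpers below are hypothetical, not the actual queue code): exceeding the critical count only triggers an asynchronous unload action, while reaching the max count hard-stops task loading.

// Hypothetical illustration only; the real checks live in the queue reader/monitor code.
func onPendingTaskCountChanged(pending, criticalCount, maxCount int) {
	if pending > criticalCount {
		// soft limit: raise an alert so a queue action can unload tasks asynchronously
		emitPendingTaskCountAlert(pending, criticalCount) // hypothetical helper
	}
	if pending >= maxCount {
		// hard limit: stop loading new tasks into memory until the count drops
		pauseTaskLoading() // hypothetical helper
	}
}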
4 changes: 4 additions & 0 deletions service/history/configs/config.go
@@ -82,8 +82,10 @@ type Config struct {
StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFn
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn

QueuePendingTaskCriticalCount dynamicconfig.IntPropertyFn
QueueReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn
QueueCriticalSlicesCount dynamicconfig.IntPropertyFn
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxReaderCount dynamicconfig.IntPropertyFn

// TimerQueueProcessor settings
@@ -309,8 +311,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute),

QueuePendingTaskCriticalCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskCriticalCount, 9000),
QueueReaderStuckCriticalAttempts: dc.GetIntProperty(dynamicconfig.QueueReaderStuckCriticalAttempts, 2),
QueueCriticalSlicesCount: dc.GetIntProperty(dynamicconfig.QueueCriticalSlicesCount, 50),
QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000),
QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2),

TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
230 changes: 230 additions & 0 deletions service/history/queues/action_pending_task_count.go
@@ -0,0 +1,230 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
"time"

"golang.org/x/exp/slices"

"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/tasks"
)

const (
targetLoadFactor = 0.8
clearSliceThrottleDuration = 10 * time.Second
)

type (
actionQueuePendingTask struct {
attributes *AlertAttributesQueuePendingTaskCount
monitor Monitor
maxReaderCount int
completionFn actionCompletionFn

// state of the action, used when running the action
tasksPerNamespace map[namespace.ID]int
pendingTaskPerNamespacePerSlice map[Slice]map[namespace.ID]int
slicesPerNamespace map[namespace.ID][]Slice
namespaceToClearPerSlice map[Slice][]namespace.ID
}
)

func newQueuePendingTaskAction(
attributes *AlertAttributesQueuePendingTaskCount,
monitor Monitor,
maxReaderCount int,
completionFn actionCompletionFn,
) Action {
return &actionQueuePendingTask{
attributes: attributes,
monitor: monitor,
maxReaderCount: maxReaderCount,
completionFn: completionFn,
}
}

func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
defer a.completionFn()

// first check if the alert is still valid
if a.monitor.GetTotalPendingTaskCount() <= a.attributes.CiriticalPendingTaskCount {
return
}

// then try to shrink existing slices, which may reduce pending task count
readers := readerGroup.Readers()
if a.tryShrinkSlice(readers) {
return
}

// have to unload pending tasks to reduce pending task count
a.init()
a.gatherStatistics(readers)
a.findSliceToClear(
a.monitor.GetTotalPendingTaskCount(),
int(float64(a.attributes.CiriticalPendingTaskCount)*targetLoadFactor),
)
a.splitAndClearSlice(readers, readerGroup)
}

func (a *actionQueuePendingTask) tryShrinkSlice(
readers map[int32]Reader,
) bool {
for _, reader := range readers {
reader.ShrinkSlices()
}
return a.monitor.GetTotalPendingTaskCount() <= a.attributes.CiriticalPendingTaskCount
}

func (a *actionQueuePendingTask) init() {
a.tasksPerNamespace = make(map[namespace.ID]int)
a.pendingTaskPerNamespacePerSlice = make(map[Slice]map[namespace.ID]int)
a.slicesPerNamespace = make(map[namespace.ID][]Slice)
a.namespaceToClearPerSlice = make(map[Slice][]namespace.ID)
}

func (a *actionQueuePendingTask) gatherStatistics(
readers map[int32]Reader,
) {
// gather statistics for:
// 1. total # of pending tasks per namespace
// 2. for each slice, # of pending tasks per namespace
// 3. for each namespace, a list of slices that contain pending tasks from that namespace,
// ordered by slice range from newest to oldest. Upon unloading, newer slices are unloaded first.
for _, reader := range readers {
reader.WalkSlices(func(s Slice) {
a.pendingTaskPerNamespacePerSlice[s] = s.TaskStats().PendingPerNamespace
for namespaceID, pendingTaskCount := range a.pendingTaskPerNamespacePerSlice[s] {
a.tasksPerNamespace[namespaceID] += pendingTaskCount
a.slicesPerNamespace[namespaceID] = append(a.slicesPerNamespace[namespaceID], s)
}
})
}
for _, sliceList := range a.slicesPerNamespace {
slices.SortFunc(sliceList, func(this, that Slice) bool {
thisMin := this.Scope().Range.InclusiveMin
thatMin := that.Scope().Range.InclusiveMin
return thisMin.CompareTo(thatMin) > 0
})
}
}

func (a *actionQueuePendingTask) findSliceToClear(
currentPendingTasks int,
targetPendingTasks int,
) {
// order namespace by # of pending tasks
namespaceIDs := make([]namespace.ID, 0, len(a.tasksPerNamespace))
for namespaceID := range a.tasksPerNamespace {
namespaceIDs = append(namespaceIDs, namespaceID)
}
pq := collection.NewPriorityQueueWithItems(
func(this, that namespace.ID) bool {
return a.tasksPerNamespace[this] > a.tasksPerNamespace[that]
},
namespaceIDs,
)

for currentPendingTasks > targetPendingTasks && !pq.IsEmpty() {
namespaceID := pq.Remove()

sliceList := a.slicesPerNamespace[namespaceID]
if len(sliceList) == 0 {
panic("Found namespace with non-zero pending task count but has no correspoding Slice")
}

// pop the first slice in the list
sliceToClear := sliceList[0]
sliceList = sliceList[1:]
a.slicesPerNamespace[namespaceID] = sliceList

// account for the tasks that will be cleared so the loop can stop once the target is reached
tasksCleared := a.pendingTaskPerNamespacePerSlice[sliceToClear][namespaceID]
a.tasksPerNamespace[namespaceID] -= tasksCleared
currentPendingTasks -= tasksCleared
if a.tasksPerNamespace[namespaceID] > 0 {
pq.Add(namespaceID)
}

a.namespaceToClearPerSlice[sliceToClear] = append(a.namespaceToClearPerSlice[sliceToClear], namespaceID)
}
}

func (a *actionQueuePendingTask) splitAndClearSlice(
readers map[int32]Reader,
readerGroup *ReaderGroup,
) {
for readerID, reader := range readers {
if readerID == int32(a.maxReaderCount)-1 {
// we can't split further, so we have to clear the entire slice
cleared := false
reader.ClearSlices(func(s Slice) bool {
_, ok := a.namespaceToClearPerSlice[s]
cleared = cleared || ok
return ok
})
if cleared {
reader.Pause(clearSliceThrottleDuration)
}
continue
}

var splitSlices []Slice
reader.SplitSlices(func(s Slice) ([]Slice, bool) {
namespaceIDs, ok := a.namespaceToClearPerSlice[s]
if !ok {
return nil, false
}

namespaceIDStrings := make([]string, 0, len(namespaceIDs))
for _, namespaceID := range namespaceIDs {
namespaceIDStrings = append(namespaceIDStrings, namespaceID.String())
}

split, remain := s.SplitByPredicate(tasks.NewNamespacePredicate(namespaceIDStrings))
split.Clear()
splitSlices = append(splitSlices, split)
return []Slice{remain}, true
})

if len(splitSlices) == 0 {
continue
}

nextReader, ok := readerGroup.ReaderByID(readerID + 1)
if ok {
nextReader.MergeSlices(splitSlices...)
} else {
nextReader = readerGroup.NewReader(readerID+1, splitSlices...)
}
nextReader.Pause(clearSliceThrottleDuration)
}

// it's likely that after a split the slice range can be shrunk,
// as tasks blocking the min key from advancing have been moved to another slice/reader
for _, reader := range readers {
reader.ShrinkSlices()
}
}
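A short worked trace may help follow the greedy selection above; the numbers are invented, and the defaults come from config.go in this commit (critical count 9000, targetLoadFactor 0.8):

// Illustrative trace only (invented numbers):
//   critical = 9000                      => target = int(9000 * 0.8) = 7200
//   pending:  A=6000, B=2500, C=1000     => total = 9500
//   pop A (largest count); A's newest slice holds 3000 of A's pending tasks
//     => mark that slice for clearing, A drops to 3000 and is re-added,
//        projected total drops to 6500
//   6500 <= 7200                         => loop stops after one slice is marked
//   splitAndClearSlice then splits that slice by a namespace predicate for A,
//   clears the split part, and merges it into the next reader (pausing it briefly),
//   unless the owning reader is already the last allowed reader, in which case
//   the whole matching slice is cleared in place.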
4 changes: 0 additions & 4 deletions service/history/queues/action_slice_count.go
@@ -30,10 +30,6 @@ import (
"golang.org/x/exp/slices"
)

const (
targetLoadFactor = 0.8
)

type (
actionSliceCount struct {
attributes *AlertAttributesSlicesCount
18 changes: 13 additions & 5 deletions service/history/queues/alerts.go
@@ -33,13 +33,19 @@ import (
type (
// Alert is created by a Monitor when some statistics of the Queue is abnormal
Alert struct {
AlertType AlertType
AlertAttributesReaderStuck *AlertAttributesReaderStuck
AlertAttributesSliceCount *AlertAttributesSlicesCount
AlertType AlertType
AlertAttributesQueuePendingTaskCount *AlertAttributesQueuePendingTaskCount
AlertAttributesReaderStuck *AlertAttributesReaderStuck
AlertAttributesSliceCount *AlertAttributesSlicesCount
}

AlertType int

AlertAttributesQueuePendingTaskCount struct {
CurrentPendingTaskCount int
CiriticalPendingTaskCount int
}

AlertAttributesReaderStuck struct {
ReaderID int32
CurrentWatermark tasks.Key
@@ -53,15 +59,17 @@

const (
AlertTypeUnspecified AlertType = iota
AlertTypeQueuePendingTaskCount
AlertTypeReaderStuck
AlertTypeSliceCount
)

var (
alertTypeNames = map[AlertType]string{
0: "Unspecified",
1: "ReaderStuck",
2: "SliceCount",
1: "QueuePendingTaskCount",
2: "ReaderStuck",
3: "SliceCount",
}
)

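For completeness, a hedged sketch of how the new alert type might be raised; only the type and field names added in this diff are real, the surrounding variables are illustrative, and the wiring from the alert to newQueuePendingTaskAction (presumably done by the queue's mitigator, which is not part of this excerpt) is assumed rather than quoted:

// Illustrative only: constructing the new alert once a Monitor observes that the
// total pending task count exceeds the configured critical count.
alert := Alert{
	AlertType: AlertTypeQueuePendingTaskCount,
	AlertAttributesQueuePendingTaskCount: &AlertAttributesQueuePendingTaskCount{
		CurrentPendingTaskCount:   totalPendingTaskCount,    // illustrative variable
		CiriticalPendingTaskCount: criticalPendingTaskCount, // illustrative variable
	},
}
// A handler would then map AlertTypeQueuePendingTaskCount to the matching action, e.g.
// newQueuePendingTaskAction(alert.AlertAttributesQueuePendingTaskCount, monitor, maxReaderCount, completionFn).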