Skip to content

Commit

Permalink
Multi-cursor: slice predicate action (#3312)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Sep 7, 2022
1 parent e4bf183 commit 2739f2f
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 4 deletions.
114 changes: 114 additions & 0 deletions service/history/queues/action_slice_predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import "go.temporal.io/server/service/history/tasks"

const (
moveSliceDefaultReaderMinPendingTaskCount = 50
moveSliceDefaultReaderMinSliceCount = 3
)

type (
// slicePredicateAction will move all slices in default reader
// with non-universal predicate to the next reader so that upon restart
// task loading for those slices won't blocking loading for other slices
// in the default reader.
//
// Slice.ShrinkScope() will shrink its predicate when there's few
// namespaces left. But those slices may still in the default reader.
// If there are many such slices in the default reader, then upon restart
// task loading for other namespace will be blocked. So we need to move those
// slices to a different reader.
//
// NOTE: When there's no restart/shard movement, this movement won't affect
// anything, as slice with non-universal predicate must have already loaded
// all tasks into memory.
slicePredicateAction struct {
monitor Monitor
maxReaderCount int
}
)

func newSlicePredicateAction(
monitor Monitor,
maxReaderCount int,
) Action {
return &slicePredicateAction{
monitor: monitor,
maxReaderCount: maxReaderCount,
}
}

func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) {
reader, ok := readerGroup.ReaderByID(DefaultReaderId)
if !ok {
return
}

if a.maxReaderCount <= DefaultReaderId+1 {
return
}

sliceCount := a.monitor.GetSliceCount(DefaultReaderId)
pendingTasks := 0
hasNonUniversalPredicate := false
reader.WalkSlices(func(s Slice) {
pendingTasks += a.monitor.GetSlicePendingTaskCount(s)

if !tasks.IsUniverisalPredicate(s.Scope().Predicate) {
hasNonUniversalPredicate = true
}
})

// only move slices when either default reader slice count or
// pending task count is high
if !hasNonUniversalPredicate ||
(pendingTasks < moveSliceDefaultReaderMinPendingTaskCount &&
sliceCount < moveSliceDefaultReaderMinSliceCount) {
return
}

var moveSlices []Slice
reader.SplitSlices(func(s Slice) (remaining []Slice, split bool) {
if tasks.IsUniverisalPredicate(s.Scope().Predicate) {
return []Slice{s}, false
}

moveSlices = append(moveSlices, s)
return nil, true
})

if len(moveSlices) == 0 {
return
}

nextReader, ok := readerGroup.ReaderByID(DefaultReaderId + 1)
if !ok {
readerGroup.NewReader(DefaultReaderId+1, moveSlices...)
} else {
nextReader.MergeSlices(moveSlices...)
}
}
9 changes: 8 additions & 1 deletion service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,17 @@ func (p *queueBase) processNewRange() {
}

func (p *queueBase) checkpoint() {
for _, reader := range p.readerGroup.Readers() {
reader.ShrinkSlices()
}
// Run slicePredicateAction to move slices with non-universal predicate to non-default reader
// so that upon shard reload, task loading for those slices won't block other slices in the default
// reader.
newSlicePredicateAction(p.monitor, p.mitigator.maxReaderCount()).Run(p.readerGroup)

readerScopes := make(map[int32][]Scope)
newExclusiveDeletionHighWatermark := p.nonReadableScope.Range.InclusiveMin
for readerID, reader := range p.readerGroup.Readers() {
reader.ShrinkSlices()
scopes := reader.Scopes()

if len(scopes) == 0 {
Expand Down
78 changes: 76 additions & 2 deletions service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (s *queueBaseSuite) TestProcessNewRange() {
s.True(base.nonReadableScope.Range.Equals(NewRange(scopes[0].Range.ExclusiveMax, tasks.MaximumKey)))
}

func (s *queueBaseSuite) TestCompleteTaskAndPersistState_WithPendingTasks() {
func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() {
scopeMinKey := tasks.MaximumKey
readerScopes := map[int32][]Scope{}
for _, readerID := range []int32{DefaultReaderId, 2, 3} {
Expand Down Expand Up @@ -436,7 +436,7 @@ func (s *queueBaseSuite) TestCompleteTaskAndPersistState_WithPendingTasks() {
s.True(scopeMinKey.CompareTo(base.exclusiveDeletionHighWatermark) == 0)
}

func (s *queueBaseSuite) TestCompleteTaskAndPersistState_NoPendingTasks() {
func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() {
exclusiveReaderHighWatermark := NewRandomKey()
queueState := &queueState{
readerScopes: map[int32][]Scope{},
Expand Down Expand Up @@ -509,3 +509,77 @@ func (s *queueBaseSuite) TestCompleteTaskAndPersistState_NoPendingTasks() {

s.True(exclusiveReaderHighWatermark.CompareTo(base.exclusiveDeletionHighWatermark) == 0)
}

func (s *queueBaseSuite) TestCheckPoint_MoveSlices() {
exclusiveReaderHighWatermark := tasks.MaximumKey
scopes := NewRandomScopes(3)
scopes[0].Predicate = tasks.NewNamespacePredicate([]string{uuid.New()})
scopes[2].Predicate = tasks.NewTypePredicate([]enumsspb.TaskType{enumsspb.TASK_TYPE_ACTIVITY_RETRY_TIMER})
initialQueueState := &queueState{
readerScopes: map[int32][]Scope{
DefaultReaderId: scopes,
},
exclusiveReaderHighWatermark: exclusiveReaderHighWatermark,
}
initialPersistenceState := ToPersistenceQueueState(initialQueueState)

expectedQueueState := &queueState{
readerScopes: map[int32][]Scope{
DefaultReaderId: {scopes[1]},
DefaultReaderId + 1: {scopes[0], scopes[2]},
},
exclusiveReaderHighWatermark: exclusiveReaderHighWatermark,
}
expectedPersistenceState := ToPersistenceQueueState(expectedQueueState)

mockShard := shard.NewTestContext(
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 0,
RangeId: 10,
QueueStates: map[int32]*persistencespb.QueueState{
tasks.CategoryIDTimer: initialPersistenceState,
},
},
},
s.config,
)
mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
mockShard.Resource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()

base := newQueueBase(
mockShard,
tasks.CategoryTimer,
nil,
s.mockScheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
s.rateLimiter,
s.logger,
s.metricsHandler,
)
base.checkpointTimer = time.NewTimer(s.options.CheckpointInterval())
s.True(scopes[0].Range.InclusiveMin.CompareTo(base.exclusiveDeletionHighWatermark) == 0)

// set to a smaller value so that delete will be triggered
base.exclusiveDeletionHighWatermark = tasks.MinimumKey

// manually set pending task count to trigger slice predicate action
base.monitor.SetSlicePendingTaskCount(&SliceImpl{}, 2*moveSliceDefaultReaderMinPendingTaskCount)

gomock.InOrder(
mockShard.Resource.ExecutionMgr.EXPECT().RangeCompleteHistoryTasks(gomock.Any(), gomock.Any()).Return(nil).Times(1),
mockShard.Resource.ShardMgr.EXPECT().UpdateShard(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *persistence.UpdateShardRequest) error {
s.Equal(expectedPersistenceState, request.ShardInfo.QueueStates[tasks.CategoryIDTimer])
return nil
},
).Times(1),
)

base.checkpoint()

s.True(scopes[0].Range.InclusiveMin.CompareTo(base.exclusiveDeletionHighWatermark) == 0)
}
17 changes: 16 additions & 1 deletion service/history/tasks/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
package tasks

import (
"go.temporal.io/server/common/predicates"
"golang.org/x/exp/maps"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/predicates"
)

type (
Expand Down Expand Up @@ -150,6 +150,21 @@ func OrPredicates(a Predicate, b Predicate) Predicate {
return predicates.Or(a, b)
}

func IsUniverisalPredicate(p Predicate) bool {
_, ok := p.(*predicates.UniversalImpl[Task])
return ok
}

func IsNamespacePredicate(p Predicate) bool {
_, ok := p.(*NamespacePredicate)
return ok
}

func IsTypePredicate(p Predicate) bool {
_, ok := p.(*TypePredicate)
return ok
}

func intersect[K comparable](this, that map[K]struct{}) map[K]struct{} {
intersection := make(map[K]struct{})
for key := range this {
Expand Down

0 comments on commit 2739f2f

Please sign in to comment.