From 2739f2fdc8a91a0f73c071106ac78c4c734fbb95 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 6 Sep 2022 22:36:27 -0700 Subject: [PATCH] Multi-cursor: slice predicate action (#3312) --- .../history/queues/action_slice_predicate.go | 114 ++++++++++++++++++ service/history/queues/queue_base.go | 9 +- service/history/queues/queue_base_test.go | 78 +++++++++++- service/history/tasks/predicates.go | 17 ++- 4 files changed, 214 insertions(+), 4 deletions(-) create mode 100644 service/history/queues/action_slice_predicate.go diff --git a/service/history/queues/action_slice_predicate.go b/service/history/queues/action_slice_predicate.go new file mode 100644 index 00000000000..b9a22a78ed7 --- /dev/null +++ b/service/history/queues/action_slice_predicate.go @@ -0,0 +1,114 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package queues + +import "go.temporal.io/server/service/history/tasks" + +const ( + moveSliceDefaultReaderMinPendingTaskCount = 50 + moveSliceDefaultReaderMinSliceCount = 3 +) + +type ( + // slicePredicateAction will move all slices in default reader + // with non-universal predicate to the next reader so that upon restart + // task loading for those slices won't block loading for other slices + // in the default reader. + // + // Slice.ShrinkScope() will shrink its predicate when there are few + // namespaces left. But those slices may still be in the default reader. + // If there are many such slices in the default reader, then upon restart + // task loading for other namespaces will be blocked. So we need to move those + // slices to a different reader. + // + // NOTE: When there's no restart/shard movement, this movement won't affect + // anything, as a slice with non-universal predicate must have already loaded + // all tasks into memory. 
+ slicePredicateAction struct { + monitor Monitor + maxReaderCount int + } +) + +func newSlicePredicateAction( + monitor Monitor, + maxReaderCount int, +) Action { + return &slicePredicateAction{ + monitor: monitor, + maxReaderCount: maxReaderCount, + } +} + +func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) { + reader, ok := readerGroup.ReaderByID(DefaultReaderId) + if !ok { + return + } + + if a.maxReaderCount <= DefaultReaderId+1 { + return + } + + sliceCount := a.monitor.GetSliceCount(DefaultReaderId) + pendingTasks := 0 + hasNonUniversalPredicate := false + reader.WalkSlices(func(s Slice) { + pendingTasks += a.monitor.GetSlicePendingTaskCount(s) + + if !tasks.IsUniverisalPredicate(s.Scope().Predicate) { + hasNonUniversalPredicate = true + } + }) + + // only move slices when some slice has a non-universal predicate and + // either the default reader slice count or pending task count is high + if !hasNonUniversalPredicate || + (pendingTasks < moveSliceDefaultReaderMinPendingTaskCount && + sliceCount < moveSliceDefaultReaderMinSliceCount) { + return + } + + var moveSlices []Slice + reader.SplitSlices(func(s Slice) (remaining []Slice, split bool) { + if tasks.IsUniverisalPredicate(s.Scope().Predicate) { + return []Slice{s}, false + } + + moveSlices = append(moveSlices, s) + return nil, true + }) + + if len(moveSlices) == 0 { + return + } + + nextReader, ok := readerGroup.ReaderByID(DefaultReaderId + 1) + if !ok { + readerGroup.NewReader(DefaultReaderId+1, moveSlices...) + } else { + nextReader.MergeSlices(moveSlices...) 
+ } +} diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index a3cde89abf8..cc8f11efd92 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -346,10 +346,17 @@ func (p *queueBase) processNewRange() { } func (p *queueBase) checkpoint() { + for _, reader := range p.readerGroup.Readers() { + reader.ShrinkSlices() + } + // Run slicePredicateAction to move slices with non-universal predicate to non-default reader + // so that upon shard reload, task loading for those slices won't block other slices in the default + // reader. + newSlicePredicateAction(p.monitor, p.mitigator.maxReaderCount()).Run(p.readerGroup) + readerScopes := make(map[int32][]Scope) newExclusiveDeletionHighWatermark := p.nonReadableScope.Range.InclusiveMin for readerID, reader := range p.readerGroup.Readers() { - reader.ShrinkSlices() scopes := reader.Scopes() if len(scopes) == 0 { diff --git a/service/history/queues/queue_base_test.go b/service/history/queues/queue_base_test.go index 2625bc79496..e3916ae131f 100644 --- a/service/history/queues/queue_base_test.go +++ b/service/history/queues/queue_base_test.go @@ -355,7 +355,7 @@ func (s *queueBaseSuite) TestProcessNewRange() { s.True(base.nonReadableScope.Range.Equals(NewRange(scopes[0].Range.ExclusiveMax, tasks.MaximumKey))) } -func (s *queueBaseSuite) TestCompleteTaskAndPersistState_WithPendingTasks() { +func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() { scopeMinKey := tasks.MaximumKey readerScopes := map[int32][]Scope{} for _, readerID := range []int32{DefaultReaderId, 2, 3} { @@ -436,7 +436,7 @@ func (s *queueBaseSuite) TestCompleteTaskAndPersistState_WithPendingTasks() { s.True(scopeMinKey.CompareTo(base.exclusiveDeletionHighWatermark) == 0) } -func (s *queueBaseSuite) TestCompleteTaskAndPersistState_NoPendingTasks() { +func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() { exclusiveReaderHighWatermark := NewRandomKey() queueState := &queueState{ 
readerScopes: map[int32][]Scope{}, @@ -509,3 +509,77 @@ func (s *queueBaseSuite) TestCompleteTaskAndPersistState_NoPendingTasks() { s.True(exclusiveReaderHighWatermark.CompareTo(base.exclusiveDeletionHighWatermark) == 0) } + +func (s *queueBaseSuite) TestCheckPoint_MoveSlices() { + exclusiveReaderHighWatermark := tasks.MaximumKey + scopes := NewRandomScopes(3) + scopes[0].Predicate = tasks.NewNamespacePredicate([]string{uuid.New()}) + scopes[2].Predicate = tasks.NewTypePredicate([]enumsspb.TaskType{enumsspb.TASK_TYPE_ACTIVITY_RETRY_TIMER}) + initialQueueState := &queueState{ + readerScopes: map[int32][]Scope{ + DefaultReaderId: scopes, + }, + exclusiveReaderHighWatermark: exclusiveReaderHighWatermark, + } + initialPersistenceState := ToPersistenceQueueState(initialQueueState) + + expectedQueueState := &queueState{ + readerScopes: map[int32][]Scope{ + DefaultReaderId: {scopes[1]}, + DefaultReaderId + 1: {scopes[0], scopes[2]}, + }, + exclusiveReaderHighWatermark: exclusiveReaderHighWatermark, + } + expectedPersistenceState := ToPersistenceQueueState(expectedQueueState) + + mockShard := shard.NewTestContext( + s.controller, + &persistence.ShardInfoWithFailover{ + ShardInfo: &persistencespb.ShardInfo{ + ShardId: 0, + RangeId: 10, + QueueStates: map[int32]*persistencespb.QueueState{ + tasks.CategoryIDTimer: initialPersistenceState, + }, + }, + }, + s.config, + ) + mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() + + base := newQueueBase( + mockShard, + tasks.CategoryTimer, + nil, + s.mockScheduler, + NewNoopPriorityAssigner(), + nil, + s.options, + s.rateLimiter, + s.logger, + s.metricsHandler, + ) + base.checkpointTimer = time.NewTimer(s.options.CheckpointInterval()) + s.True(scopes[0].Range.InclusiveMin.CompareTo(base.exclusiveDeletionHighWatermark) == 0) + + // set to a smaller value so 
that delete will be triggered + base.exclusiveDeletionHighWatermark = tasks.MinimumKey + + // manually set pending task count to trigger slice predicate action + base.monitor.SetSlicePendingTaskCount(&SliceImpl{}, 2*moveSliceDefaultReaderMinPendingTaskCount) + + gomock.InOrder( + mockShard.Resource.ExecutionMgr.EXPECT().RangeCompleteHistoryTasks(gomock.Any(), gomock.Any()).Return(nil).Times(1), + mockShard.Resource.ShardMgr.EXPECT().UpdateShard(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, request *persistence.UpdateShardRequest) error { + s.Equal(expectedPersistenceState, request.ShardInfo.QueueStates[tasks.CategoryIDTimer]) + return nil + }, + ).Times(1), + ) + + base.checkpoint() + + s.True(scopes[0].Range.InclusiveMin.CompareTo(base.exclusiveDeletionHighWatermark) == 0) +} diff --git a/service/history/tasks/predicates.go b/service/history/tasks/predicates.go index 9c0566951f8..6acf021f1dc 100644 --- a/service/history/tasks/predicates.go +++ b/service/history/tasks/predicates.go @@ -25,10 +25,10 @@ package tasks import ( - "go.temporal.io/server/common/predicates" "golang.org/x/exp/maps" enumsspb "go.temporal.io/server/api/enums/v1" + "go.temporal.io/server/common/predicates" ) type ( @@ -150,6 +150,21 @@ func OrPredicates(a Predicate, b Predicate) Predicate { return predicates.Or(a, b) } +func IsUniverisalPredicate(p Predicate) bool { + _, ok := p.(*predicates.UniversalImpl[Task]) + return ok +} + +func IsNamespacePredicate(p Predicate) bool { + _, ok := p.(*NamespacePredicate) + return ok +} + +func IsTypePredicate(p Predicate) bool { + _, ok := p.(*TypePredicate) + return ok +} + func intersect[K comparable](this, that map[K]struct{}) map[K]struct{} { intersection := make(map[K]struct{}) for key := range this {