From 9763be1c149ccb10098604100005b0904b521b9b Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 14 Jun 2022 13:20:45 -0700 Subject: [PATCH 1/6] Multi-cursor components: queue slice --- service/history/queues/slice.go | 370 +++++++++++++++++++++++ service/history/queues/slice_test.go | 435 +++++++++++++++++++++++++++ 2 files changed, 805 insertions(+) create mode 100644 service/history/queues/slice.go create mode 100644 service/history/queues/slice_test.go diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go new file mode 100644 index 00000000000..961b786a278 --- /dev/null +++ b/service/history/queues/slice.go @@ -0,0 +1,370 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package queues + +import ( + "fmt" + "sort" + + ctasks "go.temporal.io/server/common/tasks" + "go.temporal.io/server/service/history/tasks" +) + +type ( + Slice interface { + Scope() Scope + CanSplitByRange(tasks.Key) bool + SplitByRange(tasks.Key) (left Slice, right Slice) + SplitByPredicate(tasks.Predicate) (pass Slice, fail Slice) + CanMergeByRange(Slice) bool + MergeByRange(Slice) Slice + MergeByPredicate(Slice) Slice + ShrinkRange() + SelectTasks(int) ([]Executable, error) + } + + executableInitializer func(tasks.Task) Executable + + SliceImpl struct { + executableInitializer executableInitializer + + destroyed bool + + scope Scope + iterators []Iterator + + // TODO: make task tracking a separate component + // and evaluate the performance of using a btree + // for storing executables + outstandingExecutables map[tasks.Key]Executable + } +) + +func NewSlice( + paginationFnProvider paginationFnProvider, + executableInitializer executableInitializer, + scope Scope, +) *SliceImpl { + return &SliceImpl{ + executableInitializer: executableInitializer, + scope: scope, + outstandingExecutables: make(map[tasks.Key]Executable), + iterators: []Iterator{ + NewIterator(paginationFnProvider, scope.Range), + }, + destroyed: false, + } +} + +func (s *SliceImpl) Scope() Scope { + s.validateNotDestroyed() + return s.scope +} + +func (s *SliceImpl) CanSplitByRange(key tasks.Key) bool { + s.validateNotDestroyed() + return s.scope.CanSplitByRange(key) +} + +func (s *SliceImpl) SplitByRange(key tasks.Key) (leftSlice Slice, rightSlice Slice) { + if !s.CanSplitByRange(key) { + panic(fmt.Sprintf("Unable to split queue slice with range %v at %v", s.scope.Range, key)) + } + + leftScope, rightScope := s.scope.SplitByRange(key) + leftExecutables, rightExecutables := s.splitExecutables(leftScope, rightScope) + + leftIterators := make([]Iterator, 0, len(s.iterators)/2) + rightIterators := make([]Iterator, 0, len(s.iterators)/2) + for _, iter := range s.iterators { + iterRange := iter.Range() + if leftScope.Range.ContainsRange(iterRange) { + leftIterators = append(leftIterators, iter) + continue + } + + if rightScope.Range.ContainsRange(iterRange) { + rightIterators = append(rightIterators, iter) + continue + } + + leftIter, rightIter := iter.Split(key) + leftIterators = append(leftIterators, leftIter) + rightIterators = append(rightIterators, rightIter) + } + + leftSlice = &SliceImpl{ + executableInitializer: s.executableInitializer, + scope: leftScope, + outstandingExecutables: leftExecutables, + iterators: leftIterators, + } + rightSlice = &SliceImpl{ + executableInitializer: s.executableInitializer, + scope: rightScope, + outstandingExecutables: rightExecutables, + iterators: rightIterators, + } + + s.destroy() + return leftSlice, rightSlice +} + +func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (passSlice Slice, failSlice Slice) { + s.validateNotDestroyed() + + passScope, failScope := s.scope.SplitByPredicate(predicate) + passExecutables, failExecutables := s.splitExecutables(passScope, failScope) + + passIterators := make([]Iterator, 0, len(s.iterators)) + failIterators := make([]Iterator, 0, len(s.iterators)) + for _, iter := range s.iterators { + passIterators = append(passIterators, iter) + failIterators = append(failIterators, iter.Remaining()) + } + + passSlice = &SliceImpl{ + executableInitializer: s.executableInitializer, + scope: passScope, + outstandingExecutables: passExecutables, + iterators: passIterators, + } + failSlice = &SliceImpl{ + executableInitializer: s.executableInitializer, + scope: failScope, + outstandingExecutables: failExecutables, + iterators: failIterators, + } + + s.destroy() + return passSlice, failSlice +} + +func (s *SliceImpl) splitExecutables( + thisScope Scope, + thatScope Scope, +) (map[tasks.Key]Executable, map[tasks.Key]Executable) { + thisExecutable := s.outstandingExecutables + thatExecutables := make(map[tasks.Key]Executable, len(s.outstandingExecutables)/2) + for key, executable := range s.outstandingExecutables { + if thisScope.Contains(executable) { + continue + } + + if !thatScope.Contains(executable) { + panic(fmt.Sprintf("Queue slice encountered task doesn't belong to its scope, scope: %v, task: %v, task type: %v", + s.scope, executable.GetTask(), executable.GetType())) + } + delete(thisExecutable, key) + thatExecutables[key] = executable + } + return thisExecutable, thatExecutables +} + +func (s *SliceImpl) CanMergeByRange(slice Slice) bool { + s.validateNotDestroyed() + + return s.scope.CanMergeByRange(slice.Scope()) +} + +func (s *SliceImpl) MergeByRange(slice Slice) Slice { + if !s.CanMergeByRange(slice) { + panic(fmt.Sprintf("Unalbed to merge queue slice having scope %v with slice having scope %v", s.scope, slice.Scope())) + } + + incomingSlice, ok := slice.(*SliceImpl) + if !ok { + panic(fmt.Sprintf("Unabled to merge queue slice of type %T with type %T", s, slice)) + } + + mergedScope := s.scope.MergeByRange(incomingSlice.scope) + + s.destroy() + incomingSlice.destroy() + return &SliceImpl{ + executableInitializer: s.executableInitializer, + scope: mergedScope, + outstandingExecutables: s.mergeExecutables(incomingSlice), + iterators: s.mergeIterators(incomingSlice), + } +} + +func (s *SliceImpl) MergeByPredicate(slice Slice) Slice { + s.validateNotDestroyed() + + incomingSlice, ok := slice.(*SliceImpl) + if !ok { + panic(fmt.Sprintf("Unabled to merge queue slice of type %T with type %T", s, slice)) + } + + if !s.scope.Range.Equal(incomingSlice.scope.Range) { + panic(fmt.Sprintf("Unabled to merge queue slice having range %v with slice having range %v", s.scope, incomingSlice.scope)) + } + + mergedScope := s.scope.MergeByPredicate(incomingSlice.scope) + + s.destroy() + incomingSlice.destroy() + return &SliceImpl{ + executableInitializer: s.executableInitializer, + scope: mergedScope, + outstandingExecutables: s.mergeExecutables(incomingSlice), + iterators: s.mergeIterators(incomingSlice), + } +} + +func (s *SliceImpl) mergeExecutables(incomingSlice *SliceImpl) map[tasks.Key]Executable { + thisExecutables := s.outstandingExecutables + thatExecutables := incomingSlice.outstandingExecutables + if len(thisExecutables) < len(thatExecutables) { + thisExecutables, thatExecutables = thatExecutables, thisExecutables + } + + for key, executable := range thatExecutables { + thisExecutables[key] = executable + } + + return thisExecutables +} + +func (s *SliceImpl) mergeIterators(incomingSlice *SliceImpl) []Iterator { + mergedIterators := make([]Iterator, 0, len(s.iterators)+len(incomingSlice.iterators)) + currentIterIdx := 0 + incomingIterIdx := 0 + for currentIterIdx < len(s.iterators) && incomingIterIdx < len(incomingSlice.iterators) { + currentIter := s.iterators[currentIterIdx] + incomingIter := incomingSlice.iterators[incomingIterIdx] + + if currentIter.Range().InclusiveMin.CompareTo(incomingIter.Range().InclusiveMin) < 0 { + mergedIterators = s.appendIterator(mergedIterators, currentIter) + currentIterIdx++ + } else { + mergedIterators = s.appendIterator(mergedIterators, incomingIter) + incomingIterIdx++ + } + } + + for _, iterator := range s.iterators[currentIterIdx:] { + mergedIterators = s.appendIterator(mergedIterators, iterator) + } + for _, iterator := range incomingSlice.iterators[incomingIterIdx:] { + mergedIterators = s.appendIterator(mergedIterators, iterator) + } + + return mergedIterators +} + +func (s *SliceImpl) appendIterator( + iterators []Iterator, + iterator Iterator, +) []Iterator { + if len(iterators) == 0 { + return []Iterator{iterator} + } + + size := len(iterators) + if iterators[size-1].CanMerge(iterator) { + iterators[size-1] = iterators[size-1].Merge(iterator) + return iterators + } + + return append(iterators, iterator) +} + +func (s *SliceImpl) ShrinkRange() { + s.validateNotDestroyed() + + var taskKeys tasks.Keys + taskKeys = make([]tasks.Key, 0, len(s.outstandingExecutables)) + for key := range s.outstandingExecutables { + taskKeys = append(taskKeys, key) + } + sort.Sort(taskKeys) + + for _, key := range taskKeys { + if s.outstandingExecutables[key].State() == ctasks.TaskStateAcked { + delete(s.outstandingExecutables, key) + continue + } + + s.scope.Range.InclusiveMin = key + break + } + + if len(s.outstandingExecutables) == 0 { + if len(s.iterators) == 0 { + s.scope.Range.InclusiveMin = s.scope.Range.ExclusiveMax + } else { + s.scope.Range.InclusiveMin = s.iterators[0].Range().InclusiveMin + } + } +} + +func (s *SliceImpl) SelectTasks(batchSize int) ([]Executable, error) { + s.validateNotDestroyed() + + if len(s.iterators) == 0 { + return []Executable{}, nil + } + + executables := make([]Executable, 0, batchSize) + for len(executables) < batchSize && len(s.iterators) != 0 { + if s.iterators[0].HasNext() { + task, err := s.iterators[0].Next() + if err != nil { + s.iterators[0] = s.iterators[0].Remaining() + return nil, err + } + + taskKey := task.GetKey() + if !s.scope.Range.ContainsKey(taskKey) { + panic(fmt.Sprintf("Queue slice get task from iterator doesn't belong to its range, range: %v, task key %v", + s.scope.Range, taskKey)) + } + + if !s.scope.Predicate.Test(task) { + continue + } + + executable := s.executableInitializer(task) + s.outstandingExecutables[taskKey] = executable + executables = append(executables, executable) + } else { + s.iterators = s.iterators[1:] + } + } + + return executables, nil +} + +func (s *SliceImpl) destroy() { + s.destroyed = true +} + +func (s *SliceImpl) validateNotDestroyed() { + if s.destroyed { + panic("Can not invoke method on destroyed queue slice") + } +} diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go new file mode 100644 index 00000000000..679c90c5739 --- /dev/null +++ b/service/history/queues/slice_test.go @@ -0,0 +1,435 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package queues + +import ( + "errors" + "math/rand" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/collection" + "go.temporal.io/server/common/predicates" + ctasks "go.temporal.io/server/common/tasks" + "go.temporal.io/server/service/history/tasks" + "golang.org/x/exp/slices" +) + +type ( + sliceSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + + executableInitializer executableInitializer + } +) + +func TestSliceSuite(t *testing.T) { + s := new(sliceSuite) + suite.Run(t, s) +} + +func (s *sliceSuite) SetupTest() { + s.Assertions = require.New(s.T()) + + s.controller = gomock.NewController(s.T()) + + s.executableInitializer = func(t tasks.Task) Executable { + return NewMockExecutable(s.controller) + } +} + +func (s *sliceSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *sliceSuite) TestCanSplitByRange() { + r := NewRandomRange() + scope := NewScope(r, predicates.All[tasks.Task]()) + + slice := NewSlice(nil, s.executableInitializer, scope) + s.Equal(scope, slice.Scope()) + + s.True(slice.CanSplitByRange(r.InclusiveMin)) + s.True(slice.CanSplitByRange(r.ExclusiveMax)) + s.True(slice.CanSplitByRange(NewRandomKeyInRange(r))) + + s.False(slice.CanSplitByRange(tasks.NewKey( + r.InclusiveMin.FireTime, + r.InclusiveMin.TaskID-1, + ))) + s.False(slice.CanSplitByRange(tasks.NewKey( + r.ExclusiveMax.FireTime.Add(time.Nanosecond), + r.ExclusiveMax.TaskID, + ))) +} + +func (s *sliceSuite) TestSplitByRange() { + r := NewRandomRange() + predicate := predicates.All[tasks.Task]() + scope := NewScope(r, predicates.All[tasks.Task]()) + + slice := NewSlice(nil, s.executableInitializer, scope) + for _, executable := range s.randomExecutablesInRange(r, 10) { + slice.outstandingExecutables[executable.GetKey()] = executable + } + slice.iterators = s.randomIteratorsInRange(r, 5, nil) + + splitKey := NewRandomKeyInRange(r) + leftSlice, rightSlice := slice.SplitByRange(splitKey) + s.Equal(NewScope( + NewRange(r.InclusiveMin, splitKey), + predicate, + ), leftSlice.Scope()) + s.Equal(NewScope( + NewRange(splitKey, r.ExclusiveMax), + predicate, + ), rightSlice.Scope()) + + s.validateSliceState(leftSlice.(*SliceImpl)) + s.validateSliceState(rightSlice.(*SliceImpl)) + + s.Panics(func() { slice.validateNotDestroyed() }) +} + +func (s *sliceSuite) TestSplitByPredicate() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + predicate := tasks.NewNamespacePredicate(namespaceIDs) + scope := NewScope(r, predicate) + + slice := NewSlice(nil, s.executableInitializer, scope) + for _, executable := range s.randomExecutablesInRange(r, 10) { + mockExecutable := executable.(*MockExecutable) + mockExecutable.EXPECT().GetNamespaceID().Return(namespaceIDs[rand.Intn(len(namespaceIDs))]).AnyTimes() + slice.outstandingExecutables[executable.GetKey()] = executable + } + slice.iterators = s.randomIteratorsInRange(r, 5, nil) + + splitNamespaceIDs := append(slices.Clone(namespaceIDs[:rand.Intn(len(namespaceIDs))]), uuid.New(), uuid.New()) + splitPredicate := tasks.NewNamespacePredicate(splitNamespaceIDs) + passSlice, failSlice := slice.SplitByPredicate(splitPredicate) + s.Equal(r, passSlice.Scope().Range) + s.Equal(r, failSlice.Scope().Range) + s.True(predicates.And[tasks.Task](predicate, splitPredicate).Equals(passSlice.Scope().Predicate)) + s.True(predicates.And[tasks.Task](predicate, predicates.Not[tasks.Task](splitPredicate)).Equals(failSlice.Scope().Predicate)) + + s.validateSliceState(passSlice.(*SliceImpl)) + s.validateSliceState(failSlice.(*SliceImpl)) + + s.Panics(func() { slice.validateNotDestroyed() }) +} + +func (s *sliceSuite) TestCanMergeByRange() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + predicate := tasks.NewNamespacePredicate(namespaceIDs) + slice := NewSlice(nil, nil, NewScope(r, predicate)) + + testPredicates := []tasks.Predicate{ + predicate, + tasks.NewNamespacePredicate(namespaceIDs), + tasks.NewNamespacePredicate([]string{uuid.New(), uuid.New(), uuid.New(), uuid.New()}), + } + s.True(predicate.Equals(testPredicates[0])) + s.True(predicate.Equals(testPredicates[1])) + s.False(predicate.Equals(testPredicates[2])) + + for _, mergePredicate := range testPredicates { + canMerge := predicate.Equals(mergePredicate) + + testSlice := NewSlice(nil, nil, NewScope(r, mergePredicate)) + s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, r.InclusiveMin), mergePredicate)) + s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(r.ExclusiveMax, tasks.MaximumKey), mergePredicate)) + s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)), mergePredicate)) + s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(NewRandomKeyInRange(r), tasks.MaximumKey), mergePredicate)) + s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, tasks.MaximumKey), mergePredicate)) + s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + } + + testSlice := NewSlice(nil, nil, NewScope(NewRange( + tasks.MinimumKey, + tasks.NewKey(r.InclusiveMin.FireTime, r.InclusiveMin.TaskID-1), + ), predicate)) + s.False(slice.CanMergeByRange(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange( + tasks.NewKey(r.ExclusiveMax.FireTime, r.ExclusiveMax.TaskID+1), + tasks.MaximumKey, + ), predicate)) + s.False(slice.CanMergeByRange(testSlice)) +} + +func (s *sliceSuite) TestMergeByRange() { + r := NewRandomRange() + predicate := predicates.All[tasks.Task]() + + slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { + slice.outstandingExecutables[executable.GetKey()] = executable + } + totalExecutables := len(slice.outstandingExecutables) + slice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) + + incomingRange := NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)) + incomingSlice := NewSlice(nil, s.executableInitializer, NewScope(incomingRange, predicate)) + for _, executable := range s.randomExecutablesInRange(incomingRange, rand.Intn(20)) { + incomingSlice.outstandingExecutables[executable.GetKey()] = executable + } + totalExecutables += len(incomingSlice.outstandingExecutables) + incomingSlice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) + + mergedSlice := slice.MergeByRange(incomingSlice) + mergedSliceImpl := mergedSlice.(*SliceImpl) + + s.Equal(NewScope( + NewRange(tasks.MinimumKey, r.ExclusiveMax), + predicate, + ), mergedSlice.Scope()) + + s.validateSliceState(mergedSliceImpl) + s.Len(mergedSliceImpl.outstandingExecutables, totalExecutables) + + s.Panics(func() { slice.validateNotDestroyed() }) + s.Panics(func() { incomingSlice.validateNotDestroyed() }) +} + +func (s *sliceSuite) TestShrinkRange() { + r := NewRandomRange() + predicate := predicates.All[tasks.Task]() + + slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + slice.iterators = s.randomIteratorsInRange(r, rand.Intn(2), nil) + + executables := s.randomExecutablesInRange(r, 5) + slices.SortFunc(executables, func(a, b Executable) bool { + return a.GetKey().CompareTo(b.GetKey()) < 0 + }) + + firstPendingIdx := len(executables) + for idx, executable := range executables { + mockExecutable := executable.(*MockExecutable) + acked := rand.Intn(10) < 8 + if acked { + mockExecutable.EXPECT().State().Return(ctasks.TaskStateAcked).MaxTimes(1) + } else { + mockExecutable.EXPECT().State().Return(ctasks.TaskStatePending).MaxTimes(1) + if firstPendingIdx == len(executables) { + firstPendingIdx = idx + } + } + + slice.outstandingExecutables[executable.GetKey()] = executable + } + + slice.ShrinkRange() + s.Len(slice.outstandingExecutables, len(executables)-firstPendingIdx) + + newInclusiveMin := r.ExclusiveMax + + if firstPendingIdx == len(executables) { + if len(slice.iterators) != 0 { + newInclusiveMin = slice.iterators[0].Range().InclusiveMin + } + } else { + newInclusiveMin = executables[firstPendingIdx].GetKey() + } + + s.Equal(NewRange(newInclusiveMin, r.ExclusiveMax), slice.Scope().Range) +} + +func (s *sliceSuite) TestSelectTasks_NoError() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + predicate := tasks.NewNamespacePredicate(namespaceIDs) + + numTasks := 20 + paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + return func(paginationToken []byte) ([]tasks.Task, []byte, error) { + + mockTasks := make([]tasks.Task, 0, numTasks) + for i := 0; i != numTasks; i++ { + mockTask := tasks.NewMockTask(s.controller) + key := NewRandomKeyInRange(r) + mockTask.EXPECT().GetKey().Return(key).AnyTimes() + + namespaceID := namespaceIDs[rand.Intn(len(namespaceIDs))] + if i >= numTasks/2 { + namespaceID = uuid.New() // should be filtered out + } + mockTask.EXPECT().GetNamespaceID().Return(namespaceID).AnyTimes() + mockTasks = append(mockTasks, mockTask) + } + + slices.SortFunc(mockTasks, func(a, b tasks.Task) bool { + return a.GetKey().CompareTo(b.GetKey()) < 0 + }) + + return mockTasks, nil, nil + } + } + + for _, batchSize := range []int{1, 2, 5, 10, 20, 100} { + slice := NewSlice(paginationFnProvider, s.executableInitializer, NewScope(r, predicate)) + + executables := make([]Executable, 0, numTasks) + for { + selectedExecutables, err := slice.SelectTasks(batchSize) + s.NoError(err) + if len(selectedExecutables) == 0 { + break + } + + executables = append(executables, selectedExecutables...) + } + + s.Len(executables, numTasks/2) // half of tasks should be filtered out based on its namespaceID + s.Empty(slice.iterators) + } +} + +func (s *sliceSuite) TestSelectTasks_Error() { + r := NewRandomRange() + predicate := predicates.All[tasks.Task]() + + numTasks := 20 + loadErr := true + paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + return func(paginationToken []byte) ([]tasks.Task, []byte, error) { + if loadErr { + loadErr = false + return nil, nil, errors.New("some random load task error") + } + + mockTasks := make([]tasks.Task, 0, numTasks) + for i := 0; i != numTasks; i++ { + mockTask := tasks.NewMockTask(s.controller) + key := NewRandomKeyInRange(r) + mockTask.EXPECT().GetKey().Return(key).AnyTimes() + mockTasks = append(mockTasks, mockTask) + } + + slices.SortFunc(mockTasks, func(a, b tasks.Task) bool { + return a.GetKey().CompareTo(b.GetKey()) < 0 + }) + + return mockTasks, nil, nil + } + } + + slice := NewSlice(paginationFnProvider, s.executableInitializer, NewScope(r, predicate)) + _, err := slice.SelectTasks(100) + s.Error(err) + + executables, err := slice.SelectTasks(100) + s.NoError(err) + s.Len(executables, numTasks) + s.Empty(slice.iterators) +} + +func (s *sliceSuite) validateSliceState( + slice *SliceImpl, +) { + s.NotNil(slice.executableInitializer) + + for _, executable := range slice.outstandingExecutables { + s.True(slice.scope.Contains(executable)) + } + + r := slice.Scope().Range + for idx, iterator := range slice.iterators { + s.True(r.ContainsRange(iterator.Range())) + if idx != 0 { + currentRange := iterator.Range() + previousRange := slice.iterators[idx-1].Range() + s.False(currentRange.CanMerge(previousRange)) + s.True(previousRange.ExclusiveMax.CompareTo(currentRange.InclusiveMin) < 0) + } + } + + s.False(slice.destroyed) +} + +func (s *sliceSuite) randomExecutablesInRange( + r Range, + numExecutables int, +) []Executable { + executables := make([]Executable, 0, numExecutables) + for i := 0; i != numExecutables; i++ { + mockExecutable := NewMockExecutable(s.controller) + key := NewRandomKeyInRange(r) + mockExecutable.EXPECT().GetKey().Return(key).AnyTimes() + executables = append(executables, mockExecutable) + } + return executables +} + +func (s *sliceSuite) randomIteratorsInRange( + r Range, + numIterators int, + paginationFnProvider paginationFnProvider, +) []Iterator { + ranges := []Range{r} + for len(ranges) < numIterators { + r := ranges[0] + left, right := r.Split(NewRandomKeyInRange(r)) + left.ExclusiveMax.FireTime.Add(-time.Nanosecond) + right.InclusiveMin.FireTime.Add(time.Nanosecond) + ranges = append(ranges[1:], left, right) + } + + slices.SortFunc(ranges, func(a, b Range) bool { + return a.InclusiveMin.CompareTo(b.InclusiveMin) < 0 + }) + + iterators := make([]Iterator, 0, numIterators) + for _, r := range ranges { + start := NewRandomKeyInRange(r) + end := NewRandomKeyInRange(r) + if start.CompareTo(end) > 0 { + start, end = end, start + } + iterator := NewIterator(paginationFnProvider, NewRange(start, end)) + iterators = append(iterators, iterator) + } + + return iterators +} From b03be2e7a56b2ccd9f6b5b594cb4328b361e3a4b Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 14 Jun 2022 13:47:54 -0700 Subject: [PATCH 2/6] update --- service/history/queues/slice.go | 31 ++++++------- service/history/queues/slice_test.go | 67 ++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 15 deletions(-) diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go index 961b786a278..e0fb2ca04d3 100644 --- a/service/history/queues/slice.go +++ b/service/history/queues/slice.go @@ -40,6 +40,7 @@ type ( SplitByPredicate(tasks.Predicate) (pass Slice, fail Slice) CanMergeByRange(Slice) bool MergeByRange(Slice) Slice + CanMergeByPredicate(Slice) bool MergeByPredicate(Slice) Slice ShrinkRange() SelectTasks(int) ([]Executable, error) @@ -186,50 +187,50 @@ func (s *SliceImpl) splitExecutables( func (s *SliceImpl) CanMergeByRange(slice Slice) bool { s.validateNotDestroyed() - return s.scope.CanMergeByRange(slice.Scope()) + return s != slice && s.scope.CanMergeByRange(slice.Scope()) } func (s *SliceImpl) MergeByRange(slice Slice) Slice { if !s.CanMergeByRange(slice) { - panic(fmt.Sprintf("Unalbed to merge queue slice having scope %v with slice having scope %v", s.scope, slice.Scope())) + panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by range", s.scope, slice.Scope())) } incomingSlice, ok := slice.(*SliceImpl) if !ok { - panic(fmt.Sprintf("Unabled to merge queue slice of type %T with type %T", s, slice)) + panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) } - mergedScope := s.scope.MergeByRange(incomingSlice.scope) - s.destroy() incomingSlice.destroy() return &SliceImpl{ executableInitializer: s.executableInitializer, - scope: mergedScope, + scope: s.scope.MergeByRange(incomingSlice.scope), outstandingExecutables: s.mergeExecutables(incomingSlice), iterators: s.mergeIterators(incomingSlice), } } -func (s *SliceImpl) MergeByPredicate(slice Slice) Slice { +func (s *SliceImpl) CanMergeByPredicate(slice Slice) bool { s.validateNotDestroyed() - incomingSlice, ok := slice.(*SliceImpl) - if !ok { - panic(fmt.Sprintf("Unabled to merge queue slice of type %T with type %T", s, slice)) - } + return s != slice && s.scope.CanMergeByPredicate(slice.Scope()) +} - if !s.scope.Range.Equal(incomingSlice.scope.Range) { - panic(fmt.Sprintf("Unabled to merge queue slice having range %v with slice having range %v", s.scope, incomingSlice.scope)) +func (s *SliceImpl) MergeByPredicate(slice Slice) Slice { + if !s.CanMergeByPredicate(slice) { + panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by predicate", s.scope, slice.Scope())) } - mergedScope := s.scope.MergeByPredicate(incomingSlice.scope) + incomingSlice, ok := slice.(*SliceImpl) + if !ok { + panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) + } s.destroy() incomingSlice.destroy() return &SliceImpl{ executableInitializer: s.executableInitializer, - scope: mergedScope, + scope: s.scope.MergeByPredicate(incomingSlice.scope), outstandingExecutables: s.mergeExecutables(incomingSlice), iterators: s.mergeIterators(incomingSlice), } diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index 679c90c5739..4cc7e5f760b 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -34,6 +34,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/predicates" ctasks "go.temporal.io/server/common/tasks" @@ -185,6 +186,8 @@ func (s *sliceSuite) TestCanMergeByRange() { s.Equal(canMerge, slice.CanMergeByRange(testSlice)) } + s.False(slice.CanMergeByRange(slice)) + testSlice := NewSlice(nil, nil, NewScope(NewRange( tasks.MinimumKey, tasks.NewKey(r.InclusiveMin.FireTime, r.InclusiveMin.TaskID-1), @@ -232,6 +235,70 @@ func (s *sliceSuite) TestMergeByRange() { s.Panics(func() { incomingSlice.validateNotDestroyed() }) } +func (s *sliceSuite) TestCanMergeByPredicate() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + predicate := tasks.NewNamespacePredicate(namespaceIDs) + slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + + testSlice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + s.True(slice.CanMergeByPredicate(testSlice)) + + testSlice = NewSlice(nil, s.executableInitializer, NewScope(r, tasks.NewTypePredicate([]enumsspb.TaskType{}))) + s.True(slice.CanMergeByPredicate(testSlice)) + + s.False(slice.CanMergeByPredicate(slice)) + + testSlice = NewSlice(nil, s.executableInitializer, NewScope(NewRandomRange(), predicate)) + s.False(slice.CanMergeByPredicate(testSlice)) + + testSlice = NewSlice(nil, s.executableInitializer, NewScope(NewRandomRange(), predicates.All[tasks.Task]())) + s.False(slice.CanMergeByPredicate(testSlice)) +} + +func (s *sliceSuite) TestMergeByPredicate() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + predicate := tasks.NewNamespacePredicate(namespaceIDs) + + slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { + mockExecutable := executable.(*MockExecutable) + mockExecutable.EXPECT().GetNamespaceID().Return(namespaceIDs[rand.Intn(len(namespaceIDs))]).AnyTimes() + mockExecutable.EXPECT().GetType().Return(enumsspb.TASK_TYPE_TRANSFER_CLOSE_EXECUTION).AnyTimes() + slice.outstandingExecutables[executable.GetKey()] = executable + } + totalExecutables := len(slice.outstandingExecutables) + slice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) + + taskTypes := []enumsspb.TaskType{ + enumsspb.TASK_TYPE_ACTIVITY_RETRY_TIMER, + enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, + } + incomingPredicate := tasks.NewTypePredicate(taskTypes) + incomingSlice := NewSlice(nil, s.executableInitializer, NewScope(r, incomingPredicate)) + for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { + mockExecutable := executable.(*MockExecutable) + mockExecutable.EXPECT().GetNamespaceID().Return(uuid.New()).AnyTimes() + mockExecutable.EXPECT().GetType().Return(taskTypes[rand.Intn(len(taskTypes))]).AnyTimes() + incomingSlice.outstandingExecutables[executable.GetKey()] = executable + } + totalExecutables += len(incomingSlice.outstandingExecutables) + incomingSlice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) + + mergedSlice := slice.MergeByPredicate(incomingSlice) + mergedSliceImpl := mergedSlice.(*SliceImpl) + + s.Equal(r, mergedSlice.Scope().Range) + s.True(predicates.Or[tasks.Task](predicate, incomingPredicate).Equals(mergedSlice.Scope().Predicate)) + + s.validateSliceState(mergedSliceImpl) + s.Len(mergedSliceImpl.outstandingExecutables, totalExecutables) + + s.Panics(func() { slice.validateNotDestroyed() }) + s.Panics(func() { incomingSlice.validateNotDestroyed() }) +} + func (s *sliceSuite) TestShrinkRange() { r := NewRandomRange() predicate := predicates.All[tasks.Task]() From 441a502513d4ca45c6d1c6f5633f62ebc2737c71 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 14 Jun 2022 13:57:25 -0700 Subject: [PATCH 3/6] move check --- service/history/queues/slice.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go index e0fb2ca04d3..b9fe31b0263 100644 --- a/service/history/queues/slice.go +++ b/service/history/queues/slice.go @@ -187,6 +187,10 @@ func (s *SliceImpl) splitExecutables( func (s *SliceImpl) CanMergeByRange(slice Slice) bool { s.validateNotDestroyed() + if _, ok := slice.(*SliceImpl); !ok { + panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) + } + return s != slice && s.scope.CanMergeByRange(slice.Scope()) } @@ -195,10 +199,7 @@ func (s *SliceImpl) MergeByRange(slice Slice) Slice { panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by range", s.scope, slice.Scope())) } - incomingSlice, ok := slice.(*SliceImpl) - if !ok { - panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) - } + incomingSlice := slice.(*SliceImpl) s.destroy() incomingSlice.destroy() @@ -213,6 +214,10 @@ func (s *SliceImpl) MergeByRange(slice Slice) Slice { func (s *SliceImpl) CanMergeByPredicate(slice Slice) bool { s.validateNotDestroyed() + if _, ok := slice.(*SliceImpl); !ok { + panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) + } + return s != slice && s.scope.CanMergeByPredicate(slice.Scope()) } @@ -221,10 +226,7 @@ func (s *SliceImpl) MergeByPredicate(slice Slice) Slice { panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by predicate", s.scope, slice.Scope())) } - incomingSlice, ok := slice.(*SliceImpl) - if !ok { - panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) - } + incomingSlice := slice.(*SliceImpl) s.destroy() incomingSlice.destroy() From 44073e670b71cd4918593e07cb318e8f4101d935 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 16 Jun 2022 14:40:16 -0700 Subject: [PATCH 4/6] update shrink range --- service/history/queues/slice.go | 49 +++++++++++++--------------- service/history/queues/slice_test.go | 4 ++- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go index b9fe31b0263..c9015c55df8 100644 --- a/service/history/queues/slice.go +++ b/service/history/queues/slice.go @@ -26,7 +26,6 @@ package queues import ( "fmt" - "sort" ctasks "go.temporal.io/server/common/tasks" "go.temporal.io/server/service/history/tasks" @@ -187,10 +186,6 @@ func (s *SliceImpl) splitExecutables( func (s *SliceImpl) CanMergeByRange(slice Slice) bool { s.validateNotDestroyed() - if _, ok := slice.(*SliceImpl); !ok { - panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) - } - return s != slice && s.scope.CanMergeByRange(slice.Scope()) } @@ -199,7 +194,10 @@ func (s *SliceImpl) MergeByRange(slice Slice) Slice { panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by range", s.scope, slice.Scope())) } - incomingSlice := slice.(*SliceImpl) + incomingSlice, ok := slice.(*SliceImpl) + if !ok { + panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) + } s.destroy() incomingSlice.destroy() @@ -214,10 +212,6 @@ func (s *SliceImpl) MergeByRange(slice Slice) Slice { func (s *SliceImpl) CanMergeByPredicate(slice Slice) bool { s.validateNotDestroyed() - if _, ok := slice.(*SliceImpl); !ok { - panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) - } - return s != slice && s.scope.CanMergeByPredicate(slice.Scope()) } @@ -226,7 +220,10 @@ func (s *SliceImpl) MergeByPredicate(slice Slice) Slice { panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by predicate", s.scope, slice.Scope())) } - incomingSlice := slice.(*SliceImpl) + incomingSlice, ok := slice.(*SliceImpl) + if !ok { + panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) + } s.destroy() incomingSlice.destroy() @@ -299,30 +296,30 @@ func (s *SliceImpl) appendIterator( func (s *SliceImpl) ShrinkRange() { s.validateNotDestroyed() - var taskKeys tasks.Keys - taskKeys = make([]tasks.Key, 0, len(s.outstandingExecutables)) + minTaskKey := tasks.MaximumKey for key := range s.outstandingExecutables { - taskKeys = append(taskKeys, key) - } - sort.Sort(taskKeys) - - for _, key := range taskKeys { if s.outstandingExecutables[key].State() == ctasks.TaskStateAcked { delete(s.outstandingExecutables, key) continue } - s.scope.Range.InclusiveMin = key - break + minTaskKey = tasks.MinKey(minTaskKey, key) } - if len(s.outstandingExecutables) == 0 { - if len(s.iterators) == 0 { - s.scope.Range.InclusiveMin = s.scope.Range.ExclusiveMax - } else { - s.scope.Range.InclusiveMin = s.iterators[0].Range().InclusiveMin - } + // still has pending task in memory + if len(s.outstandingExecutables) != 0 { + s.scope.Range.InclusiveMin = minTaskKey + return + } + + // no more pending task in memory, but has more tasks to read from persistence + if len(s.iterators) != 0 { + s.scope.Range.InclusiveMin = s.iterators[0].Range().InclusiveMin + return } + + // no pending task in memory and persistence + s.scope.Range.InclusiveMin = s.scope.Range.ExclusiveMax } func (s *SliceImpl) SelectTasks(batchSize int) ([]Executable, error) { diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index 4cc7e5f760b..15b4c2ddb3b 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -312,11 +312,13 @@ func (s *sliceSuite) TestShrinkRange() { }) firstPendingIdx := len(executables) + numAcked := 0 for idx, executable := range executables { mockExecutable := executable.(*MockExecutable) acked := rand.Intn(10) < 8 if acked { mockExecutable.EXPECT().State().Return(ctasks.TaskStateAcked).MaxTimes(1) + numAcked++ } else { mockExecutable.EXPECT().State().Return(ctasks.TaskStatePending).MaxTimes(1) if firstPendingIdx == len(executables) { @@ -328,7 +330,7 @@ func (s *sliceSuite) TestShrinkRange() { } slice.ShrinkRange() - s.Len(slice.outstandingExecutables, len(executables)-firstPendingIdx) + s.Len(slice.outstandingExecutables, len(executables)-numAcked) newInclusiveMin := r.ExclusiveMax From 57e7f5ef3dd581c394794d52a65fd3877024d2a1 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 23 Jun 2022 10:46:44 -0700 Subject: [PATCH 5/6] pr comments --- service/history/queues/iterator.go | 6 +- service/history/queues/range.go | 2 +- service/history/queues/range_test.go | 24 +-- service/history/queues/scope.go | 19 +- service/history/queues/slice.go | 171 +++++++++++----- service/history/queues/slice_test.go | 290 ++++++++++++++++----------- service/history/queues/test_util.go | 22 ++ 7 files changed, 347 insertions(+), 187 deletions(-) diff --git a/service/history/queues/iterator.go b/service/history/queues/iterator.go index 92fe8ca3952..fb31a6729d2 100644 --- a/service/history/queues/iterator.go +++ b/service/history/queues/iterator.go @@ -43,10 +43,10 @@ type ( Remaining() Iterator } - paginationFnProvider func(Range) collection.PaginationFn[tasks.Task] + PaginationFnProvider func(Range) collection.PaginationFn[tasks.Task] IteratorImpl struct { - paginationFnProvider paginationFnProvider + paginationFnProvider PaginationFnProvider remainingRange Range pagingIterator collection.Iterator[tasks.Task] @@ -54,7 +54,7 @@ type ( ) func NewIterator( - paginationFnProvider paginationFnProvider, + paginationFnProvider PaginationFnProvider, r Range, ) *IteratorImpl { return &IteratorImpl{ diff --git a/service/history/queues/range.go b/service/history/queues/range.go index 85101f4507e..d77016c7220 100644 --- a/service/history/queues/range.go +++ b/service/history/queues/range.go @@ -105,7 +105,7 @@ func (r *Range) Merge( ) } -func (r *Range) Equal( +func (r *Range) Equals( input Range, ) bool { return r.InclusiveMin.CompareTo(input.InclusiveMin) == 0 && diff --git a/service/history/queues/range_test.go b/service/history/queues/range_test.go index bfa312c49ad..fc34091b399 100644 --- a/service/history/queues/range_test.go +++ b/service/history/queues/range_test.go @@ -286,8 +286,8 @@ func (s *rangeSuite) TestSplit() { splitKey := NewRandomKeyInRange(r) left, right := r.Split(splitKey) - s.True(left.Equal(NewRange(r.InclusiveMin, splitKey))) - s.True(right.Equal(NewRange(splitKey, r.ExclusiveMax))) + s.True(left.Equals(NewRange(r.InclusiveMin, splitKey))) + s.True(right.Equals(NewRange(splitKey, r.ExclusiveMax))) } func (s *rangeSuite) TestMerge() { @@ -298,29 +298,29 @@ func (s *rangeSuite) TestMerge() { NewRandomKeyInRange(r), ) mergedRange := r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(tasks.MinimumKey, r.ExclusiveMax))) testRange = NewRange( NewRandomKeyInRange(r), tasks.MaximumKey, ) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(r.InclusiveMin, tasks.MaximumKey))) testRange = NewRange(tasks.MinimumKey, tasks.MaximumKey) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(tasks.MinimumKey, tasks.MaximumKey))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(tasks.MinimumKey, tasks.MaximumKey))) testRange = NewRange(tasks.MinimumKey, r.InclusiveMin) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(tasks.MinimumKey, r.ExclusiveMax))) testRange = NewRange(r.ExclusiveMax, tasks.MaximumKey) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(r.InclusiveMin, tasks.MaximumKey))) } diff --git a/service/history/queues/scope.go b/service/history/queues/scope.go index 82abcf8d119..c8900ea97d2 100644 --- a/service/history/queues/scope.go +++ b/service/history/queues/scope.go @@ -108,7 +108,7 @@ func (s *Scope) MergeByRange( func (s *Scope) CanMergeByPredicate( incomingScope Scope, ) bool { - return s.Range.Equal(incomingScope.Range) + return s.Range.Equals(incomingScope.Range) } func (s *Scope) MergeByPredicate( @@ -121,3 +121,20 @@ func (s *Scope) MergeByPredicate( // TODO: special check if the predicates are the same type return NewScope(s.Range, predicates.Or(s.Predicate, incomingScope.Predicate)) } + +func (s *Scope) IsEmpty() bool { + if s.Range.IsEmpty() { + return true + } + + if _, ok := s.Predicate.(*predicates.EmptyImpl[tasks.Task]); ok { + return true + } + + return false +} + +func (s *Scope) Equals(scope Scope) bool { + return s.Range.Equals(scope.Range) && + s.Predicate.Equals(scope.Predicate) +} diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go index c9015c55df8..417d74a57dd 100644 --- a/service/history/queues/slice.go +++ b/service/history/queues/slice.go @@ -32,23 +32,27 @@ import ( ) type ( + + // Slice manages the loading and status tracking of all + // tasks within its Scope. + // It also provides methods for splitting or merging with + // another slice either by range or by predicate. Slice interface { Scope() Scope CanSplitByRange(tasks.Key) bool SplitByRange(tasks.Key) (left Slice, right Slice) SplitByPredicate(tasks.Predicate) (pass Slice, fail Slice) - CanMergeByRange(Slice) bool - MergeByRange(Slice) Slice - CanMergeByPredicate(Slice) bool - MergeByPredicate(Slice) Slice + CanMergeWithSlice(Slice) bool + MergeWithSlice(Slice) []Slice ShrinkRange() SelectTasks(int) ([]Executable, error) } - executableInitializer func(tasks.Task) Executable + ExecutableInitializer func(tasks.Task) Executable SliceImpl struct { - executableInitializer executableInitializer + paginationFnProvider PaginationFnProvider + executableInitializer ExecutableInitializer destroyed bool @@ -56,43 +60,46 @@ type ( iterators []Iterator // TODO: make task tracking a separate component - // and evaluate the performance of using a btree - // for storing executables outstandingExecutables map[tasks.Key]Executable } ) func NewSlice( - paginationFnProvider paginationFnProvider, - executableInitializer executableInitializer, + paginationFnProvider PaginationFnProvider, + executableInitializer ExecutableInitializer, scope Scope, ) *SliceImpl { return &SliceImpl{ + paginationFnProvider: paginationFnProvider, executableInitializer: executableInitializer, scope: scope, outstandingExecutables: make(map[tasks.Key]Executable), iterators: []Iterator{ NewIterator(paginationFnProvider, scope.Range), }, - destroyed: false, } } func (s *SliceImpl) Scope() Scope { - s.validateNotDestroyed() + s.stateSanityCheck() return s.scope } func (s *SliceImpl) CanSplitByRange(key tasks.Key) bool { - s.validateNotDestroyed() + s.stateSanityCheck() return s.scope.CanSplitByRange(key) } -func (s *SliceImpl) SplitByRange(key tasks.Key) (leftSlice Slice, rightSlice Slice) { +func (s *SliceImpl) SplitByRange(key tasks.Key) (left Slice, right Slice) { if !s.CanSplitByRange(key) { panic(fmt.Sprintf("Unable to split queue slice with range %v at %v", s.scope.Range, key)) } + return s.splitByRange(key) +} + +func (s *SliceImpl) splitByRange(key tasks.Key) (left *SliceImpl, right *SliceImpl) { + leftScope, rightScope := s.scope.SplitByRange(key) leftExecutables, rightExecutables := s.splitExecutables(leftScope, rightScope) @@ -115,13 +122,15 @@ func (s *SliceImpl) SplitByRange(key tasks.Key) (leftSlice Slice, rightSlice Sli rightIterators = append(rightIterators, rightIter) } - leftSlice = &SliceImpl{ + left = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, executableInitializer: s.executableInitializer, scope: leftScope, outstandingExecutables: leftExecutables, iterators: leftIterators, } - rightSlice = &SliceImpl{ + right = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, executableInitializer: s.executableInitializer, scope: rightScope, outstandingExecutables: rightExecutables, @@ -129,11 +138,11 @@ func (s *SliceImpl) SplitByRange(key tasks.Key) (leftSlice Slice, rightSlice Sli } s.destroy() - return leftSlice, rightSlice + return left, right } -func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (passSlice Slice, failSlice Slice) { - s.validateNotDestroyed() +func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (pass Slice, fail Slice) { + s.stateSanityCheck() passScope, failScope := s.scope.SplitByPredicate(predicate) passExecutables, failExecutables := s.splitExecutables(passScope, failScope) @@ -141,17 +150,20 @@ func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (passSlice Slice passIterators := make([]Iterator, 0, len(s.iterators)) failIterators := make([]Iterator, 0, len(s.iterators)) for _, iter := range s.iterators { + // iter.Remaining() is basically a deep copy of iter passIterators = append(passIterators, iter) failIterators = append(failIterators, iter.Remaining()) } - passSlice = &SliceImpl{ + pass = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, executableInitializer: s.executableInitializer, scope: passScope, outstandingExecutables: passExecutables, iterators: passIterators, } - failSlice = &SliceImpl{ + fail = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, executableInitializer: s.executableInitializer, scope: failScope, outstandingExecutables: failExecutables, @@ -159,7 +171,7 @@ func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (passSlice Slice } s.destroy() - return passSlice, failSlice + return pass, fail } func (s *SliceImpl) splitExecutables( @@ -183,15 +195,19 @@ func (s *SliceImpl) splitExecutables( return thisExecutable, thatExecutables } -func (s *SliceImpl) CanMergeByRange(slice Slice) bool { - s.validateNotDestroyed() +func (s *SliceImpl) CanMergeWithSlice(slice Slice) bool { + s.stateSanityCheck() - return s != slice && s.scope.CanMergeByRange(slice.Scope()) + return s != slice && s.scope.Range.CanMerge(slice.Scope().Range) } -func (s *SliceImpl) MergeByRange(slice Slice) Slice { - if !s.CanMergeByRange(slice) { - panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by range", s.scope, slice.Scope())) +func (s *SliceImpl) MergeWithSlice(slice Slice) []Slice { + if s.scope.Range.InclusiveMin.CompareTo(slice.Scope().Range.InclusiveMin) > 0 { + return slice.MergeWithSlice(s) + } + + if !s.CanMergeWithSlice(slice) { + panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v", s.scope, slice.Scope())) } incomingSlice, ok := slice.(*SliceImpl) @@ -199,40 +215,68 @@ func (s *SliceImpl) MergeByRange(slice Slice) Slice { panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) } - s.destroy() - incomingSlice.destroy() - return &SliceImpl{ - executableInitializer: s.executableInitializer, - scope: s.scope.MergeByRange(incomingSlice.scope), - outstandingExecutables: s.mergeExecutables(incomingSlice), - iterators: s.mergeIterators(incomingSlice), + if s.scope.CanMergeByRange(incomingSlice.scope) { + return []Slice{s.mergeByRange(incomingSlice)} } -} - -func (s *SliceImpl) CanMergeByPredicate(slice Slice) bool { - s.validateNotDestroyed() - return s != slice && s.scope.CanMergeByPredicate(slice.Scope()) -} + mergedSlices := make([]Slice, 0, 3) + currentLeftSlice, currentRightSlice := s.splitByRange(incomingSlice.Scope().Range.InclusiveMin) + if !currentLeftSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, currentLeftSlice) + } -func (s *SliceImpl) MergeByPredicate(slice Slice) Slice { - if !s.CanMergeByPredicate(slice) { - panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v by predicate", s.scope, slice.Scope())) + if currentRightMax := currentRightSlice.Scope().Range.ExclusiveMax; incomingSlice.CanSplitByRange(currentRightMax) { + leftIncomingSlice, rightIncomingSlice := incomingSlice.splitByRange(currentRightMax) + mergedMidSlice := currentRightSlice.mergeByPredicate(leftIncomingSlice) + if !mergedMidSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, mergedMidSlice) + } + if !rightIncomingSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, rightIncomingSlice) + } + } else { + currentMidSlice, currentRightSlice := currentRightSlice.splitByRange(incomingSlice.Scope().Range.ExclusiveMax) + mergedMidSlice := currentMidSlice.mergeByPredicate(incomingSlice) + if !mergedMidSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, mergedMidSlice) + } + if !currentRightSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, currentRightSlice) + } } - incomingSlice, ok := slice.(*SliceImpl) - if !ok { - panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) + return mergedSlices + +} + +func (s *SliceImpl) mergeByRange(incomingSlice *SliceImpl) *SliceImpl { + mergedSlice := &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, + executableInitializer: s.executableInitializer, + scope: s.scope.MergeByRange(incomingSlice.scope), + outstandingExecutables: s.mergeExecutables(incomingSlice), + iterators: s.mergeIterators(incomingSlice), } s.destroy() incomingSlice.destroy() - return &SliceImpl{ + + return mergedSlice +} + +func (s *SliceImpl) mergeByPredicate(incomingSlice *SliceImpl) *SliceImpl { + mergedSlice := &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, executableInitializer: s.executableInitializer, scope: s.scope.MergeByPredicate(incomingSlice.scope), outstandingExecutables: s.mergeExecutables(incomingSlice), iterators: s.mergeIterators(incomingSlice), } + + s.destroy() + incomingSlice.destroy() + + return mergedSlice } func (s *SliceImpl) mergeExecutables(incomingSlice *SliceImpl) map[tasks.Key]Executable { @@ -273,6 +317,8 @@ func (s *SliceImpl) mergeIterators(incomingSlice *SliceImpl) []Iterator { mergedIterators = s.appendIterator(mergedIterators, iterator) } + validateIteratorsOrderedDisjoint(mergedIterators) + return mergedIterators } @@ -294,7 +340,7 @@ func (s *SliceImpl) appendIterator( } func (s *SliceImpl) ShrinkRange() { - s.validateNotDestroyed() + s.stateSanityCheck() minTaskKey := tasks.MaximumKey for key := range s.outstandingExecutables { @@ -307,7 +353,7 @@ func (s *SliceImpl) ShrinkRange() { } // still has pending task in memory - if len(s.outstandingExecutables) != 0 { + if minTaskKey != tasks.MaximumKey { s.scope.Range.InclusiveMin = minTaskKey return } @@ -323,7 +369,7 @@ func (s *SliceImpl) ShrinkRange() { } func (s *SliceImpl) SelectTasks(batchSize int) ([]Executable, error) { - s.validateNotDestroyed() + s.stateSanityCheck() if len(s.iterators) == 0 { return []Executable{}, nil @@ -361,10 +407,31 @@ func (s *SliceImpl) SelectTasks(batchSize int) ([]Executable, error) { func (s *SliceImpl) destroy() { s.destroyed = true + s.iterators = nil + s.outstandingExecutables = nil } -func (s *SliceImpl) validateNotDestroyed() { +func (s *SliceImpl) stateSanityCheck() { if s.destroyed { panic("Can not invoke method on destroyed queue slice") } } + +func validateIteratorsOrderedDisjoint( + iterators []Iterator, +) { + if len(iterators) <= 1 { + return + } + + for idx, iterator := range iterators[:len(iterators)-1] { + nextIterator := iterators[idx+1] + if iterator.Range().ExclusiveMax.CompareTo(nextIterator.Range().InclusiveMin) >= 0 { + panic(fmt.Sprintf( + "Found overlapping iterators in iterator list, left range: %v, right range: %v", + iterator.Range(), + nextIterator.Range(), + )) + } + } +} diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index 15b4c2ddb3b..ff7c2b2a72a 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -49,7 +49,7 @@ type ( controller *gomock.Controller - executableInitializer executableInitializer + executableInitializer ExecutableInitializer } ) @@ -95,61 +95,45 @@ func (s *sliceSuite) TestCanSplitByRange() { func (s *sliceSuite) TestSplitByRange() { r := NewRandomRange() - predicate := predicates.All[tasks.Task]() - scope := NewScope(r, predicates.All[tasks.Task]()) - - slice := NewSlice(nil, s.executableInitializer, scope) - for _, executable := range s.randomExecutablesInRange(r, 10) { - slice.outstandingExecutables[executable.GetKey()] = executable - } - slice.iterators = s.randomIteratorsInRange(r, 5, nil) + slice := s.newTestSlice(r, nil, nil) splitKey := NewRandomKeyInRange(r) leftSlice, rightSlice := slice.SplitByRange(splitKey) s.Equal(NewScope( NewRange(r.InclusiveMin, splitKey), - predicate, + slice.scope.Predicate, ), leftSlice.Scope()) s.Equal(NewScope( NewRange(splitKey, r.ExclusiveMax), - predicate, + slice.scope.Predicate, ), rightSlice.Scope()) s.validateSliceState(leftSlice.(*SliceImpl)) s.validateSliceState(rightSlice.(*SliceImpl)) - s.Panics(func() { slice.validateNotDestroyed() }) + s.Panics(func() { slice.stateSanityCheck() }) } func (s *sliceSuite) TestSplitByPredicate() { r := NewRandomRange() namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} - predicate := tasks.NewNamespacePredicate(namespaceIDs) - scope := NewScope(r, predicate) - - slice := NewSlice(nil, s.executableInitializer, scope) - for _, executable := range s.randomExecutablesInRange(r, 10) { - mockExecutable := executable.(*MockExecutable) - mockExecutable.EXPECT().GetNamespaceID().Return(namespaceIDs[rand.Intn(len(namespaceIDs))]).AnyTimes() - slice.outstandingExecutables[executable.GetKey()] = executable - } - slice.iterators = s.randomIteratorsInRange(r, 5, nil) + slice := s.newTestSlice(r, namespaceIDs, nil) splitNamespaceIDs := append(slices.Clone(namespaceIDs[:rand.Intn(len(namespaceIDs))]), uuid.New(), uuid.New()) splitPredicate := tasks.NewNamespacePredicate(splitNamespaceIDs) passSlice, failSlice := slice.SplitByPredicate(splitPredicate) s.Equal(r, passSlice.Scope().Range) s.Equal(r, failSlice.Scope().Range) - s.True(predicates.And[tasks.Task](predicate, splitPredicate).Equals(passSlice.Scope().Predicate)) - s.True(predicates.And[tasks.Task](predicate, predicates.Not[tasks.Task](splitPredicate)).Equals(failSlice.Scope().Predicate)) + s.True(predicates.And[tasks.Task](slice.scope.Predicate, splitPredicate).Equals(passSlice.Scope().Predicate)) + s.True(predicates.And(slice.scope.Predicate, predicates.Not[tasks.Task](splitPredicate)).Equals(failSlice.Scope().Predicate)) s.validateSliceState(passSlice.(*SliceImpl)) s.validateSliceState(failSlice.(*SliceImpl)) - s.Panics(func() { slice.validateNotDestroyed() }) + s.Panics(func() { slice.stateSanityCheck() }) } -func (s *sliceSuite) TestCanMergeByRange() { +func (s *sliceSuite) TestCanMergeWithSlice() { r := NewRandomRange() namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} predicate := tasks.NewNamespacePredicate(namespaceIDs) @@ -165,138 +149,136 @@ func (s *sliceSuite) TestCanMergeByRange() { s.False(predicate.Equals(testPredicates[2])) for _, mergePredicate := range testPredicates { - canMerge := predicate.Equals(mergePredicate) - testSlice := NewSlice(nil, nil, NewScope(r, mergePredicate)) - s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + s.True(slice.CanMergeWithSlice(testSlice)) testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, r.InclusiveMin), mergePredicate)) - s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + s.True(slice.CanMergeWithSlice(testSlice)) testSlice = NewSlice(nil, nil, NewScope(NewRange(r.ExclusiveMax, tasks.MaximumKey), mergePredicate)) - s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + s.True(slice.CanMergeWithSlice(testSlice)) testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)), mergePredicate)) - s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + s.True(slice.CanMergeWithSlice(testSlice)) testSlice = NewSlice(nil, nil, NewScope(NewRange(NewRandomKeyInRange(r), tasks.MaximumKey), mergePredicate)) - s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + s.True(slice.CanMergeWithSlice(testSlice)) testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, tasks.MaximumKey), mergePredicate)) - s.Equal(canMerge, slice.CanMergeByRange(testSlice)) + s.True(slice.CanMergeWithSlice(testSlice)) } - s.False(slice.CanMergeByRange(slice)) + s.False(slice.CanMergeWithSlice(slice)) testSlice := NewSlice(nil, nil, NewScope(NewRange( tasks.MinimumKey, tasks.NewKey(r.InclusiveMin.FireTime, r.InclusiveMin.TaskID-1), ), predicate)) - s.False(slice.CanMergeByRange(testSlice)) + s.False(slice.CanMergeWithSlice(testSlice)) testSlice = NewSlice(nil, nil, NewScope(NewRange( tasks.NewKey(r.ExclusiveMax.FireTime, r.ExclusiveMax.TaskID+1), tasks.MaximumKey, ), predicate)) - s.False(slice.CanMergeByRange(testSlice)) + s.False(slice.CanMergeWithSlice(testSlice)) } -func (s *sliceSuite) TestMergeByRange() { +func (s *sliceSuite) TestMergeWithSlice_SamePredicate() { r := NewRandomRange() - predicate := predicates.All[tasks.Task]() - - slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) - for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { - slice.outstandingExecutables[executable.GetKey()] = executable - } + slice := s.newTestSlice(r, nil, nil) totalExecutables := len(slice.outstandingExecutables) - slice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) incomingRange := NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)) - incomingSlice := NewSlice(nil, s.executableInitializer, NewScope(incomingRange, predicate)) - for _, executable := range s.randomExecutablesInRange(incomingRange, rand.Intn(20)) { - incomingSlice.outstandingExecutables[executable.GetKey()] = executable - } + incomingSlice := s.newTestSlice(incomingRange, nil, nil) totalExecutables += len(incomingSlice.outstandingExecutables) - incomingSlice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) - mergedSlice := slice.MergeByRange(incomingSlice) - mergedSliceImpl := mergedSlice.(*SliceImpl) + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 1) - s.Equal(NewScope( - NewRange(tasks.MinimumKey, r.ExclusiveMax), - predicate, - ), mergedSlice.Scope()) + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) +} - s.validateSliceState(mergedSliceImpl) - s.Len(mergedSliceImpl.outstandingExecutables, totalExecutables) +func (s *sliceSuite) TestMergeWithSlice_SameRange() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + slice := s.newTestSlice(r, namespaceIDs, nil) + totalExecutables := len(slice.outstandingExecutables) - s.Panics(func() { slice.validateNotDestroyed() }) - s.Panics(func() { incomingSlice.validateNotDestroyed() }) + taskTypes := []enumsspb.TaskType{ + enumsspb.TASK_TYPE_ACTIVITY_RETRY_TIMER, + enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, + } + incomingSlice := s.newTestSlice(r, nil, taskTypes) + totalExecutables += len(incomingSlice.outstandingExecutables) + + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 1) + + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) } -func (s *sliceSuite) TestCanMergeByPredicate() { +func (s *sliceSuite) TestMergeWithSlice_SameMinKey() { r := NewRandomRange() namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} - predicate := tasks.NewNamespacePredicate(namespaceIDs) - slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + slice := s.newTestSlice(r, namespaceIDs, nil) + totalExecutables := len(slice.outstandingExecutables) + + incomingRange := NewRange( + r.InclusiveMin, + NewRandomKeyInRange(NewRange(r.InclusiveMin, tasks.MaximumKey)), + ) + incomingNamespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + incomingSlice := s.newTestSlice(incomingRange, incomingNamespaceIDs, nil) + totalExecutables += len(incomingSlice.outstandingExecutables) + + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 2) - testSlice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) - s.True(slice.CanMergeByPredicate(testSlice)) + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) +} - testSlice = NewSlice(nil, s.executableInitializer, NewScope(r, tasks.NewTypePredicate([]enumsspb.TaskType{}))) - s.True(slice.CanMergeByPredicate(testSlice)) +func (s *sliceSuite) TestMergeWithSlice_SameMaxKey() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + slice := s.newTestSlice(r, namespaceIDs, nil) + totalExecutables := len(slice.outstandingExecutables) - s.False(slice.CanMergeByPredicate(slice)) + incomingRange := NewRange( + NewRandomKeyInRange(NewRange(tasks.MinimumKey, r.ExclusiveMax)), + r.ExclusiveMax, + ) + incomingNamespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + incomingSlice := s.newTestSlice(incomingRange, incomingNamespaceIDs, nil) + totalExecutables += len(incomingSlice.outstandingExecutables) - testSlice = NewSlice(nil, s.executableInitializer, NewScope(NewRandomRange(), predicate)) - s.False(slice.CanMergeByPredicate(testSlice)) + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 2) - testSlice = NewSlice(nil, s.executableInitializer, NewScope(NewRandomRange(), predicates.All[tasks.Task]())) - s.False(slice.CanMergeByPredicate(testSlice)) + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) } -func (s *sliceSuite) TestMergeByPredicate() { +func (s *sliceSuite) TestMergeWithSlice_DifferentMinMaxKey() { r := NewRandomRange() namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} - predicate := tasks.NewNamespacePredicate(namespaceIDs) - - slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) - for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { - mockExecutable := executable.(*MockExecutable) - mockExecutable.EXPECT().GetNamespaceID().Return(namespaceIDs[rand.Intn(len(namespaceIDs))]).AnyTimes() - mockExecutable.EXPECT().GetType().Return(enumsspb.TASK_TYPE_TRANSFER_CLOSE_EXECUTION).AnyTimes() - slice.outstandingExecutables[executable.GetKey()] = executable - } + slice := s.newTestSlice(r, namespaceIDs, nil) totalExecutables := len(slice.outstandingExecutables) - slice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) - taskTypes := []enumsspb.TaskType{ - enumsspb.TASK_TYPE_ACTIVITY_RETRY_TIMER, - enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, - } - incomingPredicate := tasks.NewTypePredicate(taskTypes) - incomingSlice := NewSlice(nil, s.executableInitializer, NewScope(r, incomingPredicate)) - for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { - mockExecutable := executable.(*MockExecutable) - mockExecutable.EXPECT().GetNamespaceID().Return(uuid.New()).AnyTimes() - mockExecutable.EXPECT().GetType().Return(taskTypes[rand.Intn(len(taskTypes))]).AnyTimes() - incomingSlice.outstandingExecutables[executable.GetKey()] = executable - } + incomingMinKey := NewRandomKeyInRange(NewRange(r.InclusiveMin, r.ExclusiveMax)) + incomingRange := NewRange( + incomingMinKey, + NewRandomKeyInRange(NewRange(incomingMinKey, tasks.MaximumKey)), + ) + incomingNamespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + incomingSlice := s.newTestSlice(incomingRange, incomingNamespaceIDs, nil) totalExecutables += len(incomingSlice.outstandingExecutables) - incomingSlice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) - - mergedSlice := slice.MergeByPredicate(incomingSlice) - mergedSliceImpl := mergedSlice.(*SliceImpl) - s.Equal(r, mergedSlice.Scope().Range) - s.True(predicates.Or[tasks.Task](predicate, incomingPredicate).Equals(mergedSlice.Scope().Predicate)) + s.validateSliceState(slice) + s.validateSliceState(incomingSlice) - s.validateSliceState(mergedSliceImpl) - s.Len(mergedSliceImpl.outstandingExecutables, totalExecutables) + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 3) - s.Panics(func() { slice.validateNotDestroyed() }) - s.Panics(func() { incomingSlice.validateNotDestroyed() }) + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) } func (s *sliceSuite) TestShrinkRange() { @@ -434,6 +416,89 @@ func (s *sliceSuite) TestSelectTasks_Error() { s.Empty(slice.iterators) } +func (s *sliceSuite) newTestSlice( + r Range, + namespaceIDs []string, + taskTypes []enumsspb.TaskType, +) *SliceImpl { + predicate := predicates.All[tasks.Task]() + if len(namespaceIDs) != 0 { + predicate = predicates.And[tasks.Task]( + predicate, + tasks.NewNamespacePredicate(namespaceIDs), + ) + } + if len(taskTypes) != 0 { + predicate = predicates.And[tasks.Task]( + predicate, + tasks.NewTypePredicate(taskTypes), + ) + } + + if len(namespaceIDs) == 0 { + namespaceIDs = []string{uuid.New()} + } + + if len(taskTypes) == 0 { + taskTypes = []enumsspb.TaskType{enumsspb.TASK_TYPE_TRANSFER_CLOSE_EXECUTION} + } + + slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { + slice.outstandingExecutables[executable.GetKey()] = executable + + mockExecutable := executable.(*MockExecutable) + mockExecutable.EXPECT().GetNamespaceID().Return(namespaceIDs[rand.Intn(len(namespaceIDs))]).AnyTimes() + mockExecutable.EXPECT().GetType().Return(taskTypes[rand.Intn(len(taskTypes))]).AnyTimes() + } + slice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) + + return slice +} + +func (s *sliceSuite) validateMergedSlice( + currentSlice *SliceImpl, + incomingSlice *SliceImpl, + mergedSlices []Slice, + expectedTotalExecutables int, +) { + expectedMergedRange := currentSlice.scope.Range.Merge(incomingSlice.scope.Range) + actualMergedRange := mergedSlices[0].Scope().Range + for _, mergedSlice := range mergedSlices { + actualMergedRange = actualMergedRange.Merge(mergedSlice.Scope().Range) + + containedByCurrent := currentSlice.scope.Range.ContainsRange(mergedSlice.Scope().Range) + containedByIncoming := incomingSlice.scope.Range.ContainsRange(mergedSlice.Scope().Range) + if containedByCurrent && containedByIncoming { + s.True(predicates.Or( + currentSlice.scope.Predicate, + incomingSlice.scope.Predicate, + ).Equals(mergedSlice.Scope().Predicate)) + } else if containedByCurrent { + s.True(currentSlice.scope.Predicate.Equals(mergedSlice.Scope().Predicate)) + } else if containedByIncoming { + s.True(incomingSlice.scope.Predicate.Equals(mergedSlice.Scope().Predicate)) + } else if currentSlice.scope.Predicate.Equals(incomingSlice.scope.Predicate) { + s.True(currentSlice.scope.Predicate.Equals(mergedSlice.Scope().Predicate)) + } else { + s.Fail("Merged slice range not contained by the merging slices") + } + + } + s.True(expectedMergedRange.Equals(actualMergedRange)) + + actualTotalExecutables := 0 + for _, mergedSlice := range mergedSlices { + mergedSliceImpl := mergedSlice.(*SliceImpl) + s.validateSliceState(mergedSliceImpl) + actualTotalExecutables += len(mergedSliceImpl.outstandingExecutables) + } + s.Equal(expectedTotalExecutables, actualTotalExecutables) + + s.Panics(func() { currentSlice.stateSanityCheck() }) + s.Panics(func() { incomingSlice.stateSanityCheck() }) +} + func (s *sliceSuite) validateSliceState( slice *SliceImpl, ) { @@ -474,20 +539,9 @@ func (s *sliceSuite) randomExecutablesInRange( func (s *sliceSuite) randomIteratorsInRange( r Range, numIterators int, - paginationFnProvider paginationFnProvider, + paginationFnProvider PaginationFnProvider, ) []Iterator { - ranges := []Range{r} - for len(ranges) < numIterators { - r := ranges[0] - left, right := r.Split(NewRandomKeyInRange(r)) - left.ExclusiveMax.FireTime.Add(-time.Nanosecond) - right.InclusiveMin.FireTime.Add(time.Nanosecond) - ranges = append(ranges[1:], left, right) - } - - slices.SortFunc(ranges, func(a, b Range) bool { - return a.InclusiveMin.CompareTo(b.InclusiveMin) < 0 - }) + ranges := NewRandomOrderedRangesInRange(r, numIterators) iterators := make([]Iterator, 0, numIterators) for _, r := range ranges { diff --git a/service/history/queues/test_util.go b/service/history/queues/test_util.go index 9bda1bd1077..febda1786aa 100644 --- a/service/history/queues/test_util.go +++ b/service/history/queues/test_util.go @@ -29,6 +29,8 @@ import ( "math/rand" "time" + "golang.org/x/exp/slices" + "go.temporal.io/server/service/history/tasks" ) @@ -81,3 +83,23 @@ func NewRandomKeyInRange( return tasks.NewKey(fireTime, rand.Int63()) } + +func NewRandomOrderedRangesInRange( + r Range, + numRanges int, +) []Range { + ranges := []Range{r} + for len(ranges) < numRanges { + r := ranges[0] + left, right := r.Split(NewRandomKeyInRange(r)) + left.ExclusiveMax.FireTime.Add(-time.Nanosecond) + right.InclusiveMin.FireTime.Add(time.Nanosecond) + ranges = append(ranges[1:], left, right) + } + + slices.SortFunc(ranges, func(a, b Range) bool { + return a.InclusiveMin.CompareTo(b.InclusiveMin) < 0 + }) + + return ranges +} From 8f051a11ea6fa90be29443f03ea1022d217e2d6b Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 24 Jun 2022 11:39:54 -0700 Subject: [PATCH 6/6] fix shrink range --- service/history/queues/slice.go | 24 ++++++++++++------------ service/history/queues/slice_test.go | 12 ++++++------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go index 417d74a57dd..ef8d0084d7e 100644 --- a/service/history/queues/slice.go +++ b/service/history/queues/slice.go @@ -342,30 +342,30 @@ func (s *SliceImpl) appendIterator( func (s *SliceImpl) ShrinkRange() { s.stateSanityCheck() - minTaskKey := tasks.MaximumKey + minPendingTaskKey := tasks.MaximumKey for key := range s.outstandingExecutables { if s.outstandingExecutables[key].State() == ctasks.TaskStateAcked { delete(s.outstandingExecutables, key) continue } - minTaskKey = tasks.MinKey(minTaskKey, key) + minPendingTaskKey = tasks.MinKey(minPendingTaskKey, key) } - // still has pending task in memory - if minTaskKey != tasks.MaximumKey { - s.scope.Range.InclusiveMin = minTaskKey - return + minIteratorKey := tasks.MaximumKey + if len(s.iterators) != 0 { + minIteratorKey = s.iterators[0].Range().InclusiveMin } - // no more pending task in memory, but has more tasks to read from persistence - if len(s.iterators) != 0 { - s.scope.Range.InclusiveMin = s.iterators[0].Range().InclusiveMin - return + // pick min key for tasks in memory and in persistence + newRangeMin := tasks.MinKey(minPendingTaskKey, minIteratorKey) + + // no pending task in memory and in persistence + if newRangeMin == tasks.MaximumKey { + newRangeMin = s.scope.Range.ExclusiveMax } - // no pending task in memory and persistence - s.scope.Range.InclusiveMin = s.scope.Range.ExclusiveMax + s.scope.Range.InclusiveMin = newRangeMin } func (s *SliceImpl) SelectTasks(batchSize int) ([]Executable, error) { diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index ff7c2b2a72a..acec4e14d16 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -313,15 +313,15 @@ func (s *sliceSuite) TestShrinkRange() { slice.ShrinkRange() s.Len(slice.outstandingExecutables, len(executables)-numAcked) + s.validateSliceState(slice) newInclusiveMin := r.ExclusiveMax + if len(slice.iterators) != 0 { + newInclusiveMin = slice.iterators[0].Range().InclusiveMin + } - if firstPendingIdx == len(executables) { - if len(slice.iterators) != 0 { - newInclusiveMin = slice.iterators[0].Range().InclusiveMin - } - } else { - newInclusiveMin = executables[firstPendingIdx].GetKey() + if numAcked != len(executables) { + newInclusiveMin = tasks.MinKey(newInclusiveMin, executables[firstPendingIdx].GetKey()) } s.Equal(NewRange(newInclusiveMin, r.ExclusiveMax), slice.Scope().Range)