diff --git a/service/history/queues/iterator.go b/service/history/queues/iterator.go index 92fe8ca3952..fb31a6729d2 100644 --- a/service/history/queues/iterator.go +++ b/service/history/queues/iterator.go @@ -43,10 +43,10 @@ type ( Remaining() Iterator } - paginationFnProvider func(Range) collection.PaginationFn[tasks.Task] + PaginationFnProvider func(Range) collection.PaginationFn[tasks.Task] IteratorImpl struct { - paginationFnProvider paginationFnProvider + paginationFnProvider PaginationFnProvider remainingRange Range pagingIterator collection.Iterator[tasks.Task] @@ -54,7 +54,7 @@ type ( ) func NewIterator( - paginationFnProvider paginationFnProvider, + paginationFnProvider PaginationFnProvider, r Range, ) *IteratorImpl { return &IteratorImpl{ diff --git a/service/history/queues/range.go b/service/history/queues/range.go index 85101f4507e..d77016c7220 100644 --- a/service/history/queues/range.go +++ b/service/history/queues/range.go @@ -105,7 +105,7 @@ func (r *Range) Merge( ) } -func (r *Range) Equal( +func (r *Range) Equals( input Range, ) bool { return r.InclusiveMin.CompareTo(input.InclusiveMin) == 0 && diff --git a/service/history/queues/range_test.go b/service/history/queues/range_test.go index bfa312c49ad..fc34091b399 100644 --- a/service/history/queues/range_test.go +++ b/service/history/queues/range_test.go @@ -286,8 +286,8 @@ func (s *rangeSuite) TestSplit() { splitKey := NewRandomKeyInRange(r) left, right := r.Split(splitKey) - s.True(left.Equal(NewRange(r.InclusiveMin, splitKey))) - s.True(right.Equal(NewRange(splitKey, r.ExclusiveMax))) + s.True(left.Equals(NewRange(r.InclusiveMin, splitKey))) + s.True(right.Equals(NewRange(splitKey, r.ExclusiveMax))) } func (s *rangeSuite) TestMerge() { @@ -298,29 +298,29 @@ func (s *rangeSuite) TestMerge() { NewRandomKeyInRange(r), ) mergedRange := r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(tasks.MinimumKey, r.ExclusiveMax))) testRange = NewRange( NewRandomKeyInRange(r), tasks.MaximumKey, ) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(r.InclusiveMin, tasks.MaximumKey))) testRange = NewRange(tasks.MinimumKey, tasks.MaximumKey) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(tasks.MinimumKey, tasks.MaximumKey))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(tasks.MinimumKey, tasks.MaximumKey))) testRange = NewRange(tasks.MinimumKey, r.InclusiveMin) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(tasks.MinimumKey, r.ExclusiveMax))) testRange = NewRange(r.ExclusiveMax, tasks.MaximumKey) mergedRange = r.Merge(testRange) - s.True(mergedRange.Equal(testRange.Merge(r))) - s.True(mergedRange.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey))) + s.True(mergedRange.Equals(testRange.Merge(r))) + s.True(mergedRange.Equals(NewRange(r.InclusiveMin, tasks.MaximumKey))) } diff --git a/service/history/queues/scope.go b/service/history/queues/scope.go index 82abcf8d119..c8900ea97d2 100644 --- a/service/history/queues/scope.go +++ b/service/history/queues/scope.go @@ -108,7 +108,7 @@ func (s *Scope) MergeByRange( func (s *Scope) CanMergeByPredicate( incomingScope Scope, ) bool { - return s.Range.Equal(incomingScope.Range) + return s.Range.Equals(incomingScope.Range) } func (s *Scope) MergeByPredicate( @@ -121,3 +121,20 @@ func (s *Scope) MergeByPredicate( // TODO: special check if the predicates are the same type return NewScope(s.Range, predicates.Or(s.Predicate, incomingScope.Predicate)) } + +func (s *Scope) IsEmpty() bool { + if s.Range.IsEmpty() { + return true + } + + if _, ok := s.Predicate.(*predicates.EmptyImpl[tasks.Task]); ok { + return true + } + + return false +} + +func (s *Scope) Equals(scope Scope) bool { + return s.Range.Equals(scope.Range) && + s.Predicate.Equals(scope.Predicate) +} diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go new file mode 100644 index 00000000000..ef8d0084d7e --- /dev/null +++ b/service/history/queues/slice.go @@ -0,0 +1,437 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package queues + +import ( + "fmt" + + ctasks "go.temporal.io/server/common/tasks" + "go.temporal.io/server/service/history/tasks" +) + +type ( + + // Slice manages the loading and status tracking of all + // tasks within its Scope. + // It also provides methods for splitting or merging with + // another slice either by range or by predicate. + Slice interface { + Scope() Scope + CanSplitByRange(tasks.Key) bool + SplitByRange(tasks.Key) (left Slice, right Slice) + SplitByPredicate(tasks.Predicate) (pass Slice, fail Slice) + CanMergeWithSlice(Slice) bool + MergeWithSlice(Slice) []Slice + ShrinkRange() + SelectTasks(int) ([]Executable, error) + } + + ExecutableInitializer func(tasks.Task) Executable + + SliceImpl struct { + paginationFnProvider PaginationFnProvider + executableInitializer ExecutableInitializer + + destroyed bool + + scope Scope + iterators []Iterator + + // TODO: make task tracking a separate component + outstandingExecutables map[tasks.Key]Executable + } +) + +func NewSlice( + paginationFnProvider PaginationFnProvider, + executableInitializer ExecutableInitializer, + scope Scope, +) *SliceImpl { + return &SliceImpl{ + paginationFnProvider: paginationFnProvider, + executableInitializer: executableInitializer, + scope: scope, + outstandingExecutables: make(map[tasks.Key]Executable), + iterators: []Iterator{ + NewIterator(paginationFnProvider, scope.Range), + }, + } +} + +func (s *SliceImpl) Scope() Scope { + s.stateSanityCheck() + return s.scope +} + +func (s *SliceImpl) CanSplitByRange(key tasks.Key) bool { + s.stateSanityCheck() + return s.scope.CanSplitByRange(key) +} + +func (s *SliceImpl) SplitByRange(key tasks.Key) (left Slice, right Slice) { + if !s.CanSplitByRange(key) { + panic(fmt.Sprintf("Unable to split queue slice with range %v at %v", s.scope.Range, key)) + } + + return s.splitByRange(key) +} + +func (s *SliceImpl) splitByRange(key tasks.Key) (left *SliceImpl, right *SliceImpl) { + + leftScope, rightScope := s.scope.SplitByRange(key) + leftExecutables, rightExecutables := s.splitExecutables(leftScope, rightScope) + + leftIterators := make([]Iterator, 0, len(s.iterators)/2) + rightIterators := make([]Iterator, 0, len(s.iterators)/2) + for _, iter := range s.iterators { + iterRange := iter.Range() + if leftScope.Range.ContainsRange(iterRange) { + leftIterators = append(leftIterators, iter) + continue + } + + if rightScope.Range.ContainsRange(iterRange) { + rightIterators = append(rightIterators, iter) + continue + } + + leftIter, rightIter := iter.Split(key) + leftIterators = append(leftIterators, leftIter) + rightIterators = append(rightIterators, rightIter) + } + + left = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, + executableInitializer: s.executableInitializer, + scope: leftScope, + outstandingExecutables: leftExecutables, + iterators: leftIterators, + } + right = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, + executableInitializer: s.executableInitializer, + scope: rightScope, + outstandingExecutables: rightExecutables, + iterators: rightIterators, + } + + s.destroy() + return left, right +} + +func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (pass Slice, fail Slice) { + s.stateSanityCheck() + + passScope, failScope := s.scope.SplitByPredicate(predicate) + passExecutables, failExecutables := s.splitExecutables(passScope, failScope) + + passIterators := make([]Iterator, 0, len(s.iterators)) + failIterators := make([]Iterator, 0, len(s.iterators)) + for _, iter := range s.iterators { + // iter.Remaining() is basically a deep copy of iter + passIterators = append(passIterators, iter) + failIterators = append(failIterators, iter.Remaining()) + } + + pass = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, + executableInitializer: s.executableInitializer, + scope: passScope, + outstandingExecutables: passExecutables, + iterators: passIterators, + } + fail = &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, + executableInitializer: s.executableInitializer, + scope: failScope, + outstandingExecutables: failExecutables, + iterators: failIterators, + } + + s.destroy() + return pass, fail +} + +func (s *SliceImpl) splitExecutables( + thisScope Scope, + thatScope Scope, +) (map[tasks.Key]Executable, map[tasks.Key]Executable) { + thisExecutable := s.outstandingExecutables + thatExecutables := make(map[tasks.Key]Executable, len(s.outstandingExecutables)/2) + for key, executable := range s.outstandingExecutables { + if thisScope.Contains(executable) { + continue + } + + if !thatScope.Contains(executable) { + panic(fmt.Sprintf("Queue slice encountered task doesn't belong to its scope, scope: %v, task: %v, task type: %v", + s.scope, executable.GetTask(), executable.GetType())) + } + delete(thisExecutable, key) + thatExecutables[key] = executable + } + return thisExecutable, thatExecutables +} + +func (s *SliceImpl) CanMergeWithSlice(slice Slice) bool { + s.stateSanityCheck() + + return s != slice && s.scope.Range.CanMerge(slice.Scope().Range) +} + +func (s *SliceImpl) MergeWithSlice(slice Slice) []Slice { + if s.scope.Range.InclusiveMin.CompareTo(slice.Scope().Range.InclusiveMin) > 0 { + return slice.MergeWithSlice(s) + } + + if !s.CanMergeWithSlice(slice) { + panic(fmt.Sprintf("Unable to merge queue slice having scope %v with slice having scope %v", s.scope, slice.Scope())) + } + + incomingSlice, ok := slice.(*SliceImpl) + if !ok { + panic(fmt.Sprintf("Unable to merge queue slice of type %T with type %T", s, slice)) + } + + if s.scope.CanMergeByRange(incomingSlice.scope) { + return []Slice{s.mergeByRange(incomingSlice)} + } + + mergedSlices := make([]Slice, 0, 3) + currentLeftSlice, currentRightSlice := s.splitByRange(incomingSlice.Scope().Range.InclusiveMin) + if !currentLeftSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, currentLeftSlice) + } + + if currentRightMax := currentRightSlice.Scope().Range.ExclusiveMax; incomingSlice.CanSplitByRange(currentRightMax) { + leftIncomingSlice, rightIncomingSlice := incomingSlice.splitByRange(currentRightMax) + mergedMidSlice := currentRightSlice.mergeByPredicate(leftIncomingSlice) + if !mergedMidSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, mergedMidSlice) + } + if !rightIncomingSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, rightIncomingSlice) + } + } else { + currentMidSlice, currentRightSlice := currentRightSlice.splitByRange(incomingSlice.Scope().Range.ExclusiveMax) + mergedMidSlice := currentMidSlice.mergeByPredicate(incomingSlice) + if !mergedMidSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, mergedMidSlice) + } + if !currentRightSlice.scope.IsEmpty() { + mergedSlices = append(mergedSlices, currentRightSlice) + } + } + + return mergedSlices + +} + +func (s *SliceImpl) mergeByRange(incomingSlice *SliceImpl) *SliceImpl { + mergedSlice := &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, + executableInitializer: s.executableInitializer, + scope: s.scope.MergeByRange(incomingSlice.scope), + outstandingExecutables: s.mergeExecutables(incomingSlice), + iterators: s.mergeIterators(incomingSlice), + } + + s.destroy() + incomingSlice.destroy() + + return mergedSlice +} + +func (s *SliceImpl) mergeByPredicate(incomingSlice *SliceImpl) *SliceImpl { + mergedSlice := &SliceImpl{ + paginationFnProvider: s.paginationFnProvider, + executableInitializer: s.executableInitializer, + scope: s.scope.MergeByPredicate(incomingSlice.scope), + outstandingExecutables: s.mergeExecutables(incomingSlice), + iterators: s.mergeIterators(incomingSlice), + } + + s.destroy() + incomingSlice.destroy() + + return mergedSlice +} + +func (s *SliceImpl) mergeExecutables(incomingSlice *SliceImpl) map[tasks.Key]Executable { + thisExecutables := s.outstandingExecutables + thatExecutables := incomingSlice.outstandingExecutables + if len(thisExecutables) < len(thatExecutables) { + thisExecutables, thatExecutables = thatExecutables, thisExecutables + } + + for key, executable := range thatExecutables { + thisExecutables[key] = executable + } + + return thisExecutables +} + +func (s *SliceImpl) mergeIterators(incomingSlice *SliceImpl) []Iterator { + mergedIterators := make([]Iterator, 0, len(s.iterators)+len(incomingSlice.iterators)) + currentIterIdx := 0 + incomingIterIdx := 0 + for currentIterIdx < len(s.iterators) && incomingIterIdx < len(incomingSlice.iterators) { + currentIter := s.iterators[currentIterIdx] + incomingIter := incomingSlice.iterators[incomingIterIdx] + + if currentIter.Range().InclusiveMin.CompareTo(incomingIter.Range().InclusiveMin) < 0 { + mergedIterators = s.appendIterator(mergedIterators, currentIter) + currentIterIdx++ + } else { + mergedIterators = s.appendIterator(mergedIterators, incomingIter) + incomingIterIdx++ + } + } + + for _, iterator := range s.iterators[currentIterIdx:] { + mergedIterators = s.appendIterator(mergedIterators, iterator) + } + for _, iterator := range incomingSlice.iterators[incomingIterIdx:] { + mergedIterators = s.appendIterator(mergedIterators, iterator) + } + + validateIteratorsOrderedDisjoint(mergedIterators) + + return mergedIterators +} + +func (s *SliceImpl) appendIterator( + iterators []Iterator, + iterator Iterator, +) []Iterator { + if len(iterators) == 0 { + return []Iterator{iterator} + } + + size := len(iterators) + if iterators[size-1].CanMerge(iterator) { + iterators[size-1] = iterators[size-1].Merge(iterator) + return iterators + } + + return append(iterators, iterator) +} + +func (s *SliceImpl) ShrinkRange() { + s.stateSanityCheck() + + minPendingTaskKey := tasks.MaximumKey + for key := range s.outstandingExecutables { + if s.outstandingExecutables[key].State() == ctasks.TaskStateAcked { + delete(s.outstandingExecutables, key) + continue + } + + minPendingTaskKey = tasks.MinKey(minPendingTaskKey, key) + } + + minIteratorKey := tasks.MaximumKey + if len(s.iterators) != 0 { + minIteratorKey = s.iterators[0].Range().InclusiveMin + } + + // pick min key for tasks in memory and in persistence + newRangeMin := tasks.MinKey(minPendingTaskKey, minIteratorKey) + + // no pending task in memory and in persistence + if newRangeMin == tasks.MaximumKey { + newRangeMin = s.scope.Range.ExclusiveMax + } + + s.scope.Range.InclusiveMin = newRangeMin +} + +func (s *SliceImpl) SelectTasks(batchSize int) ([]Executable, error) { + s.stateSanityCheck() + + if len(s.iterators) == 0 { + return []Executable{}, nil + } + + executables := make([]Executable, 0, batchSize) + for len(executables) < batchSize && len(s.iterators) != 0 { + if s.iterators[0].HasNext() { + task, err := s.iterators[0].Next() + if err != nil { + s.iterators[0] = s.iterators[0].Remaining() + return nil, err + } + + taskKey := task.GetKey() + if !s.scope.Range.ContainsKey(taskKey) { + panic(fmt.Sprintf("Queue slice get task from iterator doesn't belong to its range, range: %v, task key %v", + s.scope.Range, taskKey)) + } + + if !s.scope.Predicate.Test(task) { + continue + } + + executable := s.executableInitializer(task) + s.outstandingExecutables[taskKey] = executable + executables = append(executables, executable) + } else { + s.iterators = s.iterators[1:] + } + } + + return executables, nil +} + +func (s *SliceImpl) destroy() { + s.destroyed = true + s.iterators = nil + s.outstandingExecutables = nil +} + +func (s *SliceImpl) stateSanityCheck() { + if s.destroyed { + panic("Can not invoke method on destroyed queue slice") + } +} + +func validateIteratorsOrderedDisjoint( + iterators []Iterator, +) { + if len(iterators) <= 1 { + return + } + + for idx, iterator := range iterators[:len(iterators)-1] { + nextIterator := iterators[idx+1] + if iterator.Range().ExclusiveMax.CompareTo(nextIterator.Range().InclusiveMin) >= 0 { + panic(fmt.Sprintf( + "Found overlapping iterators in iterator list, left range: %v, right range: %v", + iterator.Range(), + nextIterator.Range(), + )) + } + } +} diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go new file mode 100644 index 00000000000..acec4e14d16 --- /dev/null +++ b/service/history/queues/slice_test.go @@ -0,0 +1,558 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package queues + +import ( + "errors" + "math/rand" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + enumsspb "go.temporal.io/server/api/enums/v1" + "go.temporal.io/server/common/collection" + "go.temporal.io/server/common/predicates" + ctasks "go.temporal.io/server/common/tasks" + "go.temporal.io/server/service/history/tasks" + "golang.org/x/exp/slices" +) + +type ( + sliceSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + + executableInitializer ExecutableInitializer + } +) + +func TestSliceSuite(t *testing.T) { + s := new(sliceSuite) + suite.Run(t, s) +} + +func (s *sliceSuite) SetupTest() { + s.Assertions = require.New(s.T()) + + s.controller = gomock.NewController(s.T()) + + s.executableInitializer = func(t tasks.Task) Executable { + return NewMockExecutable(s.controller) + } +} + +func (s *sliceSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *sliceSuite) TestCanSplitByRange() { + r := NewRandomRange() + scope := NewScope(r, predicates.All[tasks.Task]()) + + slice := NewSlice(nil, s.executableInitializer, scope) + s.Equal(scope, slice.Scope()) + + s.True(slice.CanSplitByRange(r.InclusiveMin)) + s.True(slice.CanSplitByRange(r.ExclusiveMax)) + s.True(slice.CanSplitByRange(NewRandomKeyInRange(r))) + + s.False(slice.CanSplitByRange(tasks.NewKey( + r.InclusiveMin.FireTime, + r.InclusiveMin.TaskID-1, + ))) + s.False(slice.CanSplitByRange(tasks.NewKey( + r.ExclusiveMax.FireTime.Add(time.Nanosecond), + r.ExclusiveMax.TaskID, + ))) +} + +func (s *sliceSuite) TestSplitByRange() { + r := NewRandomRange() + slice := s.newTestSlice(r, nil, nil) + + splitKey := NewRandomKeyInRange(r) + leftSlice, rightSlice := slice.SplitByRange(splitKey) + s.Equal(NewScope( + NewRange(r.InclusiveMin, splitKey), + slice.scope.Predicate, + ), leftSlice.Scope()) + s.Equal(NewScope( + NewRange(splitKey, r.ExclusiveMax), + slice.scope.Predicate, + ), rightSlice.Scope()) + + s.validateSliceState(leftSlice.(*SliceImpl)) + s.validateSliceState(rightSlice.(*SliceImpl)) + + s.Panics(func() { slice.stateSanityCheck() }) +} + +func (s *sliceSuite) TestSplitByPredicate() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + slice := s.newTestSlice(r, namespaceIDs, nil) + + splitNamespaceIDs := append(slices.Clone(namespaceIDs[:rand.Intn(len(namespaceIDs))]), uuid.New(), uuid.New()) + splitPredicate := tasks.NewNamespacePredicate(splitNamespaceIDs) + passSlice, failSlice := slice.SplitByPredicate(splitPredicate) + s.Equal(r, passSlice.Scope().Range) + s.Equal(r, failSlice.Scope().Range) + s.True(predicates.And[tasks.Task](slice.scope.Predicate, splitPredicate).Equals(passSlice.Scope().Predicate)) + s.True(predicates.And(slice.scope.Predicate, predicates.Not[tasks.Task](splitPredicate)).Equals(failSlice.Scope().Predicate)) + + s.validateSliceState(passSlice.(*SliceImpl)) + s.validateSliceState(failSlice.(*SliceImpl)) + + s.Panics(func() { slice.stateSanityCheck() }) +} + +func (s *sliceSuite) TestCanMergeWithSlice() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + predicate := tasks.NewNamespacePredicate(namespaceIDs) + slice := NewSlice(nil, nil, NewScope(r, predicate)) + + testPredicates := []tasks.Predicate{ + predicate, + tasks.NewNamespacePredicate(namespaceIDs), + tasks.NewNamespacePredicate([]string{uuid.New(), uuid.New(), uuid.New(), uuid.New()}), + } + s.True(predicate.Equals(testPredicates[0])) + s.True(predicate.Equals(testPredicates[1])) + s.False(predicate.Equals(testPredicates[2])) + + for _, mergePredicate := range testPredicates { + testSlice := NewSlice(nil, nil, NewScope(r, mergePredicate)) + s.True(slice.CanMergeWithSlice(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, r.InclusiveMin), mergePredicate)) + s.True(slice.CanMergeWithSlice(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(r.ExclusiveMax, tasks.MaximumKey), mergePredicate)) + s.True(slice.CanMergeWithSlice(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)), mergePredicate)) + s.True(slice.CanMergeWithSlice(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(NewRandomKeyInRange(r), tasks.MaximumKey), mergePredicate)) + s.True(slice.CanMergeWithSlice(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange(tasks.MinimumKey, tasks.MaximumKey), mergePredicate)) + s.True(slice.CanMergeWithSlice(testSlice)) + } + + s.False(slice.CanMergeWithSlice(slice)) + + testSlice := NewSlice(nil, nil, NewScope(NewRange( + tasks.MinimumKey, + tasks.NewKey(r.InclusiveMin.FireTime, r.InclusiveMin.TaskID-1), + ), predicate)) + s.False(slice.CanMergeWithSlice(testSlice)) + + testSlice = NewSlice(nil, nil, NewScope(NewRange( + tasks.NewKey(r.ExclusiveMax.FireTime, r.ExclusiveMax.TaskID+1), + tasks.MaximumKey, + ), predicate)) + s.False(slice.CanMergeWithSlice(testSlice)) +} + +func (s *sliceSuite) TestMergeWithSlice_SamePredicate() { + r := NewRandomRange() + slice := s.newTestSlice(r, nil, nil) + totalExecutables := len(slice.outstandingExecutables) + + incomingRange := NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)) + incomingSlice := s.newTestSlice(incomingRange, nil, nil) + totalExecutables += len(incomingSlice.outstandingExecutables) + + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 1) + + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) +} + +func (s *sliceSuite) TestMergeWithSlice_SameRange() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + slice := s.newTestSlice(r, namespaceIDs, nil) + totalExecutables := len(slice.outstandingExecutables) + + taskTypes := []enumsspb.TaskType{ + enumsspb.TASK_TYPE_ACTIVITY_RETRY_TIMER, + enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, + } + incomingSlice := s.newTestSlice(r, nil, taskTypes) + totalExecutables += len(incomingSlice.outstandingExecutables) + + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 1) + + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) +} + +func (s *sliceSuite) TestMergeWithSlice_SameMinKey() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + slice := s.newTestSlice(r, namespaceIDs, nil) + totalExecutables := len(slice.outstandingExecutables) + + incomingRange := NewRange( + r.InclusiveMin, + NewRandomKeyInRange(NewRange(r.InclusiveMin, tasks.MaximumKey)), + ) + incomingNamespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + incomingSlice := s.newTestSlice(incomingRange, incomingNamespaceIDs, nil) + totalExecutables += len(incomingSlice.outstandingExecutables) + + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 2) + + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) +} + +func (s *sliceSuite) TestMergeWithSlice_SameMaxKey() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + slice := s.newTestSlice(r, namespaceIDs, nil) + totalExecutables := len(slice.outstandingExecutables) + + incomingRange := NewRange( + NewRandomKeyInRange(NewRange(tasks.MinimumKey, r.ExclusiveMax)), + r.ExclusiveMax, + ) + incomingNamespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + incomingSlice := s.newTestSlice(incomingRange, incomingNamespaceIDs, nil) + totalExecutables += len(incomingSlice.outstandingExecutables) + + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 2) + + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) +} + +func (s *sliceSuite) TestMergeWithSlice_DifferentMinMaxKey() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + slice := s.newTestSlice(r, namespaceIDs, nil) + totalExecutables := len(slice.outstandingExecutables) + + incomingMinKey := NewRandomKeyInRange(NewRange(r.InclusiveMin, r.ExclusiveMax)) + incomingRange := NewRange( + incomingMinKey, + NewRandomKeyInRange(NewRange(incomingMinKey, tasks.MaximumKey)), + ) + incomingNamespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + incomingSlice := s.newTestSlice(incomingRange, incomingNamespaceIDs, nil) + totalExecutables += len(incomingSlice.outstandingExecutables) + + s.validateSliceState(slice) + s.validateSliceState(incomingSlice) + + mergedSlices := slice.MergeWithSlice(incomingSlice) + s.Len(mergedSlices, 3) + + s.validateMergedSlice(slice, incomingSlice, mergedSlices, totalExecutables) +} + +func (s *sliceSuite) TestShrinkRange() { + r := NewRandomRange() + predicate := predicates.All[tasks.Task]() + + slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + slice.iterators = s.randomIteratorsInRange(r, rand.Intn(2), nil) + + executables := s.randomExecutablesInRange(r, 5) + slices.SortFunc(executables, func(a, b Executable) bool { + return a.GetKey().CompareTo(b.GetKey()) < 0 + }) + + firstPendingIdx := len(executables) + numAcked := 0 + for idx, executable := range executables { + mockExecutable := executable.(*MockExecutable) + acked := rand.Intn(10) < 8 + if acked { + mockExecutable.EXPECT().State().Return(ctasks.TaskStateAcked).MaxTimes(1) + numAcked++ + } else { + mockExecutable.EXPECT().State().Return(ctasks.TaskStatePending).MaxTimes(1) + if firstPendingIdx == len(executables) { + firstPendingIdx = idx + } + } + + slice.outstandingExecutables[executable.GetKey()] = executable + } + + slice.ShrinkRange() + s.Len(slice.outstandingExecutables, len(executables)-numAcked) + s.validateSliceState(slice) + + newInclusiveMin := r.ExclusiveMax + if len(slice.iterators) != 0 { + newInclusiveMin = slice.iterators[0].Range().InclusiveMin + } + + if numAcked != len(executables) { + newInclusiveMin = tasks.MinKey(newInclusiveMin, executables[firstPendingIdx].GetKey()) + } + + s.Equal(NewRange(newInclusiveMin, r.ExclusiveMax), slice.Scope().Range) +} + +func (s *sliceSuite) TestSelectTasks_NoError() { + r := NewRandomRange() + namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()} + predicate := tasks.NewNamespacePredicate(namespaceIDs) + + numTasks := 20 + paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + return func(paginationToken []byte) ([]tasks.Task, []byte, error) { + + mockTasks := make([]tasks.Task, 0, numTasks) + for i := 0; i != numTasks; i++ { + mockTask := tasks.NewMockTask(s.controller) + key := NewRandomKeyInRange(r) + mockTask.EXPECT().GetKey().Return(key).AnyTimes() + + namespaceID := namespaceIDs[rand.Intn(len(namespaceIDs))] + if i >= numTasks/2 { + namespaceID = uuid.New() // should be filtered out + } + mockTask.EXPECT().GetNamespaceID().Return(namespaceID).AnyTimes() + mockTasks = append(mockTasks, mockTask) + } + + slices.SortFunc(mockTasks, func(a, b tasks.Task) bool { + return a.GetKey().CompareTo(b.GetKey()) < 0 + }) + + return mockTasks, nil, nil + } + } + + for _, batchSize := range []int{1, 2, 5, 10, 20, 100} { + slice := NewSlice(paginationFnProvider, s.executableInitializer, NewScope(r, predicate)) + + executables := make([]Executable, 0, numTasks) + for { + selectedExecutables, err := slice.SelectTasks(batchSize) + s.NoError(err) + if len(selectedExecutables) == 0 { + break + } + + executables = append(executables, selectedExecutables...) + } + + s.Len(executables, numTasks/2) // half of tasks should be filtered out based on its namespaceID + s.Empty(slice.iterators) + } +} + +func (s *sliceSuite) TestSelectTasks_Error() { + r := NewRandomRange() + predicate := predicates.All[tasks.Task]() + + numTasks := 20 + loadErr := true + paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + return func(paginationToken []byte) ([]tasks.Task, []byte, error) { + if loadErr { + loadErr = false + return nil, nil, errors.New("some random load task error") + } + + mockTasks := make([]tasks.Task, 0, numTasks) + for i := 0; i != numTasks; i++ { + mockTask := tasks.NewMockTask(s.controller) + key := NewRandomKeyInRange(r) + mockTask.EXPECT().GetKey().Return(key).AnyTimes() + mockTasks = append(mockTasks, mockTask) + } + + slices.SortFunc(mockTasks, func(a, b tasks.Task) bool { + return a.GetKey().CompareTo(b.GetKey()) < 0 + }) + + return mockTasks, nil, nil + } + } + + slice := NewSlice(paginationFnProvider, s.executableInitializer, NewScope(r, predicate)) + _, err := slice.SelectTasks(100) + s.Error(err) + + executables, err := slice.SelectTasks(100) + s.NoError(err) + s.Len(executables, numTasks) + s.Empty(slice.iterators) +} + +func (s *sliceSuite) newTestSlice( + r Range, + namespaceIDs []string, + taskTypes []enumsspb.TaskType, +) *SliceImpl { + predicate := predicates.All[tasks.Task]() + if len(namespaceIDs) != 0 { + predicate = predicates.And[tasks.Task]( + predicate, + tasks.NewNamespacePredicate(namespaceIDs), + ) + } + if len(taskTypes) != 0 { + predicate = predicates.And[tasks.Task]( + predicate, + tasks.NewTypePredicate(taskTypes), + ) + } + + if len(namespaceIDs) == 0 { + namespaceIDs = []string{uuid.New()} + } + + if len(taskTypes) == 0 { + taskTypes = []enumsspb.TaskType{enumsspb.TASK_TYPE_TRANSFER_CLOSE_EXECUTION} + } + + slice := NewSlice(nil, s.executableInitializer, NewScope(r, predicate)) + for _, executable := range s.randomExecutablesInRange(r, rand.Intn(20)) { + slice.outstandingExecutables[executable.GetKey()] = executable + + mockExecutable := executable.(*MockExecutable) + mockExecutable.EXPECT().GetNamespaceID().Return(namespaceIDs[rand.Intn(len(namespaceIDs))]).AnyTimes() + mockExecutable.EXPECT().GetType().Return(taskTypes[rand.Intn(len(taskTypes))]).AnyTimes() + } + slice.iterators = s.randomIteratorsInRange(r, rand.Intn(10), nil) + + return slice +} + +func (s *sliceSuite) validateMergedSlice( + currentSlice *SliceImpl, + incomingSlice *SliceImpl, + mergedSlices []Slice, + expectedTotalExecutables int, +) { + expectedMergedRange := currentSlice.scope.Range.Merge(incomingSlice.scope.Range) + actualMergedRange := mergedSlices[0].Scope().Range + for _, mergedSlice := range mergedSlices { + actualMergedRange = actualMergedRange.Merge(mergedSlice.Scope().Range) + + containedByCurrent := currentSlice.scope.Range.ContainsRange(mergedSlice.Scope().Range) + containedByIncoming := incomingSlice.scope.Range.ContainsRange(mergedSlice.Scope().Range) + if containedByCurrent && containedByIncoming { + s.True(predicates.Or( + currentSlice.scope.Predicate, + incomingSlice.scope.Predicate, + ).Equals(mergedSlice.Scope().Predicate)) + } else if containedByCurrent { + s.True(currentSlice.scope.Predicate.Equals(mergedSlice.Scope().Predicate)) + } else if containedByIncoming { + s.True(incomingSlice.scope.Predicate.Equals(mergedSlice.Scope().Predicate)) + } else if currentSlice.scope.Predicate.Equals(incomingSlice.scope.Predicate) { + s.True(currentSlice.scope.Predicate.Equals(mergedSlice.Scope().Predicate)) + } else { + s.Fail("Merged slice range not contained by the merging slices") + } + + } + s.True(expectedMergedRange.Equals(actualMergedRange)) + + actualTotalExecutables := 0 + for _, mergedSlice := range mergedSlices { + mergedSliceImpl := mergedSlice.(*SliceImpl) + s.validateSliceState(mergedSliceImpl) + actualTotalExecutables += len(mergedSliceImpl.outstandingExecutables) + } + s.Equal(expectedTotalExecutables, actualTotalExecutables) + + s.Panics(func() { currentSlice.stateSanityCheck() }) + s.Panics(func() { incomingSlice.stateSanityCheck() }) +} + +func (s *sliceSuite) validateSliceState( + slice *SliceImpl, +) { + s.NotNil(slice.executableInitializer) + + for _, executable := range slice.outstandingExecutables { + s.True(slice.scope.Contains(executable)) + } + + r := slice.Scope().Range + for idx, iterator := range slice.iterators { + s.True(r.ContainsRange(iterator.Range())) + if idx != 0 { + currentRange := iterator.Range() + previousRange := slice.iterators[idx-1].Range() + s.False(currentRange.CanMerge(previousRange)) + s.True(previousRange.ExclusiveMax.CompareTo(currentRange.InclusiveMin) < 0) + } + } + + s.False(slice.destroyed) +} + +func (s *sliceSuite) randomExecutablesInRange( + r Range, + numExecutables int, +) []Executable { + executables := make([]Executable, 0, numExecutables) + for i := 0; i != numExecutables; i++ { + mockExecutable := NewMockExecutable(s.controller) + key := NewRandomKeyInRange(r) + mockExecutable.EXPECT().GetKey().Return(key).AnyTimes() + executables = append(executables, mockExecutable) + } + return executables +} + +func (s *sliceSuite) randomIteratorsInRange( + r Range, + numIterators int, + paginationFnProvider PaginationFnProvider, +) []Iterator { + ranges := NewRandomOrderedRangesInRange(r, numIterators) + + iterators := make([]Iterator, 0, numIterators) + for _, r := range ranges { + start := NewRandomKeyInRange(r) + end := NewRandomKeyInRange(r) + if start.CompareTo(end) > 0 { + start, end = end, start + } + iterator := NewIterator(paginationFnProvider, NewRange(start, end)) + iterators = append(iterators, iterator) + } + + return iterators +} diff --git a/service/history/queues/test_util.go b/service/history/queues/test_util.go index 9bda1bd1077..febda1786aa 100644 --- a/service/history/queues/test_util.go +++ b/service/history/queues/test_util.go @@ -29,6 +29,8 @@ import ( "math/rand" "time" + "golang.org/x/exp/slices" + "go.temporal.io/server/service/history/tasks" ) @@ -81,3 +83,23 @@ func NewRandomKeyInRange( return tasks.NewKey(fireTime, rand.Int63()) } + +func NewRandomOrderedRangesInRange( + r Range, + numRanges int, +) []Range { + ranges := []Range{r} + for len(ranges) < numRanges { + r := ranges[0] + left, right := r.Split(NewRandomKeyInRange(r)) + left.ExclusiveMax.FireTime.Add(-time.Nanosecond) + right.InclusiveMin.FireTime.Add(time.Nanosecond) + ranges = append(ranges[1:], left, right) + } + + slices.SortFunc(ranges, func(a, b Range) bool { + return a.InclusiveMin.CompareTo(b.InclusiveMin) < 0 + }) + + return ranges +}