From ba30aa8ab03153a4ceb4a4cae2bcae873f5a0b57 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Wed, 12 Jan 2022 15:03:25 -0800 Subject: [PATCH] Check max taskID instead of max read level when process task (#2371) --- service/history/shard/context.go | 2 ++ service/history/shard/context_impl.go | 7 +++++++ service/history/shard/context_mock.go | 14 ++++++++++++++ service/history/taskProcessor.go | 4 ++-- 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 99fd66659a2..a45d2dacd49 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -69,6 +69,8 @@ type ( GetTransferMaxReadLevel() int64 UpdateTimerMaxReadLevel(cluster string) time.Time + GetMaxTaskIDForCurrentRangeID() int64 + SetCurrentTime(cluster string, currentTime time.Time) GetCurrentTime(cluster string) time.Time GetLastUpdatedTime() time.Time diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 4fe6ceadbba..a2553a83212 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -185,6 +185,13 @@ func (s *ContextImpl) GetEngine() (Engine, error) { return s.engine, nil } +func (s *ContextImpl) GetMaxTaskIDForCurrentRangeID() int64 { + s.rLock() + defer s.rUnlock() + // maxTransferSequenceNumber is the exclusive upper bound of task ID for current range. + return s.maxTransferSequenceNumber - 1 +} + func (s *ContextImpl) GenerateTransferTaskID() (int64, error) { s.wLock() defer s.wUnlock() diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 24f55dfc7d5..9da0949f1aa 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -387,6 +387,20 @@ func (mr *MockContextMockRecorder) GetLogger() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLogger", reflect.TypeOf((*MockContext)(nil).GetLogger)) } +// GetMaxTaskIDForCurrentRangeID mocks base method. +func (m *MockContext) GetMaxTaskIDForCurrentRangeID() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMaxTaskIDForCurrentRangeID") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetMaxTaskIDForCurrentRangeID indicates an expected call of GetMaxTaskIDForCurrentRangeID. +func (mr *MockContextMockRecorder) GetMaxTaskIDForCurrentRangeID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMaxTaskIDForCurrentRangeID", reflect.TypeOf((*MockContext)(nil).GetMaxTaskIDForCurrentRangeID)) +} + // GetMetricsClient mocks base method. func (m *MockContext) GetMetricsClient() metrics.Client { m.ctrl.T.Helper() diff --git a/service/history/taskProcessor.go b/service/history/taskProcessor.go index 5deeacb3f36..81660115507 100644 --- a/service/history/taskProcessor.go +++ b/service/history/taskProcessor.go @@ -168,8 +168,8 @@ func (t *taskProcessor) taskWorker( if !ok { return } - if task.GetTaskID() > t.shard.GetTransferMaxReadLevel() { - // this could happen if we lost ownership and was not aware of it. + if task.GetTaskID() > t.shard.GetMaxTaskIDForCurrentRangeID() { + // this could happen if we lost ownership and were not aware of it. // unload shard t.shard.Unload() return