Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make queue max read level always exclusive #3036

Merged
merged 4 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (p *ackMgrImpl) GetMaxTaskID() int64 {
// use ImmediateTaskMaxReadLevel, which is the max task ID across all immediate task queues.
// ImmediateTaskMaxReadLevel will be the lower bound of the new range_id if the shard reloads. The remote cluster will
// quickly (within a few seconds) ack to the latest ImmediateTaskMaxReadLevel if there are no replication tasks at all.
return p.shard.GetQueueMaxReadLevel(tasks.CategoryReplication, p.currentClusterName).TaskID
return p.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, p.currentClusterName).Prev().TaskID
}

return *p.maxTaskID
Expand Down Expand Up @@ -312,7 +312,7 @@ func (p *ackMgrImpl) taskIDsRange(
lastReadMessageID int64,
) (minTaskID int64, maxTaskID int64) {
minTaskID = lastReadMessageID
maxTaskID = p.shard.GetQueueMaxReadLevel(tasks.CategoryReplication, p.currentClusterName).TaskID
maxTaskID = p.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, p.currentClusterName).Prev().TaskID

p.Lock()
defer p.Unlock()
Expand Down
16 changes: 8 additions & 8 deletions service/history/replication/ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (s *ackManagerSuite) TestNotifyNewTasks_Initialized() {

func (s *ackManagerSuite) TestTaskIDRange_NotInitialized() {
s.replicationAckManager.sanityCheckTime = time.Time{}
expectMaxTaskID := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID
expectMinTaskID := expectMaxTaskID - 100
s.replicationAckManager.maxTaskID = convert.Int64Ptr(expectMinTaskID - 100)

Expand All @@ -165,8 +165,8 @@ func (s *ackManagerSuite) TestTaskIDRange_Initialized_UseHighestReplicationTaskI
now := time.Now().UTC()
sanityCheckTime := now.Add(2 * time.Minute)
s.replicationAckManager.sanityCheckTime = sanityCheckTime
expectMinTaskID := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 50
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 50
s.replicationAckManager.maxTaskID = convert.Int64Ptr(expectMaxTaskID)

minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(expectMinTaskID)
Expand All @@ -180,8 +180,8 @@ func (s *ackManagerSuite) TestTaskIDRange_Initialized_NoHighestReplicationTaskID
now := time.Now().UTC()
sanityCheckTime := now.Add(2 * time.Minute)
s.replicationAckManager.sanityCheckTime = sanityCheckTime
expectMinTaskID := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID
s.replicationAckManager.maxTaskID = nil

minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(expectMinTaskID)
Expand All @@ -195,9 +195,9 @@ func (s *ackManagerSuite) TestTaskIDRange_Initialized_UseHighestTransferTaskID()
now := time.Now().UTC()
sanityCheckTime := now.Add(-2 * time.Minute)
s.replicationAckManager.sanityCheckTime = sanityCheckTime
expectMinTaskID := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID
s.replicationAckManager.maxTaskID = convert.Int64Ptr(s.mockShard.GetQueueMaxReadLevel(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 50)
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID
s.replicationAckManager.maxTaskID = convert.Int64Ptr(s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 50)

minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(expectMinTaskID)
s.Equal(expectMinTaskID, minTaskID)
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func (p *taskProcessorImpl) cleanupReplicationTasks() error {
metrics.TargetClusterTag(p.currentCluster),
).RecordDistribution(
metrics.ReplicationTasksLag,
int(p.shard.GetQueueMaxReadLevel(tasks.CategoryReplication, currentCluster).TaskID-*minAckedTaskID),
int(p.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, currentCluster).Prev().TaskID-*minAckedTaskID),
)
err := p.shard.GetExecutionManager().RangeCompleteHistoryTasks(
context.TODO(),
Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type (
GenerateTaskID() (int64, error)
GenerateTaskIDs(number int) ([]int64, error)

GetQueueMaxReadLevel(category tasks.Category, cluster string) tasks.Key
GetQueueExclusiveHighReadWatermark(category tasks.Category, cluster string) tasks.Key
GetQueueAckLevel(category tasks.Category) tasks.Key
UpdateQueueAckLevel(category tasks.Category, ackLevel tasks.Key) error
GetQueueClusterAckLevel(category tasks.Category, cluster string) tasks.Key
Expand Down
80 changes: 41 additions & 39 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ type (
lifecycleCancel context.CancelFunc

// All following fields are protected by rwLock, and only valid if state >= Acquiring:
rwLock sync.RWMutex
state contextState
engineFuture *future.FutureImpl[Engine]
lastUpdated time.Time
shardInfo *persistence.ShardInfoWithFailover
taskSequenceNumber int64
maxTaskSequenceNumber int64
immediateTaskMaxReadLevel int64
scheduledTaskMaxReadLevelMap map[string]time.Time // cluster -> scheduledTaskMaxReadLevel
rwLock sync.RWMutex
state contextState
engineFuture *future.FutureImpl[Engine]
lastUpdated time.Time
shardInfo *persistence.ShardInfoWithFailover
taskSequenceNumber int64
maxTaskSequenceNumber int64
immediateTaskExclusiveMaxReadLevel int64
scheduledTaskMaxReadLevelMap map[string]time.Time // cluster -> scheduledTaskMaxReadLevel

// exist only in memory
remoteClusterInfos map[string]*remoteClusterInfo
Expand Down Expand Up @@ -264,24 +264,24 @@ func (s *ContextImpl) GenerateTaskIDs(number int) ([]int64, error) {
return result, nil
}

func (s *ContextImpl) GetQueueMaxReadLevel(
func (s *ContextImpl) GetQueueExclusiveHighReadWatermark(
category tasks.Category,
cluster string,
) tasks.Key {
switch categoryType := category.Type(); categoryType {
case tasks.CategoryTypeImmediate:
return s.getImmediateTaskMaxReadLevel()
return s.getImmediateTaskExclusiveMaxReadLevel()
case tasks.CategoryTypeScheduled:
return s.updateScheduledTaskMaxReadLevel(cluster)
default:
panic(fmt.Sprintf("invalid task category type: %v", categoryType))
}
}

func (s *ContextImpl) getImmediateTaskMaxReadLevel() tasks.Key {
func (s *ContextImpl) getImmediateTaskExclusiveMaxReadLevel() tasks.Key {
s.rLock()
defer s.rUnlock()
return tasks.NewImmediateKey(s.immediateTaskMaxReadLevel)
return tasks.NewImmediateKey(s.immediateTaskExclusiveMaxReadLevel)
}

func (s *ContextImpl) getScheduledTaskMaxReadLevel(cluster string) tasks.Key {
Expand Down Expand Up @@ -313,6 +313,8 @@ func (s *ContextImpl) updateScheduledTaskMaxReadLevel(cluster string) tasks.Key
return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0)
}

// NOTE: the returned ack level is inclusive for the immediate task category (the task at that level is acked),
// but exclusive for the scheduled task category (the task at that level is not acked).
func (s *ContextImpl) GetQueueAckLevel(category tasks.Category) tasks.Key {
s.rLock()
defer s.rUnlock()
Expand Down Expand Up @@ -616,20 +618,20 @@ func (s *ContextImpl) CreateWorkflowExecution(
return nil, err
}

transferMaxReadLevel := int64(0)
transferExclusiveMaxReadLevel := int64(0)
if err := s.allocateTaskIDsLocked(
namespaceEntry,
workflowID,
request.NewWorkflowSnapshot.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return nil, err
}

currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.CreateWorkflowExecution(ctx, request)
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferExclusiveMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
Expand Down Expand Up @@ -660,12 +662,12 @@ func (s *ContextImpl) UpdateWorkflowExecution(
return nil, err
}

transferMaxReadLevel := int64(0)
transferExclusiveMaxReadLevel := int64(0)
if err := s.allocateTaskIDsLocked(
namespaceEntry,
workflowID,
request.UpdateWorkflowMutation.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return nil, err
}
Expand All @@ -675,7 +677,7 @@ func (s *ContextImpl) UpdateWorkflowExecution(
namespaceEntry,
workflowID,
request.NewWorkflowSnapshot.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return nil, err
}
Expand All @@ -685,7 +687,7 @@ func (s *ContextImpl) UpdateWorkflowExecution(
currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.UpdateWorkflowExecution(ctx, request)
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferExclusiveMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
Expand Down Expand Up @@ -731,13 +733,13 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
return nil, err
}

transferMaxReadLevel := int64(0)
transferExclusiveMaxReadLevel := int64(0)
if request.CurrentWorkflowMutation != nil {
if err := s.allocateTaskIDsLocked(
namespaceEntry,
workflowID,
request.CurrentWorkflowMutation.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return nil, err
}
Expand All @@ -746,7 +748,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
namespaceEntry,
workflowID,
request.ResetWorkflowSnapshot.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return nil, err
}
Expand All @@ -755,7 +757,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
namespaceEntry,
workflowID,
request.NewWorkflowSnapshot.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return nil, err
}
Expand All @@ -764,7 +766,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.ConflictResolveWorkflowExecution(ctx, request)
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferExclusiveMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
Expand Down Expand Up @@ -795,20 +797,20 @@ func (s *ContextImpl) SetWorkflowExecution(
return nil, err
}

transferMaxReadLevel := int64(0)
transferExclusiveMaxReadLevel := int64(0)
if err := s.allocateTaskIDsLocked(
namespaceEntry,
workflowID,
request.SetWorkflowSnapshot.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return nil, err
}

currentRangeID := s.getRangeIDLocked()
request.RangeID = currentRangeID
resp, err := s.executionManager.SetWorkflowExecution(ctx, request)
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel); err != nil {
if err = s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferExclusiveMaxReadLevel); err != nil {
return nil, err
}
return resp, nil
Expand Down Expand Up @@ -849,19 +851,19 @@ func (s *ContextImpl) addTasksLocked(
request *persistence.AddHistoryTasksRequest,
namespaceEntry *namespace.Namespace,
) error {
transferMaxReadLevel := int64(0)
transferExclusiveMaxReadLevel := int64(0)
if err := s.allocateTaskIDsLocked(
namespaceEntry,
request.WorkflowID,
request.Tasks,
&transferMaxReadLevel,
&transferExclusiveMaxReadLevel,
); err != nil {
return err
}

request.RangeID = s.getRangeIDLocked()
err := s.executionManager.AddHistoryTasks(ctx, request)
return s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferMaxReadLevel)
return s.handleWriteErrorAndUpdateMaxReadLevelLocked(err, transferExclusiveMaxReadLevel)
}

func (s *ContextImpl) AppendHistoryEvents(
Expand Down Expand Up @@ -1131,16 +1133,16 @@ func (s *ContextImpl) renewRangeLocked(isStealing bool) error {

s.taskSequenceNumber = updatedShardInfo.GetRangeId() << s.config.RangeSizeBits
s.maxTaskSequenceNumber = (updatedShardInfo.GetRangeId() + 1) << s.config.RangeSizeBits
s.immediateTaskMaxReadLevel = s.taskSequenceNumber - 1
s.immediateTaskExclusiveMaxReadLevel = s.taskSequenceNumber
s.shardInfo = updatedShardInfo

return nil
}

func (s *ContextImpl) updateMaxReadLevelLocked(rl int64) {
if rl > s.immediateTaskMaxReadLevel {
if rl > s.immediateTaskExclusiveMaxReadLevel {
s.contextTaggedLogger.Debug("Updating MaxTaskID", tag.MaxLevel(rl))
s.immediateTaskMaxReadLevel = rl
s.immediateTaskExclusiveMaxReadLevel = rl
}
}

Expand Down Expand Up @@ -1207,10 +1209,10 @@ func (s *ContextImpl) emitShardInfoMetricsLogsLocked() {
diffTransferLevel := maxTransferLevel.TaskID - minTransferLevel.TaskID
diffTimerLevel := maxTimerLevel.FireTime.Sub(minTimerLevel.FireTime)

replicationLag := s.immediateTaskMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryReplication).TaskID
transferLag := s.immediateTaskMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryTransfer).TaskID
replicationLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryReplication).TaskID - 1
transferLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryTransfer).TaskID - 1
timerLag := s.timeSource.Now().Sub(s.getQueueAckLevelLocked(tasks.CategoryTimer).FireTime)
visibilityLag := s.immediateTaskMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryVisibility).TaskID
visibilityLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryVisibility).TaskID - 1

transferFailoverInProgress := len(s.shardInfo.FailoverLevels[tasks.CategoryTransfer])
timerFailoverInProgress := len(s.shardInfo.FailoverLevels[tasks.CategoryTimer])
Expand Down Expand Up @@ -1251,7 +1253,7 @@ func (s *ContextImpl) allocateTaskIDsLocked(
namespaceEntry *namespace.Namespace,
workflowID string,
newTasks map[tasks.Category][]tasks.Task,
transferMaxReadLevel *int64,
transferExclusiveMaxReadLevel *int64,
) error {
currentCluster := s.GetClusterMetadata().GetCurrentClusterName()
for category, tasksByCategory := range newTasks {
Expand All @@ -1263,7 +1265,7 @@ func (s *ContextImpl) allocateTaskIDsLocked(
}
s.contextTaggedLogger.Debug("Assigning task ID", tag.TaskID(id))
task.SetTaskID(id)
*transferMaxReadLevel = id
*transferExclusiveMaxReadLevel = id + 1

// if scheduled task, check if fire time is in the past
if category.Type() == tasks.CategoryTypeScheduled {
Expand Down
12 changes: 6 additions & 6 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions service/history/shard/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ func (s *contextSuite) TestTimerMaxReadLevelInitialization() {
func (s *contextSuite) TestTimerMaxReadLevelUpdate() {
now := time.Now()
s.timeSource.Update(now)
maxReadLevel := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryTimer, cluster.TestCurrentClusterName)
maxReadLevel := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName)

s.timeSource.Update(now.Add(-time.Minute))
newMaxReadLevel := s.mockShard.GetQueueMaxReadLevel(tasks.CategoryTimer, cluster.TestCurrentClusterName)
newMaxReadLevel := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName)
s.Equal(maxReadLevel, newMaxReadLevel)

s.timeSource.Update(now.Add(time.Minute))
newMaxReadLevel = s.mockShard.GetQueueMaxReadLevel(tasks.CategoryTimer, cluster.TestCurrentClusterName)
newMaxReadLevel = s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName)
s.True(newMaxReadLevel.FireTime.After(maxReadLevel.FireTime))
}
Loading