From 6f418713e7c5c45e41d3a529ee00d26d21c4687c Mon Sep 17 00:00:00 2001
From: Nate Mortensen
Date: Fri, 13 Dec 2024 09:59:20 -0800
Subject: [PATCH] Add task isolation leaking (#6544)

Add taskIsolationDuration, the time during which tasks will only be
routed to pollers belonging to the same isolation group as the task.
Additionally, replace the hardcoded constants of 1 minute and 10s for
startup flexibility and recent-poller lookback, respectively, with a
single value: TaskIsolationPollerWindow.

For the sake of measuring impact, we now include the original isolation
group within the partition config prior to dispatching the task. This
value isn't persisted within the tasks table and is only necessary so
that a task that is forwarded after abandoning isolation will emit the
correct metric. No behavior depends on this value; it is only used in
metrics.

Add integration tests for tasklist isolation, as well as unit tests for
getIsolationGroupForTask. Update the matching simulator to include this
new value.
---
 common/dynamicconfig/constants.go             |  26 +++
 common/log/tag/tags.go                        |   8 +
 common/metrics/defs.go                        |   5 +-
 common/metrics/tags.go                        |  23 +-
 common/partition/default-partitioner.go       |   5 +-
 host/matching_simulation_test.go              |   1 +
 host/onebox.go                                |   6 +-
 host/task_list_isolation_test.go              | 213 ++++++++++++++++++
 host/taskpoller.go                            | 177 +++++++++++++--
 host/test_suites.go                           |   5 +
 .../matching_simulation_zonal_isolation.yaml  |   1 +
 ...imulation_zonal_isolation_few_pollers.yaml |   1 +
 ...mulation_zonal_isolation_many_pollers.yaml |   1 +
 ...tion_zonal_isolation_single_partition.yaml |   1 +
 ...ching_simulation_zonal_isolation_skew.yaml |   1 +
 ...mulation_zonal_isolation_skew_extreme.yaml |   1 +
 ...ation_zonal_isolation_skew_forwarding.yaml |   1 +
 .../task_list_isolation_test_cluster.yaml     |  13 ++
 scripts/run_matching_simulator.sh             |  18 +-
 service/matching/config/config.go             |   8 +-
 service/matching/config/config_test.go        |   2 +
 service/matching/handler/engine.go            |   8 +-
 service/matching/tasklist/task.go             |  16 ++
 .../matching/tasklist/task_list_manager.go    |  43 ++--
 .../tasklist/task_list_manager_test.go        | 162 +++++++++++++
 service/matching/tasklist/task_test.go        | 163 ++++++++++++++
 26 files changed, 852 insertions(+), 57 deletions(-)
 create mode 100644 host/task_list_isolation_test.go
 create mode 100644 host/testdata/task_list_isolation_test_cluster.yaml
 create mode 100644 service/matching/tasklist/task_test.go

diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index df35b2c766b..a9233c54a10 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -2796,6 +2796,20 @@ const (
 	// Allowed filters: domainName, taskListName, taskListType
 	LocalTaskWaitTime
 
+	// TaskIsolationDuration is the time period for which we attempt to respect tasklist isolation before allowing any poller to process the task
+	// KeyName: matching.taskIsolationDuration
+	// Value type: Duration
+	// Default value: 0
+	// Allowed filters: domainName, taskListName, taskListType
+	TaskIsolationDuration
+
+	// TaskIsolationPollerWindow is the time period for which pollers are remembered when deciding whether to skip tasklist isolation due to unpolled isolation groups.
+	// KeyName: matching.taskIsolationPollerWindow
+	// Value type: Duration
+	// Default value: 10s
+	// Allowed filters: domainName, taskListName, taskListType
+	TaskIsolationPollerWindow
+
 	// LastDurationKey must be the last one in this const group
 	LastDurationKey
 )
@@ -5077,6 +5091,18 @@ var DurationKeys = map[DurationKey]DynamicDuration{
 		Description:  "LocalTaskWaitTime is the time a task waits for a poller to arrive before considering task forwarding",
 		DefaultValue: time.Millisecond * 10,
 	},
+	TaskIsolationDuration: {
+		KeyName:      "matching.taskIsolationDuration",
+		Filters:      []Filter{DomainName, TaskListName, TaskType},
+		Description:  "TaskIsolationDuration is the time period for which we attempt to respect tasklist isolation before allowing any poller to process the task",
+		DefaultValue: 0,
+	},
+	TaskIsolationPollerWindow: {
+		KeyName:      "matching.taskIsolationPollerWindow",
+		Filters:      []Filter{DomainName, TaskListName, TaskType},
+		Description:  "TaskIsolationPollerWindow is the time period for which pollers are remembered when deciding whether to skip tasklist isolation due to unpolled isolation groups",
+		DefaultValue: time.Second * 10,
+	},
 }
 
 var MapKeys = map[MapKey]DynamicMap{
diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go
index 475802ce750..ac24cfbbaff 100644
--- a/common/log/tag/tags.go
+++ b/common/log/tag/tags.go
@@ -982,6 +982,14 @@ func IsolationGroup(group string) Tag {
 	return newStringTag("isolation-group", group)
 }
 
+func TaskLatency(duration time.Duration) Tag {
+	return newDurationTag("task-latency", duration)
+}
+
+func IsolationDuration(duration time.Duration) Tag {
+	return newDurationTag("isolation-duration", duration)
+}
+
 func PartitionConfig(p map[string]string) Tag {
 	return newObjectTag("partition-config", p)
 }
diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 5cf03a74c0c..e6827aab0c0 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -2638,7 +2638,8 @@ const (
 	StandbyClusterTasksCompletedCounterPerTaskList
 	StandbyClusterTasksNotStartedCounterPerTaskList
 	StandbyClusterTasksCompletionFailurePerTaskList
-
+	TaskIsolationExpiredPerTaskList
+	TaskIsolationErrorPerTaskList
 	NumMatchingMetrics
 )
 
@@ -3334,6 +3335,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		StandbyClusterTasksCompletedCounterPerTaskList:  {metricName: "standby_cluster_tasks_completed_per_tl", metricType: Counter},
 		StandbyClusterTasksNotStartedCounterPerTaskList: {metricName: "standby_cluster_tasks_not_started_per_tl", metricType: Counter},
 		StandbyClusterTasksCompletionFailurePerTaskList: {metricName: "standby_cluster_tasks_completion_failure_per_tl", metricType: Counter},
+		TaskIsolationExpiredPerTaskList:                 {metricName: "task_isolation_expired_per_tl", metricRollupName: "task_isolation_expired"},
+		TaskIsolationErrorPerTaskList:                   {metricName: "task_isolation_error_per_tl", metricRollupName: "task_isolation_error"},
 	},
 	Worker: {
 		ReplicatorMessages: {metricName: "replicator_messages"},
diff --git a/common/metrics/tags.go b/common/metrics/tags.go
index 3368a9cb518..f76a42ebf13 100644
--- a/common/metrics/tags.go
+++ b/common/metrics/tags.go
@@ -21,7 +21,6 @@ package metrics
 
 import (
-	"fmt"
 	"regexp"
 	"strconv"
 )
 
@@ -65,6 +64,8 @@ const (
 	workflowTerminationReason = "workflow_termination_reason"
 	workflowCloseStatus       = "workflow_close_status"
 	isolationEnabled          = "isolation_enabled"
+	isolationGroup            = "isolation_group"
+	originalIsolationGroup    = "original_isolation_group"
 
 	topic = "topic"
 	mode  = "mode"
@@ -312,19 +313,13 @@ func WorkflowCloseStatusTag(value string) Tag {
return simpleMetric{key: workflowCloseStatus, value: value} } -// PartitionConfigTags returns a list of partition config tags -func PartitionConfigTags(partitionConfig map[string]string) []Tag { - tags := make([]Tag, 0, len(partitionConfig)) - for k, v := range partitionConfig { - if len(k) == 0 { - continue - } - if len(v) == 0 { - v = unknownValue - } - tags = append(tags, simpleMetric{key: sanitizer.Value(fmt.Sprintf("pk_%s", k)), value: sanitizer.Value(v)}) - } - return tags +func OriginalIsolationGroupTag(group string) Tag { + return simpleMetric{key: originalIsolationGroup, value: sanitizer.Value(group)} +} + +func IsolationGroupTag(group string) Tag { + return simpleMetric{key: isolationGroup, value: sanitizer.Value(group)} + } // IsolationEnabledTag returns whether isolation is enabled diff --git a/common/partition/default-partitioner.go b/common/partition/default-partitioner.go index be33344a70b..e37081fe70f 100644 --- a/common/partition/default-partitioner.go +++ b/common/partition/default-partitioner.go @@ -34,8 +34,9 @@ import ( ) const ( - IsolationGroupKey = "isolation-group" - WorkflowIDKey = "wf-id" + IsolationGroupKey = "isolation-group" + OriginalIsolationGroupKey = "original-isolation-group" + WorkflowIDKey = "wf-id" ) // ErrNoIsolationGroupsAvailable is returned when there are no available isolation-groups diff --git a/host/matching_simulation_test.go b/host/matching_simulation_test.go index 12b1f99f7bc..37a080caeae 100644 --- a/host/matching_simulation_test.go +++ b/host/matching_simulation_test.go @@ -125,6 +125,7 @@ func TestMatchingSimulationSuite(t *testing.T) { dynamicconfig.MatchingPartitionUpscaleSustainedDuration: clusterConfig.MatchingConfig.SimulationConfig.PartitionUpscaleSustainedDuration, dynamicconfig.MatchingPartitionDownscaleSustainedDuration: clusterConfig.MatchingConfig.SimulationConfig.PartitionDownscaleSustainedDuration, dynamicconfig.MatchingAdaptiveScalerUpdateInterval: clusterConfig.MatchingConfig.SimulationConfig.AdaptiveScalerUpdateInterval, + dynamicconfig.TaskIsolationDuration: clusterConfig.MatchingConfig.SimulationConfig.TaskIsolationDuration, } ctrl := gomock.NewController(t) diff --git a/host/onebox.go b/host/onebox.go index ebe2ea5cfa6..112a3343699 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -204,6 +204,7 @@ type ( PartitionUpscaleSustainedDuration time.Duration PartitionDownscaleSustainedDuration time.Duration AdaptiveScalerUpdateInterval time.Duration + TaskIsolationDuration time.Duration } SimulationPollerConfiguration struct { @@ -1109,7 +1110,10 @@ func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo TChannelAddress: tchannelAddress, GRPCAddress: grpcAddress, InboundMiddleware: yarpc.InboundMiddleware{ - Unary: &versionMiddleware{}, + Unary: yarpc.UnaryInboundMiddleware(&versionMiddleware{}, &rpc.ClientPartitionConfigMiddleware{}, &rpc.ForwardPartitionConfigMiddleware{}), + }, + OutboundMiddleware: yarpc.OutboundMiddleware{ + Unary: &rpc.ForwardPartitionConfigMiddleware{}, }, // For integration tests to generate client out of the same outbound. diff --git a/host/task_list_isolation_test.go b/host/task_list_isolation_test.go new file mode 100644 index 00000000000..f39ae2a84d1 --- /dev/null +++ b/host/task_list_isolation_test.go @@ -0,0 +1,213 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. 
+ +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package host + +import ( + "errors" + "flag" + "strconv" + "testing" + "time" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.uber.org/yarpc" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/types" +) + +const ( + tl = "integration-task-list-isolation-tl" +) + +func TestTaskListIsolationSuite(t *testing.T) { + flag.Parse() + + var isolationGroups = []any{ + "a", "b", "c", + } + clusterConfig, err := GetTestClusterConfig("testdata/task_list_isolation_test_cluster.yaml") + if err != nil { + panic(err) + } + testCluster := NewPersistenceTestCluster(t, clusterConfig) + clusterConfig.FrontendDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{ + dynamicconfig.EnableTasklistIsolation: true, + dynamicconfig.AllIsolationGroups: isolationGroups, + dynamicconfig.MatchingNumTasklistWritePartitions: 1, + dynamicconfig.MatchingNumTasklistReadPartitions: 1, + } + clusterConfig.HistoryDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{ + dynamicconfig.EnableTasklistIsolation: true, + dynamicconfig.AllIsolationGroups: isolationGroups, + dynamicconfig.MatchingNumTasklistWritePartitions: 1, + dynamicconfig.MatchingNumTasklistReadPartitions: 1, + } + clusterConfig.MatchingDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{ + dynamicconfig.EnableTasklistIsolation: true, + dynamicconfig.AllIsolationGroups: isolationGroups, + dynamicconfig.TaskIsolationDuration: time.Second * 5, + dynamicconfig.MatchingNumTasklistWritePartitions: 1, + dynamicconfig.MatchingNumTasklistReadPartitions: 1, + } + + s := new(TaskListIsolationIntegrationSuite) + params := IntegrationBaseParams{ + DefaultTestCluster: testCluster, + VisibilityTestCluster: testCluster, + TestClusterConfig: clusterConfig, + } + s.IntegrationBase = NewIntegrationBase(params) + suite.Run(t, s) +} + +func (s *TaskListIsolationIntegrationSuite) SetupSuite() { + s.setupSuite() +} + +func (s *TaskListIsolationIntegrationSuite) TearDownSuite() { + s.tearDownSuite() +} + +func (s *TaskListIsolationIntegrationSuite) SetupTest() { + // Have to define our overridden assertions in the test setup. 
If we did it earlier, s.T() will return nil + s.Assertions = require.New(s.T()) +} + +func (s *TaskListIsolationIntegrationSuite) TestTaskListIsolation() { + aPoller := s.createPoller("a") + bPoller := s.createPoller("b") + + cancelB := bPoller.PollAndProcessDecisions() + defer cancelB() + cancelA := aPoller.PollAndProcessDecisions() + defer cancelA() + + // Give pollers time to start + time.Sleep(time.Second) + + // Running a single workflow is a bit of a coinflip: if isolation didn't work, it would pass 50% of the time. + // Run 10 workflows to demonstrate that we consistently isolate tasks to the correct poller + for i := 0; i < 10; i++ { + runID := s.startWorkflow("a").RunID + result, err := s.getWorkflowResult(runID) + s.NoError(err) + s.Equal("a", result) + } +} + +func (s *TaskListIsolationIntegrationSuite) TestTaskListIsolationLeak() { + runID := s.startWorkflow("a").RunID + + bPoller := s.createPoller("b") + // B will get the task as there are no pollers from A + cancelB := bPoller.PollAndProcessDecisions() + defer cancelB() + + result, err := s.getWorkflowResult(runID) + s.NoError(err) + s.Equal("b", result) +} + +func (s *TaskListIsolationIntegrationSuite) createPoller(group string) *TaskPoller { + return &TaskPoller{ + Engine: s.engine, + Domain: s.domainName, + TaskList: &types.TaskList{Name: tl, Kind: types.TaskListKindNormal.Ptr()}, + Identity: group, + DecisionHandler: func(execution *types.WorkflowExecution, wt *types.WorkflowType, previousStartedEventID, startedEventID int64, history *types.History) ([]byte, []*types.Decision, error) { + // Complete the workflow with the group name + return []byte(strconv.Itoa(0)), []*types.Decision{{ + DecisionType: types.DecisionTypeCompleteWorkflowExecution.Ptr(), + CompleteWorkflowExecutionDecisionAttributes: &types.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte(group), + }, + }}, nil + }, + Logger: s.Logger, + T: s.T(), + CallOptions: []yarpc.CallOption{withIsolationGroup(group)}, + } +} + +func (s *TaskListIsolationIntegrationSuite) startWorkflow(group string) *types.StartWorkflowExecutionResponse { + identity := "test" + + request := &types.StartWorkflowExecutionRequest{ + RequestID: uuid.New(), + Domain: s.domainName, + WorkflowID: s.T().Name(), + WorkflowType: &types.WorkflowType{ + Name: "integration-task-list-isolation-type", + }, + TaskList: &types.TaskList{ + Name: tl, + Kind: types.TaskListKindNormal.Ptr(), + }, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: identity, + WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(), + } + + ctx, cancel := createContext() + defer cancel() + result, err := s.engine.StartWorkflowExecution(ctx, request, withIsolationGroup(group)) + s.Nil(err) + + return result +} + +func (s *TaskListIsolationIntegrationSuite) getWorkflowResult(runID string) (string, error) { + ctx, cancel := createContext() + historyResponse, err := s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{ + Domain: s.domainName, + Execution: &types.WorkflowExecution{ + WorkflowID: s.T().Name(), + RunID: runID, + }, + HistoryEventFilterType: types.HistoryEventFilterTypeCloseEvent.Ptr(), + WaitForNewEvent: true, + }) + cancel() + if err != nil { + return "", err + } + history := historyResponse.History + + lastEvent := history.Events[len(history.Events)-1] + if *lastEvent.EventType != types.EventTypeWorkflowExecutionCompleted { + return "", errors.New("workflow didn't 
complete") + } + + return string(lastEvent.WorkflowExecutionCompletedEventAttributes.Result), nil + +} + +func withIsolationGroup(group string) yarpc.CallOption { + return yarpc.WithHeader(common.ClientIsolationGroupHeaderName, group) +} diff --git a/host/taskpoller.go b/host/taskpoller.go index ce6720812e7..7413ece3ca9 100644 --- a/host/taskpoller.go +++ b/host/taskpoller.go @@ -22,10 +22,13 @@ package host import ( "context" + "errors" + "sync" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/yarpc" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" @@ -55,6 +58,7 @@ type ( QueryHandler queryHandler Logger log.Logger T *testing.T + CallOptions []yarpc.CallOption } ) @@ -136,7 +140,7 @@ Loop: Domain: p.Domain, TaskList: taskList, Identity: p.Identity, - }) + }, p.CallOptions...) cancel() if err1 != nil { @@ -173,7 +177,7 @@ Loop: Domain: p.Domain, Execution: response.WorkflowExecution, NextPageToken: nextPageToken, - }) + }, p.CallOptions...) cancel() if err2 != nil { @@ -219,7 +223,7 @@ Loop: } ctx, cancel := createContext() - taskErr := p.Engine.RespondQueryTaskCompleted(ctx, completeRequest) + taskErr := p.Engine.RespondQueryTaskCompleted(ctx, completeRequest, p.CallOptions...) cancel() return true, nil, taskErr } @@ -245,7 +249,7 @@ Loop: Cause: types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), Details: []byte(err.Error()), Identity: p.Identity, - }) + }, p.CallOptions...) cancel() return isQueryTask, nil, taskErr } @@ -262,7 +266,7 @@ Loop: ReturnNewDecisionTask: forceCreateNewDecision, ForceCreateNewDecisionTask: forceCreateNewDecision, QueryResults: getQueryResults(response.GetQueries(), queryResult), - }) + }, p.CallOptions...) cancel() return false, newTask, err } @@ -283,6 +287,7 @@ Loop: ForceCreateNewDecisionTask: forceCreateNewDecision, QueryResults: getQueryResults(response.GetQueries(), queryResult), }, + p.CallOptions..., ) cancel() @@ -322,7 +327,7 @@ func (p *TaskPoller) HandlePartialDecision(response *types.PollForDecisionTaskRe Cause: types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), Details: []byte(err.Error()), Identity: p.Identity, - }) + }, p.CallOptions...) } p.Logger.Info("Completing Decision. Decisions: %v", tag.Value(decisions)) @@ -344,6 +349,7 @@ func (p *TaskPoller) HandlePartialDecision(response *types.PollForDecisionTaskRe ReturnNewDecisionTask: true, ForceCreateNewDecisionTask: true, }, + p.CallOptions..., ) return newTask, err @@ -357,7 +363,7 @@ func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error { Domain: p.Domain, TaskList: p.TaskList, Identity: p.Identity, - }) + }, p.CallOptions...) ctxCancel() if err1 != nil { @@ -384,7 +390,7 @@ func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error { TaskToken: response.TaskToken, Details: []byte("details"), Identity: p.Identity, - }) + }, p.CallOptions...) ctxCancel() return taskErr } @@ -396,7 +402,7 @@ func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error { Reason: common.StringPtr(err2.Error()), Details: []byte(err2.Error()), Identity: p.Identity, - }) + }, p.CallOptions...) ctxCancel() return taskErr } @@ -406,7 +412,7 @@ func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error { TaskToken: response.TaskToken, Identity: p.Identity, Result: result, - }) + }, p.CallOptions...) 
ctxCancel() return taskErr } @@ -422,7 +428,7 @@ func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error { Domain: p.Domain, TaskList: p.TaskList, Identity: p.Identity, - }) + }, p.CallOptions...) ctxCancel() if err1 != nil { @@ -457,7 +463,7 @@ func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error { ActivityID: response.GetActivityID(), Details: []byte("details"), Identity: p.Identity, - }) + }, p.CallOptions...) ctxCancel() return taskErr } @@ -472,7 +478,7 @@ func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error { Reason: common.StringPtr(err2.Error()), Details: []byte(err2.Error()), Identity: p.Identity, - }) + }, p.CallOptions...) ctxCancel() return taskErr } @@ -485,7 +491,7 @@ func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error { ActivityID: response.GetActivityID(), Identity: p.Identity, Result: result, - }) + }, p.CallOptions...) ctxCancel() return taskErr } @@ -493,6 +499,149 @@ func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error { return tasklist.ErrNoTasks } +func (p *TaskPoller) PollAndProcessDecisions() context.CancelFunc { + var wg sync.WaitGroup + wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + go p.pollLoop(ctx, &wg, p.doPollDecisionTask) + + return func() { + cancel() + wg.Wait() + } +} + +func (p *TaskPoller) pollLoop(ctx context.Context, wg *sync.WaitGroup, pollFunc func(context.Context) error) { + for { + select { + case <-ctx.Done(): + wg.Done() + return + default: + err := pollFunc(ctx) + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + p.Logger.Error("Error while polling", tag.Error(err)) + } + } + } +} + +func (p *TaskPoller) doPollDecisionTask(ctx context.Context) error { + taskList := p.TaskList + pollCtx, cancel := context.WithTimeout(ctx, time.Second*90) + response, err := p.Engine.PollForDecisionTask(pollCtx, &types.PollForDecisionTaskRequest{ + Domain: p.Domain, + TaskList: taskList, + Identity: p.Identity, + }, p.CallOptions...) + cancel() + + if err != nil { + return err + } + + if response == nil || len(response.TaskToken) == 0 { + p.Logger.Info("Empty Decision task: Polling again.") + return nil + } + + if response.GetNextEventID() == 0 { + p.Logger.Fatal("NextEventID is not set for decision or query task") + } + + var events []*types.HistoryEvent + if response.Query == nil { + // if not query task, should have some history events + // for non sticky query, there should be events returned + history := response.History + if history == nil { + p.Logger.Fatal("History is nil") + } + + events = history.Events + if len(events) == 0 { + p.Logger.Fatal("History Events are empty") + } + + nextPageToken := response.NextPageToken + for nextPageToken != nil { + historyCtx, cancel := context.WithTimeout(ctx, time.Second*90) + resp, err2 := p.Engine.GetWorkflowExecutionHistory(historyCtx, &types.GetWorkflowExecutionHistoryRequest{ + Domain: p.Domain, + Execution: response.WorkflowExecution, + NextPageToken: nextPageToken, + }, p.CallOptions...) + cancel() + + if err2 != nil { + return err2 + } + + events = append(events, resp.History.Events...) + nextPageToken = resp.NextPageToken + } + } else { + // for sticky query, there should be NO events returned + // since worker side already has the state machine and we do not intend to update that. 
+ history := response.History + nextPageToken := response.NextPageToken + if !(history == nil || (len(history.Events) == 0 && nextPageToken == nil)) { + // if history is not nil, and contains events or next token + p.Logger.Fatal("History is not empty for sticky query") + } + } + + // handle query task response + if response.Query != nil { + blob, err := p.QueryHandler(response) + + completeRequest := &types.RespondQueryTaskCompletedRequest{TaskToken: response.TaskToken} + if err != nil { + completeType := types.QueryTaskCompletedTypeFailed + completeRequest.CompletedType = &completeType + completeRequest.ErrorMessage = err.Error() + } else { + completeType := types.QueryTaskCompletedTypeCompleted + completeRequest.CompletedType = &completeType + completeRequest.QueryResult = blob + } + + respondCtx, cancel := context.WithTimeout(ctx, time.Second*90) + taskErr := p.Engine.RespondQueryTaskCompleted(respondCtx, completeRequest, p.CallOptions...) + cancel() + return taskErr + } + executionCtx, decisions, err := p.DecisionHandler(response.WorkflowExecution, response.WorkflowType, + common.Int64Default(response.PreviousStartedEventID), response.StartedEventID, response.History) + if err != nil { + p.Logger.Info("Failing Decision. Decision handler failed with error", tag.Error(err)) + respondCtx, cancel := context.WithTimeout(ctx, time.Second*90) + taskErr := p.Engine.RespondDecisionTaskFailed(respondCtx, &types.RespondDecisionTaskFailedRequest{ + TaskToken: response.TaskToken, + Cause: types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), + Details: []byte(err.Error()), + Identity: p.Identity, + }, p.CallOptions...) + cancel() + return taskErr + } + + p.Logger.Info("Completing Decision. Decisions", tag.Value(decisions)) + // non sticky tasklist + respondCtx, cancel := context.WithTimeout(ctx, time.Second*90) + _, err = p.Engine.RespondDecisionTaskCompleted(respondCtx, &types.RespondDecisionTaskCompletedRequest{ + TaskToken: response.TaskToken, + Identity: p.Identity, + ExecutionContext: executionCtx, + Decisions: decisions, + ReturnNewDecisionTask: false, + ForceCreateNewDecisionTask: false, + }, p.CallOptions...) 
+ cancel() + + return err +} + func createContext() (context.Context, context.CancelFunc) { ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) return ctx, cancel diff --git a/host/test_suites.go b/host/test_suites.go index c3911427f04..5a1421d91eb 100644 --- a/host/test_suites.go +++ b/host/test_suites.go @@ -75,4 +75,9 @@ type ( *require.Assertions *IntegrationBase } + + TaskListIsolationIntegrationSuite struct { + *require.Assertions + *IntegrationBase + } ) diff --git a/host/testdata/matching_simulation_zonal_isolation.yaml b/host/testdata/matching_simulation_zonal_isolation.yaml index 118dbbb55c3..362b9d1d0d1 100644 --- a/host/testdata/matching_simulation_zonal_isolation.yaml +++ b/host/testdata/matching_simulation_zonal_isolation.yaml @@ -16,6 +16,7 @@ matchingconfig: forwardermaxchildrenpernode: 20 localpollwaittime: 10ms localtaskwaittime: 10ms + taskisolationduration: 1s tasks: - numtaskgenerators: 30 taskspersecond: 500 diff --git a/host/testdata/matching_simulation_zonal_isolation_few_pollers.yaml b/host/testdata/matching_simulation_zonal_isolation_few_pollers.yaml index fb06fb2640b..c6ecc23f75e 100644 --- a/host/testdata/matching_simulation_zonal_isolation_few_pollers.yaml +++ b/host/testdata/matching_simulation_zonal_isolation_few_pollers.yaml @@ -16,6 +16,7 @@ matchingconfig: forwardermaxchildrenpernode: 20 localpollwaittime: 10ms localtaskwaittime: 10ms + taskisolationduration: 1s tasks: - numtaskgenerators: 30 taskspersecond: 500 diff --git a/host/testdata/matching_simulation_zonal_isolation_many_pollers.yaml b/host/testdata/matching_simulation_zonal_isolation_many_pollers.yaml index ac99ce348b0..3f28631f2d6 100644 --- a/host/testdata/matching_simulation_zonal_isolation_many_pollers.yaml +++ b/host/testdata/matching_simulation_zonal_isolation_many_pollers.yaml @@ -16,6 +16,7 @@ matchingconfig: forwardermaxchildrenpernode: 20 localpollwaittime: 10ms localtaskwaittime: 10ms + taskisolationduration: 1s tasks: - numtaskgenerators: 30 taskspersecond: 500 diff --git a/host/testdata/matching_simulation_zonal_isolation_single_partition.yaml b/host/testdata/matching_simulation_zonal_isolation_single_partition.yaml index 191a18ef4c1..c2b0b3f9a1a 100644 --- a/host/testdata/matching_simulation_zonal_isolation_single_partition.yaml +++ b/host/testdata/matching_simulation_zonal_isolation_single_partition.yaml @@ -16,6 +16,7 @@ matchingconfig: forwardermaxchildrenpernode: 20 localpollwaittime: 10ms localtaskwaittime: 10ms + taskisolationduration: 1s tasks: - numtaskgenerators: 30 taskspersecond: 500 diff --git a/host/testdata/matching_simulation_zonal_isolation_skew.yaml b/host/testdata/matching_simulation_zonal_isolation_skew.yaml index 77da56caf0a..d126a99e177 100644 --- a/host/testdata/matching_simulation_zonal_isolation_skew.yaml +++ b/host/testdata/matching_simulation_zonal_isolation_skew.yaml @@ -16,6 +16,7 @@ matchingconfig: forwardermaxchildrenpernode: 20 localpollwaittime: 10ms localtaskwaittime: 10ms + taskisolationduration: 1s tasks: - numtaskgenerators: 10 taskspersecond: 180 diff --git a/host/testdata/matching_simulation_zonal_isolation_skew_extreme.yaml b/host/testdata/matching_simulation_zonal_isolation_skew_extreme.yaml index d57fcead6ab..c38e345049c 100644 --- a/host/testdata/matching_simulation_zonal_isolation_skew_extreme.yaml +++ b/host/testdata/matching_simulation_zonal_isolation_skew_extreme.yaml @@ -16,6 +16,7 @@ matchingconfig: forwardermaxchildrenpernode: 20 localpollwaittime: 10ms localtaskwaittime: 10ms + taskisolationduration: 1s tasks: - 
numtaskgenerators: 3 taskspersecond: 50 diff --git a/host/testdata/matching_simulation_zonal_isolation_skew_forwarding.yaml b/host/testdata/matching_simulation_zonal_isolation_skew_forwarding.yaml index 69028dd6280..9930ad4a3b9 100644 --- a/host/testdata/matching_simulation_zonal_isolation_skew_forwarding.yaml +++ b/host/testdata/matching_simulation_zonal_isolation_skew_forwarding.yaml @@ -16,6 +16,7 @@ matchingconfig: forwardermaxchildrenpernode: 20 localpollwaittime: 10ms localtaskwaittime: 10ms + taskisolationduration: 1s tasks: - numtaskgenerators: 10 taskspersecond: 180 diff --git a/host/testdata/task_list_isolation_test_cluster.yaml b/host/testdata/task_list_isolation_test_cluster.yaml new file mode 100644 index 00000000000..d0ebe1f06c0 --- /dev/null +++ b/host/testdata/task_list_isolation_test_cluster.yaml @@ -0,0 +1,13 @@ +enablearchival: false +clusterno: 0 +messagingclientconfig: + usemock: true +historyconfig: + numhistoryshards: 4 + numhistoryhosts: 1 +matchingconfig: + nummatchinghosts: 1 +workerconfig: + enablearchiver: false + enablereplicator: false + enableindexer: false diff --git a/scripts/run_matching_simulator.sh b/scripts/run_matching_simulator.sh index 1b708974147..fb8fab006c2 100755 --- a/scripts/run_matching_simulator.sh +++ b/scripts/run_matching_simulator.sh @@ -72,6 +72,12 @@ tmp=$(cat "$eventLogsFile" \ echo "Max Task latency (ms): $tmp" | tee -a $testSummaryFile +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ + | jq -s 'map(if .Payload.IsolationGroup == .PartitionConfig."original-isolation-group" then 1 else 0 end) | add / length') +echo "Task Containment: $tmp" | tee -a $testSummaryFile + + tmp=$(cat "$eventLogsFile" \ | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ | jq '{ScheduleID,TaskListName,EventName,Payload}' \ @@ -191,13 +197,13 @@ cat "$eventLogsFile" \ echo "AddDecisionTask request per isolation group (excluding forwarded):" | tee -a $testSummaryFile cat "$eventLogsFile" \ | jq -c 'select(.EventName == "Received AddDecisionTask" and .Payload.RequestForwardedFrom == "")' \ - | jq '.PartitionConfig."isolation-group"' \ + | jq '.PartitionConfig."original-isolation-group" // .PartitionConfig."isolation-group"' \ | jq -c '.' | sort -n | uniq -c | sed -e 's/^/ /' | tee -a $testSummaryFile echo "AddDecisionTask request per isolation group (forwarded):" | tee -a $testSummaryFile cat "$eventLogsFile" \ | jq -c 'select(.EventName == "Received AddDecisionTask" and .Payload.RequestForwardedFrom != "")' \ - | jq '.PartitionConfig."isolation-group"' \ + | jq '.PartitionConfig."original-isolation-group" // .PartitionConfig."isolation-group"' \ | jq -c '.' 
| sort -n | uniq -c | sed -e 's/^/ /' | tee -a $testSummaryFile echo "PollForDecisionTask request per isolation group (excluding forwarded):" | tee -a $testSummaryFile @@ -215,7 +221,7 @@ cat "$eventLogsFile" \ echo "Latency per isolation group:" | tee -a $testSummaryFile cat "$eventLogsFile" \ | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ - | jq -s 'group_by(.Payload.IsolationGroup)[] | {"IsolationGroup": .[0].Payload.IsolationGroup, + | jq -s 'group_by(.PartitionConfig."original-isolation-group")[] | {"IsolationGroup": .[0].PartitionConfig."original-isolation-group", "Avg": (map(.Payload.Latency | tonumber) | add / length), "Median": (map(.Payload.Latency | tonumber) | sort | .[length/2]), "Max":(map(.Payload.Latency | tonumber) | max) }'\ | jq -s 'sort_by(.IsolationGroup)[]'\ | jq -r '[.IsolationGroup, .Median, .Avg, .Max] | @tsv' \ @@ -225,7 +231,7 @@ cat "$eventLogsFile" \ echo "Latency per isolation group and task list:" | tee -a $testSummaryFile cat "$eventLogsFile" \ | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ - | jq -s 'group_by(.Payload.IsolationGroup, .TaskListName)[] | {"IsolationGroup": .[0].Payload.IsolationGroup, "TaskListName": .[0].TaskListName, + | jq -s 'group_by(.PartitionConfig."original-isolation-group", .TaskListName)[] | {"IsolationGroup": .[0].PartitionConfig."original-isolation-group", "TaskListName": .[0].TaskListName, "Avg": (map(.Payload.Latency | tonumber) | add / length), "Median": (map(.Payload.Latency | tonumber) | sort | .[length/2]), "Max":(map(.Payload.Latency | tonumber) | max) }'\ | jq -s 'sort_by(.TaskListName, .IsolationGroup)[]'\ | jq -r '[.TaskListName, .IsolationGroup, .Median, .Avg, .Max] | @tsv' \ @@ -234,8 +240,8 @@ cat "$eventLogsFile" \ echo "Task Containment per isolation group and task list:" | tee -a $testSummaryFile cat "$eventLogsFile"\ | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ - | jq -s 'group_by(.PartitionConfig."isolation-group", .TaskListName)[] | {"IsolationGroup": .[0].PartitionConfig."isolation-group", "TaskListName": .[0].TaskListName, - "Containment": (map(if .Payload.IsolationGroup == .PartitionConfig."isolation-group" then 1 else 0 end) | add / length) }'\ + | jq -s 'group_by(.PartitionConfig."original-isolation-group", (.Payload.RequestForwardedFrom // .TaskListName))[] | {"IsolationGroup": .[0].PartitionConfig."original-isolation-group", "TaskListName": (.[0].Payload.RequestForwardedFrom // .[0].TaskListName), + "Containment": (map(if .Payload.IsolationGroup == .PartitionConfig."original-isolation-group" then 1 else 0 end) | add / length) }'\ | jq -s 'sort_by(.TaskListName, .IsolationGroup)[]'\ | jq -r '[.TaskListName, .IsolationGroup, .Containment] | @tsv' \ | sort -n | column -t | sed -e 's/^/ /' | tee -a $testSummaryFile diff --git a/service/matching/config/config.go b/service/matching/config/config.go index 45e9cc31654..80c61b6853d 100644 --- a/service/matching/config/config.go +++ b/service/matching/config/config.go @@ -53,6 +53,8 @@ type ( AsyncTaskDispatchTimeout dynamicconfig.DurationPropertyFnWithTaskListInfoFilters LocalPollWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters LocalTaskWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + TaskIsolationDuration dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + TaskIsolationPollerWindow dynamicconfig.DurationPropertyFnWithTaskListInfoFilters EnableGetNumberOfPartitionsFromCache dynamicconfig.BoolPropertyFnWithTaskListInfoFilters PartitionUpscaleRPS 
dynamicconfig.IntPropertyFnWithTaskListInfoFilters PartitionDownscaleFactor dynamicconfig.FloatPropertyFnWithTaskListInfoFilters @@ -132,7 +134,9 @@ type ( // isolation configuration EnableTasklistIsolation func() bool // A function which returns all the isolation groups - AllIsolationGroups func() []string + AllIsolationGroups func() []string + TaskIsolationDuration func() time.Duration + TaskIsolationPollerWindow func() time.Duration // hostname HostName string // rate limiter configuration @@ -189,6 +193,8 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string, getIsolationGroups PartitionDownscaleSustainedDuration: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionDownscaleSustainedDuration), AdaptiveScalerUpdateInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingAdaptiveScalerUpdateInterval), EnableAdaptiveScaler: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableAdaptiveScaler), + TaskIsolationDuration: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.TaskIsolationDuration), + TaskIsolationPollerWindow: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.TaskIsolationPollerWindow), HostName: hostName, TaskDispatchRPS: 100000.0, TaskDispatchRPSTTL: time.Minute, diff --git a/service/matching/config/config_test.go b/service/matching/config/config_test.go index 26850115128..05f97a86b72 100644 --- a/service/matching/config/config_test.go +++ b/service/matching/config/config_test.go @@ -89,6 +89,8 @@ func TestNewConfig(t *testing.T) { "EnableAdaptiveScaler": {dynamicconfig.MatchingEnableAdaptiveScaler, true}, "EnableStandbyTaskCompletion": {dynamicconfig.MatchingEnableStandbyTaskCompletion, false}, "EnableClientAutoConfig": {dynamicconfig.MatchingEnableClientAutoConfig, false}, + "TaskIsolationDuration": {dynamicconfig.TaskIsolationDuration, time.Duration(35)}, + "TaskIsolationPollerWindow": {dynamicconfig.TaskIsolationPollerWindow, time.Duration(36)}, } client := dynamicconfig.NewInMemoryClient() for fieldName, expected := range fields { diff --git a/service/matching/handler/engine.go b/service/matching/handler/engine.go index 149b7d12a97..8c1bdef1725 100644 --- a/service/matching/handler/engine.go +++ b/service/matching/handler/engine.go @@ -1335,8 +1335,12 @@ func (e *matchingEngineImpl) emitTaskIsolationMetrics( partitionConfig map[string]string, pollerIsolationGroup string, ) { - if len(partitionConfig) > 0 { - scope.Tagged(metrics.PartitionConfigTags(partitionConfig)...).Tagged(metrics.PollerIsolationGroupTag(pollerIsolationGroup)).IncCounter(metrics.IsolationTaskMatchPerTaskListCounter) + if currentGroup, ok := partitionConfig[partition.IsolationGroupKey]; ok { + originalGroup, ok := partitionConfig[partition.OriginalIsolationGroupKey] + if !ok { + originalGroup = currentGroup + } + scope.Tagged(metrics.OriginalIsolationGroupTag(originalGroup), metrics.IsolationGroupTag(currentGroup), metrics.PollerIsolationGroupTag(pollerIsolationGroup)).IncCounter(metrics.IsolationTaskMatchPerTaskListCounter) } } diff --git a/service/matching/tasklist/task.go b/service/matching/tasklist/task.go index b52564539fe..db9d57a7c26 100644 --- a/service/matching/tasklist/task.go +++ b/service/matching/tasklist/task.go @@ -21,6 +21,7 @@ package tasklist import ( + "github.com/uber/cadence/common/partition" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) @@ -83,6 +84,21 @@ func newInternalTask( if forSyncMatch { task.ResponseC = make(chan error, 1) } + + // Rewrite the 
partitionConfig to match how we're dispatching it + // OriginalIsolationGroup is populated here and isn't written to the DB. If it's already + // present then it's a forwarded task and we should respect it. + if configIsolationGroup, ok := task.Event.PartitionConfig[partition.IsolationGroupKey]; ok { + partitionConfig := make(map[string]string, 3) + if originalIsolationGroup, ok := task.Event.PartitionConfig[partition.OriginalIsolationGroupKey]; ok { + partitionConfig[partition.OriginalIsolationGroupKey] = originalIsolationGroup + } else { + partitionConfig[partition.OriginalIsolationGroupKey] = configIsolationGroup + } + partitionConfig[partition.IsolationGroupKey] = isolationGroup + partitionConfig[partition.WorkflowIDKey] = task.Event.PartitionConfig[partition.WorkflowIDKey] + task.Event.PartitionConfig = partitionConfig + } return task } diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go index 98891b1c794..79138ec29a6 100644 --- a/service/matching/tasklist/task_list_manager.go +++ b/service/matching/tasklist/task_list_manager.go @@ -60,7 +60,8 @@ const ( // Time budget for empty task to propagate through the function stack and be returned to // pollForActivityTask or pollForDecisionTask handler. returnEmptyTaskTimeBudget = time.Second - noIsolationTimeout = -1 + noIsolationTimeout = time.Duration(0) + minimumIsolationDuration = time.Millisecond * 50 ) var ( @@ -865,19 +866,9 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task for k, v := range taskInfo.PartitionConfig { partitionConfig[k] = v } + startedIsolationGroup := partitionConfig[partition.IsolationGroupKey] partitionConfig[partition.WorkflowIDKey] = taskInfo.WorkflowID - pollerIsolationGroups := c.config.AllIsolationGroups() - // Not all poller information are available at the time of task list manager creation, - // because we don't persist poller information in database, so in the first minute, we always assume - // pollers are available in all isolation groups to avoid the risk of leaking a task to another isolation group. - // Besides, for sticky and scalable tasklists, not all poller information are available, we also use all isolation group. 
- if c.timeSource.Now().Sub(c.createTime) > time.Minute { - pollerIsolationGroups = c.getPollerIsolationGroups() - if len(pollerIsolationGroups) == 0 { - // we don't have any pollers, use all isolation groups and wait for pollers' arriving - pollerIsolationGroups = c.config.AllIsolationGroups() - } - } + pollerIsolationGroups := c.getPollerIsolationGroups() group, err := c.partitioner.GetIsolationGroupByDomainID(ctx, partition.PollerInfo{ @@ -887,17 +878,31 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task }, partitionConfig) if err != nil { // if we're unable to get the isolation group, log the error and fallback to no isolation - c.logger.Error("Failed to get isolation group from partition library", tag.WorkflowID(taskInfo.WorkflowID), tag.WorkflowRunID(taskInfo.RunID), tag.TaskID(taskInfo.TaskID), tag.Error(err)) + c.logger.Error("Failed to get isolation group from partition library", tag.IsolationGroup(startedIsolationGroup), tag.WorkflowID(taskInfo.WorkflowID), tag.WorkflowRunID(taskInfo.RunID), tag.TaskID(taskInfo.TaskID), tag.Error(err)) + c.scope.Tagged(metrics.IsolationGroupTag(startedIsolationGroup)).IncCounter(metrics.TaskIsolationErrorPerTaskList) return defaultTaskBufferIsolationGroup, noIsolationTimeout, nil } + + totalTaskIsolationDuration := c.config.TaskIsolationDuration() + taskIsolationDuration := noIsolationTimeout + if totalTaskIsolationDuration != noIsolationTimeout && group != defaultTaskBufferIsolationGroup { + taskLatency := c.timeSource.Now().Sub(taskInfo.CreatedTime) + if taskLatency < (totalTaskIsolationDuration - minimumIsolationDuration) { + taskIsolationDuration = totalTaskIsolationDuration - taskLatency + } else { + c.logger.Info("Leaking task due to taskIsolationDuration expired", tag.IsolationGroup(group), tag.IsolationDuration(taskIsolationDuration), tag.TaskLatency(taskLatency)) + c.scope.Tagged(metrics.IsolationGroupTag(group)).IncCounter(metrics.TaskIsolationExpiredPerTaskList) + group = defaultTaskBufferIsolationGroup + } + } c.logger.Debug("get isolation group", tag.PollerGroups(pollerIsolationGroups), tag.IsolationGroup(group), tag.PartitionConfig(partitionConfig)) - return group, noIsolationTimeout, nil + return group, taskIsolationDuration, nil } return defaultTaskBufferIsolationGroup, noIsolationTimeout, nil } func (c *taskListManagerImpl) getPollerIsolationGroups() []string { - groupSet := c.pollerHistory.GetPollerIsolationGroups(c.timeSource.Now().Add(-10 * time.Second)) + groupSet := c.pollerHistory.GetPollerIsolationGroups(c.timeSource.Now().Add(-1 * c.config.TaskIsolationPollerWindow())) c.outstandingPollsLock.Lock() for _, poller := range c.outstandingPollsMap { groupSet[poller.isolationGroup]++ @@ -1031,6 +1036,12 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c EnableAdaptiveScaler: func() bool { return cfg.EnableAdaptiveScaler(domainName, taskListName, taskType) }, + TaskIsolationDuration: func() time.Duration { + return cfg.TaskIsolationDuration(domainName, taskListName, taskType) + }, + TaskIsolationPollerWindow: func() time.Duration { + return cfg.TaskIsolationPollerWindow(domainName, taskListName, taskType) + }, ForwarderConfig: config.ForwarderConfig{ ForwarderMaxOutstandingPolls: func() int { return cfg.ForwarderMaxOutstandingPolls(domainName, taskListName, taskType) diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go index 2b25fdc8d74..462d35bf15e 100644 --- 
a/service/matching/tasklist/task_list_manager_test.go +++ b/service/matching/tasklist/task_list_manager_test.go @@ -42,6 +42,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/isolationgroup" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/log/testlogger" @@ -532,6 +533,167 @@ func TestMisconfiguredZoneDoesNotBlock(t *testing.T) { assert.True(t, breakRetryLoop, "task should be dispatched to default channel") } +func TestGetIsolationGroupForTask(t *testing.T) { + defaultAvailableIsolationGroups := []string{ + "a", "b", "c", + } + taskIsolationPollerWindow := time.Second * 10 + testCases := []struct { + name string + taskIsolationGroup string + taskIsolationDuration time.Duration + taskLatency time.Duration + availableIsolationGroups []string + recentPollers []string + expectedGroup string + expectedDuration time.Duration + disableTaskIsolation bool + }{ + { + name: "success - recent poller allows group", + taskIsolationGroup: "a", + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "a", + expectedDuration: 0, + recentPollers: []string{"a"}, + }, + { + name: "success - with isolation duration", + taskIsolationGroup: "b", + taskIsolationDuration: time.Second, + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "b", + expectedDuration: time.Second, + recentPollers: []string{"b"}, + }, + { + name: "success - low task latency", + taskIsolationGroup: "a", + taskIsolationDuration: time.Second, + taskLatency: time.Millisecond * 300, + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "a", + expectedDuration: time.Second - (time.Millisecond * 300), + recentPollers: []string{"a"}, + }, + { + name: "leak - no recent pollers", + taskIsolationGroup: "a", + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "", + expectedDuration: 0, + recentPollers: nil, + }, + { + name: "leak - no matching recent poller", + taskIsolationGroup: "a", + taskIsolationDuration: time.Second, + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "", + expectedDuration: 0, + recentPollers: []string{"b"}, + }, + { + name: "leak - task latency", + taskIsolationGroup: "a", + taskIsolationDuration: time.Second, + taskLatency: time.Second, + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "", + expectedDuration: 0, + }, + { + name: "leak - task latency close to taskIsolationDuration", + taskIsolationGroup: "a", + taskIsolationDuration: time.Second, + taskLatency: time.Second - minimumIsolationDuration, + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "", + expectedDuration: 0, + }, + { + name: "leak - partitioner error", + taskIsolationGroup: "a", + taskIsolationDuration: time.Second, + // No isolation groups causes an error + // availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "", + expectedDuration: 0, + }, + { + name: "leak - task isolation disabled", + taskIsolationGroup: "a", + taskIsolationDuration: time.Second, + availableIsolationGroups: defaultAvailableIsolationGroups, + expectedGroup: "", + expectedDuration: 0, + disableTaskIsolation: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + logger := testlogger.New(t) + + config := defaultTestConfig() + 
config.EnableTasklistIsolation = func(domain string) bool { return !tc.disableTaskIsolation } + config.TaskIsolationDuration = func(domain string, taskList string, taskType int) time.Duration { + return tc.taskIsolationDuration + } + config.TaskIsolationPollerWindow = func(domain string, taskList string, taskType int) time.Duration { + return taskIsolationPollerWindow + } + config.AllIsolationGroups = func() []string { + return tc.availableIsolationGroups + } + mockClock := clock.NewMockedTimeSource() + tlm := createTestTaskListManagerWithConfig(t, logger, controller, config, mockClock) + + mockIsolationGroupState := isolationgroup.NewMockState(controller) + mockIsolationGroupState.EXPECT().IsDrained(gomock.Any(), gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() + mockIsolationGroupState.EXPECT().IsDrainedByDomainID(gomock.Any(), gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() + mockIsolationGroupState.EXPECT().AvailableIsolationGroupsByDomainID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, domainId string, taskListName string, available []string) (types.IsolationGroupConfiguration, error) { + // Report all available isolation groups as healthy + isolationGroupStates := make(types.IsolationGroupConfiguration, len(available)) + for _, availableGroup := range available { + isolationGroupStates[availableGroup] = types.IsolationGroupPartition{ + Name: availableGroup, + State: types.IsolationGroupStateHealthy, + } + } + return isolationGroupStates, nil + }).AnyTimes() + partitioner := partition.NewDefaultPartitioner(log.NewNoop(), mockIsolationGroupState) + tlm.partitioner = partitioner + + for _, pollerGroup := range tc.recentPollers { + tlm.pollerHistory.UpdatePollerInfo(poller.Identity(pollerGroup), poller.Info{IsolationGroup: pollerGroup}) + } + + taskInfo := &persistence.TaskInfo{ + DomainID: "domainId", + RunID: "run1", + WorkflowID: "workflow1", + ScheduleID: 5, + ScheduleToStartTimeoutSeconds: 1, + PartitionConfig: map[string]string{ + partition.IsolationGroupKey: tc.taskIsolationGroup, + partition.WorkflowIDKey: "workflow1", + }, + CreatedTime: mockClock.Now().Add(-1 * tc.taskLatency), + } + + actual, actualDuration, err := tlm.getIsolationGroupForTask(context.Background(), taskInfo) + + assert.Equal(t, tc.expectedGroup, actual) + assert.Equal(t, tc.expectedDuration, actualDuration) + // There are no longer any error cases + assert.Nil(t, err) + }) + } +} + func TestTaskWriterShutdown(t *testing.T) { controller := gomock.NewController(t) logger := testlogger.New(t) diff --git a/service/matching/tasklist/task_test.go b/service/matching/tasklist/task_test.go new file mode 100644 index 00000000000..1b626807d35 --- /dev/null +++ b/service/matching/tasklist/task_test.go @@ -0,0 +1,163 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package tasklist + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/partition" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" +) + +func TestNewInternalTask(t *testing.T) { + cases := []struct { + name string + partitionConfig map[string]string + source types.TaskSource + forwardedFrom string + forSyncMatch bool + isolationGroup string + expectedPartitionConfig map[string]string + additionalAssertions func(t *testing.T, task *InternalTask) + }{ + { + name: "sync match", + source: types.TaskSourceHistory, + forSyncMatch: true, + additionalAssertions: func(t *testing.T, task *InternalTask) { + // Only initialized for sync match + assert.NotNil(t, task.ResponseC) + assert.True(t, task.IsSyncMatch()) + }, + }, + { + name: "async match", + source: types.TaskSourceDbBacklog, + forSyncMatch: false, + additionalAssertions: func(t *testing.T, task *InternalTask) { + // Only initialized for sync match + assert.Nil(t, task.ResponseC) + assert.False(t, task.IsSyncMatch()) + }, + }, + { + name: "forwarded from history", + source: types.TaskSourceDbBacklog, + forSyncMatch: true, + forwardedFrom: "elsewhere", + additionalAssertions: func(t *testing.T, task *InternalTask) { + assert.True(t, task.IsForwarded()) + assert.True(t, task.IsSyncMatch()) + }, + }, + { + name: "forwarded from backlog", + source: types.TaskSourceDbBacklog, + forSyncMatch: true, + forwardedFrom: "elsewhere", + additionalAssertions: func(t *testing.T, task *InternalTask) { + assert.True(t, task.IsForwarded()) + // Still technically sync match, just on a different host + assert.True(t, task.IsSyncMatch()) + }, + }, + { + name: "tasklist isolation", + source: types.TaskSourceDbBacklog, + isolationGroup: "a", + partitionConfig: map[string]string{ + partition.IsolationGroupKey: "a", + partition.WorkflowIDKey: "workflowID", + }, + expectedPartitionConfig: map[string]string{ + partition.OriginalIsolationGroupKey: "a", + partition.IsolationGroupKey: "a", + partition.WorkflowIDKey: "workflowID", + }, + }, + { + name: "tasklist isolation - leaked", + source: types.TaskSourceDbBacklog, + isolationGroup: "", + partitionConfig: map[string]string{ + partition.IsolationGroupKey: "a", + partition.WorkflowIDKey: "workflowID", + }, + expectedPartitionConfig: map[string]string{ + partition.OriginalIsolationGroupKey: "a", + partition.IsolationGroupKey: "", + partition.WorkflowIDKey: "workflowID", + }, + }, + { + name: "tasklist isolation - forwarded", + source: types.TaskSourceDbBacklog, + isolationGroup: "", + partitionConfig: map[string]string{ + partition.OriginalIsolationGroupKey: "a", + partition.IsolationGroupKey: "", + partition.WorkflowIDKey: "workflowID", + }, + expectedPartitionConfig: map[string]string{ + partition.OriginalIsolationGroupKey: "a", + partition.IsolationGroupKey: "", + partition.WorkflowIDKey: "workflowID", + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + completionFunc := func(_ *persistence.TaskInfo, _ 
error) {} + taskInfo := defaultTaskInfo(tc.partitionConfig) + activityDispatchInfo := &types.ActivityTaskDispatchInfo{WorkflowDomain: "domain"} + task := newInternalTask(taskInfo, completionFunc, tc.source, tc.forwardedFrom, tc.forSyncMatch, activityDispatchInfo, tc.isolationGroup) + assert.Equal(t, defaultTaskInfo(tc.expectedPartitionConfig), task.Event.TaskInfo) + assert.NotNil(t, task.Event.completionFunc) + assert.Equal(t, tc.source, task.source) + assert.Equal(t, tc.forwardedFrom, task.forwardedFrom) + assert.Equal(t, tc.isolationGroup, task.isolationGroup) + assert.Equal(t, activityDispatchInfo, task.ActivityTaskDispatchInfo) + if tc.additionalAssertions != nil { + tc.additionalAssertions(t, task) + } + }) + } +} + +func defaultTaskInfo(partitionConfig map[string]string) *persistence.TaskInfo { + return &persistence.TaskInfo{ + DomainID: "DomainID", + WorkflowID: "WorkflowID", + RunID: "RunID", + TaskID: 1, + ScheduleID: 2, + ScheduleToStartTimeoutSeconds: 3, + Expiry: time.UnixMicro(4), + CreatedTime: time.UnixMicro(5), + PartitionConfig: partitionConfig, + } +}
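
Reviewer's note (not part of the diff): the core of this change is the timing decision added to getIsolationGroupForTask in service/matching/tasklist/task_list_manager.go, driven by the matching.taskIsolationDuration and matching.taskIsolationPollerWindow dynamic config values introduced above. The following is a minimal, self-contained Go sketch of that decision under the same constants; remainingIsolation is a hypothetical name used only for illustration, not a function in the patch.

package main

import (
	"fmt"
	"time"
)

const (
	// Values mirrored from the patch; the real constants live in task_list_manager.go.
	noIsolationTimeout       = time.Duration(0)
	minimumIsolationDuration = 50 * time.Millisecond
	// The default (empty) isolation group buffers tasks that any poller may take.
	defaultTaskBufferIsolationGroup = ""
)

// remainingIsolation sketches the decision the patch adds: given the group the
// partitioner picked, the configured taskIsolationDuration, and how long the task
// has already waited, return the group to buffer the task in and how much longer
// to keep it isolated before leaking it to any poller.
func remainingIsolation(group string, taskIsolationDuration, taskLatency time.Duration) (string, time.Duration) {
	if taskIsolationDuration == noIsolationTimeout || group == defaultTaskBufferIsolationGroup {
		// Isolation is disabled, or the task is already in the shared buffer.
		return group, noIsolationTimeout
	}
	if taskLatency < taskIsolationDuration-minimumIsolationDuration {
		// Keep isolating, but only for the unspent part of the window.
		return group, taskIsolationDuration - taskLatency
	}
	// The window has (almost) expired: leak the task so any poller can take it.
	return defaultTaskBufferIsolationGroup, noIsolationTimeout
}

func main() {
	g, d := remainingIsolation("a", 5*time.Second, 300*time.Millisecond)
	fmt.Println(g, d) // "a" 4.7s: still routed only to group "a" pollers
	g, d = remainingIsolation("a", 5*time.Second, 5*time.Second)
	fmt.Println(g, d) // "" 0s: leaked, any poller may process it (emits task_isolation_expired)
}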