From 670d99c95ccaea0beb12fa8e2774888a901c0bc1 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 8 Feb 2022 19:49:02 -0800 Subject: [PATCH] Move search attribute validator and mapper calls to frontend service (#2476) --- common/searchattribute/validator.go | 1 + host/child_workflow_test.go | 9 ++- host/continue_as_new_test.go | 10 +-- host/onebox.go | 4 ++ host/test_search_attribute_mapper.go | 59 +++++++++++++++++ service/frontend/workflowHandler.go | 37 ++++++++++- service/history/historyEngine.go | 16 ----- service/history/historyEngine2_test.go | 87 +++++++++++++++++++++++--- 8 files changed, 190 insertions(+), 33 deletions(-) create mode 100644 host/test_search_attribute_mapper.go diff --git a/common/searchattribute/validator.go b/common/searchattribute/validator.go index 3670fc79a0f..d62ae8b64b0 100644 --- a/common/searchattribute/validator.go +++ b/common/searchattribute/validator.go @@ -91,6 +91,7 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp fieldName := saName if IsMappable(saName) && v.searchAttributesMapper != nil { + // Call mapper here because original saName is needed to generate a proper error message (it should contain an alias but not a field name). fieldName, err = v.searchAttributesMapper.GetFieldName(saName, namespace) if err != nil { return err diff --git a/host/child_workflow_test.go b/host/child_workflow_test.go index ac43c0079a8..0ceb4505f88 100644 --- a/host/child_workflow_test.go +++ b/host/child_workflow_test.go @@ -54,6 +54,9 @@ func (s *integrationSuite) TestChildWorkflowExecution() { tlParent := "integration-child-workflow-test-parent-taskqueue" tlChild := "integration-child-workflow-test-child-taskqueue" identity := "worker1" + saName := "CustomKeywordField" + // Uncomment this line to test with mapper. + // saName = "AliasForCustomKeywordField" parentWorkflowType := &commonpb.WorkflowType{} parentWorkflowType.Name = wtParent @@ -101,7 +104,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() { attrValPayload := payload.EncodeString("attrVal") searchAttr := &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "CustomKeywordField": attrValPayload, + saName: attrValPayload, }, } @@ -217,8 +220,8 @@ func (s *integrationSuite) TestChildWorkflowExecution() { s.Equal(header, startedEvent.GetChildWorkflowExecutionStartedEventAttributes().Header) s.Equal(header, childStartedEvent.GetWorkflowExecutionStartedEventAttributes().Header) s.Equal(memo, childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetMemo()) - s.Equal(searchAttr.GetIndexedFields()["CustomKeywordField"].GetData(), childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetData()) - s.Equal([]byte("Keyword"), childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetMetadata()["type"]) + s.Equal(searchAttr.GetIndexedFields()[saName].GetData(), childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetData()) + s.Equal("Keyword", string(childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetMetadata()["type"])) s.Equal(time.Duration(0), timestamp.DurationValue(childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetWorkflowExecutionTimeout())) s.Equal(200*time.Second, timestamp.DurationValue(childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetWorkflowRunTimeout())) diff --git a/host/continue_as_new_test.go b/host/continue_as_new_test.go index b14a4e13d4a..6d04f01ea86 100644 --- a/host/continue_as_new_test.go +++ b/host/continue_as_new_test.go @@ -53,6 +53,9 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() { wt := "integration-continue-as-new-workflow-test-type" tl := "integration-continue-as-new-workflow-test-taskqueue" identity := "worker1" + saName := "CustomKeywordField" + // Uncomment this line to test with mapper. + // saName = "AliasForCustomKeywordField" workflowType := &commonpb.WorkflowType{Name: wt} @@ -64,10 +67,9 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() { memo := &commonpb.Memo{ Fields: map[string]*commonpb.Payload{"memoKey": payload.EncodeString("memoVal")}, } - searchAttrPayload := payload.EncodeString("random keyword") searchAttr := &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "CustomKeywordField": searchAttrPayload, + saName: payload.EncodeString("random"), }, } @@ -152,8 +154,8 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() { s.Equal(previousRunID, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetContinuedExecutionRunId()) s.Equal(header, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().Header) s.Equal(memo, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().Memo) - s.Equal(searchAttr.GetIndexedFields()["CustomKeywordField"].GetData(), lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetData()) - s.Equal([]byte("Keyword"), lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetMetadata()["type"]) + s.Equal(searchAttr.GetIndexedFields()[saName].GetData(), lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetData()) + s.Equal("Keyword", string(lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetMetadata()["type"])) } func (s *integrationSuite) TestContinueAsNewRun_Timeout() { diff --git a/host/onebox.go b/host/onebox.go index 6d3ddb633f3..5b1bdedfe3a 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -423,6 +423,8 @@ func (c *temporalImpl) startFrontend(hosts map[string][]string, startWG *sync.Wa fx.Provide(func() authorization.JWTAudienceMapper { return nil }), fx.Provide(func() client.FactoryProvider { return client.NewFactoryProvider() }), fx.Provide(func() searchattribute.Mapper { return nil }), + // Comment the line above and uncomment the line bellow to test with search attributes mapper. + // fx.Provide(func() searchattribute.Mapper { return NewSearchAttributeTestMapper() }), fx.Provide(func() resolver.ServiceResolver { return resolver.NewNoopResolver() }), fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), fx.Provide(func() dynamicconfig.Client { return newIntegrationConfigClient(dynamicconfig.NewNoopClient()) }), @@ -516,6 +518,8 @@ func (c *temporalImpl) startHistory( ), fx.Provide(func() client.FactoryProvider { return client.NewFactoryProvider() }), fx.Provide(func() searchattribute.Mapper { return nil }), + // Comment the line above and uncomment the line bellow to test with search attributes mapper. + // fx.Provide(func() searchattribute.Mapper { return NewSearchAttributeTestMapper() }), fx.Provide(func() resolver.ServiceResolver { return resolver.NewNoopResolver() }), fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), fx.Provide(func() dynamicconfig.Client { return integrationClient }), diff --git a/host/test_search_attribute_mapper.go b/host/test_search_attribute_mapper.go new file mode 100644 index 00000000000..cbc1fd96f32 --- /dev/null +++ b/host/test_search_attribute_mapper.go @@ -0,0 +1,59 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package host + +import ( + "fmt" + "strings" + + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/common/searchattribute" +) + +type ( + SearchAttributeTestMapper struct{} +) + +func NewSearchAttributeTestMapper() *SearchAttributeTestMapper { + return &SearchAttributeTestMapper{} +} + +func (t *SearchAttributeTestMapper) GetAlias(fieldName string, namespace string) (string, error) { + if _, err := searchattribute.TestNameTypeMap.GetType(fieldName); err == nil { + return "AliasFor" + fieldName, nil + } + return "", serviceerror.NewInvalidArgument(fmt.Sprintf("fieldname '%s' has no search-attribute defined for '%s' namespace", fieldName, namespace)) +} + +func (t *SearchAttributeTestMapper) GetFieldName(alias string, namespace string) (string, error) { + if strings.HasPrefix(alias, "AliasFor") { + fieldName := strings.TrimPrefix(alias, "AliasFor") + if _, err := searchattribute.TestNameTypeMap.GetType(fieldName); err == nil { + return fieldName, nil + } + } + return "", serviceerror.NewInvalidArgument(fmt.Sprintf("search-attribute '%s' not found for '%s' namespace", alias, namespace)) +} diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index ef99a140e5c..c39a3da13ad 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -107,6 +107,7 @@ type ( namespaceRegistry namespace.Registry saMapper searchattribute.Mapper saProvider searchattribute.Provider + saValidator *searchattribute.Validator archivalMetadata archiver.ArchivalMetadata } @@ -167,7 +168,13 @@ func NewWorkflowHandler( namespaceRegistry: namespaceRegistry, saProvider: saProvider, saMapper: saMapper, - archivalMetadata: archivalMetadata, + saValidator: searchattribute.NewValidator( + saProvider, + saMapper, + config.SearchAttributesNumberOfKeysLimit, + config.SearchAttributesSizeOfValueLimit, + config.SearchAttributesTotalSizeLimit), + archivalMetadata: archivalMetadata, } return handler @@ -429,6 +436,11 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request * } wh.logger.Debug("Start workflow execution request namespaceID.", tag.WorkflowNamespaceID(namespaceID.String())) + err = wh.processIncomingSearchAttributes(request.GetSearchAttributes(), namespaceName) + if err != nil { + return nil, err + } + resp, err := wh.historyClient.StartWorkflowExecution(ctx, common.CreateHistoryStartWorkflowRequest(namespaceID.String(), request, nil, time.Now().UTC())) if err != nil { @@ -1913,6 +1925,11 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, return nil, err } + err = wh.processIncomingSearchAttributes(request.GetSearchAttributes(), namespaceName) + if err != nil { + return nil, err + } + var runId string op := func() error { var err error @@ -2964,7 +2981,7 @@ func (wh *WorkflowHandler) getHistory( historyEvents = append(historyEvents, transientWorkflowTaskInfo.ScheduledEvent, transientWorkflowTaskInfo.StartedEvent) } - if err := wh.processSearchAttributes(historyEvents, namespace); err != nil { + if err := wh.processOutgoingSearchAttributes(historyEvents, namespace); err != nil { return nil, nil, err } @@ -2974,7 +2991,7 @@ func (wh *WorkflowHandler) getHistory( return executionHistory, nextPageToken, nil } -func (wh *WorkflowHandler) processSearchAttributes(events []*historypb.HistoryEvent, namespace namespace.Name) error { +func (wh *WorkflowHandler) processOutgoingSearchAttributes(events []*historypb.HistoryEvent, namespace namespace.Name) error { saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) @@ -3003,6 +3020,20 @@ func (wh *WorkflowHandler) processSearchAttributes(events []*historypb.HistoryEv return nil } +func (wh *WorkflowHandler) processIncomingSearchAttributes(searchAttributes *commonpb.SearchAttributes, namespaceName namespace.Name) error { + // Validate search attributes before substitution because in case of error, error message should contain alias but not field name. + if err := wh.saValidator.Validate(searchAttributes, namespaceName.String(), wh.config.ESIndexName); err != nil { + return err + } + if err := wh.saValidator.ValidateSize(searchAttributes, namespaceName.String()); err != nil { + return err + } + if err := searchattribute.SubstituteAliases(wh.saMapper, searchAttributes, namespaceName.String()); err != nil { + return err + } + return nil +} + func (wh *WorkflowHandler) validateTransientWorkflowTaskEvents( expectedNextEventID int64, transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo, diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 2e423db5b25..39dedcd2c07 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -520,11 +520,6 @@ func (e *historyEngineImpl) StartWorkflowExecution( return nil, err } - err = searchattribute.SubstituteAliases(e.shard.GetSearchAttributesMapper(), request.GetSearchAttributes(), namespace.String()) - if err != nil { - return nil, err - } - workflowID := request.GetWorkflowId() // grab the current context as a Lock, nothing more _, currentRelease, err := e.historyCache.GetOrCreateCurrentWorkflowExecution( @@ -2033,11 +2028,6 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( return nil, err } - err = searchattribute.SubstituteAliases(e.shard.GetSearchAttributesMapper(), request.GetSearchAttributes(), namespace.String()) - if err != nil { - return nil, err - } - if err := common.CheckEventBlobSizeLimit( sRequest.GetSignalInput().Size(), e.config.BlobSizeLimitWarn(namespace.String()), @@ -2759,12 +2749,6 @@ func (e *historyEngineImpl) validateStartWorkflowExecutionRequest( if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil { return err } - if err := e.searchAttributesValidator.Validate(request.SearchAttributes, namespace.String(), e.config.DefaultVisibilityIndexName); err != nil { - return err - } - if err := e.searchAttributesValidator.ValidateSize(request.SearchAttributes, namespace.String()); err != nil { - return err - } if err := common.CheckEventBlobSizeLimit( request.GetInput().Size(), diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 4d69d4b98eb..a1769e4f3a0 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -27,7 +27,6 @@ package history import ( "context" "errors" - "strings" "testing" "time" @@ -963,6 +962,74 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompletedRecordMarkerCommand() { s.False(executionBuilder.HasPendingWorkflowTask()) } +func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWithSearchAttributes() { + we := commonpb.WorkflowExecution{ + WorkflowId: "wId", + RunId: tests.RunID, + } + tl := "testTaskQueue" + taskToken := &tokenspb.Task{ + ScheduleAttempt: 1, + WorkflowId: "wId", + RunId: we.GetRunId(), + ScheduleId: 2, + } + serializedTaskToken, _ := taskToken.Marshal() + identity := "testIdentity" + + msBuilder := workflow.TestLocalMutableState(s.historyEngine.shard, s.mockEventsCache, tests.LocalNamespaceEntry, + log.NewTestLogger(), we.GetRunId()) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, nil, 100*time.Second, 50*time.Second, 200*time.Second, identity) + di := addWorkflowTaskScheduledEvent(msBuilder) + addWorkflowTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) + + commands := []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{ + Namespace: tests.Namespace.String(), + WorkflowId: tests.WorkflowID, + WorkflowType: &commonpb.WorkflowType{Name: "wType"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: tl}, + SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{ + "AliasForCustomTextField": payload.EncodeString("search attribute value")}, + }, + }}, + }} + + ms := workflow.TestCloneToProto(msBuilder) + gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms} + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse, nil) + s.mockNamespaceCache.EXPECT().GetNamespace(tests.Namespace).Return(tests.LocalNamespaceEntry, nil).AnyTimes() + + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).DoAndReturn(func(request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) { + eventsToSave := request.UpdateWorkflowEvents[0].Events + s.Len(eventsToSave, 2) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED, eventsToSave[0].GetEventType()) + s.Equal(enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED, eventsToSave[1].GetEventType()) + startChildEventAttributes := eventsToSave[1].GetStartChildWorkflowExecutionInitiatedEventAttributes() + // Search attribute name was mapped and saved under field name. + s.Equal( + payload.EncodeString("search attribute value"), + startChildEventAttributes.GetSearchAttributes().GetIndexedFields()["CustomTextField"]) + return tests.UpdateWorkflowExecutionResponse, nil + }) + + s.mockShard.Resource.SearchAttributesMapper.EXPECT(). + GetFieldName("AliasForCustomTextField", tests.Namespace.String()).Return("CustomTextField", nil). + Times(2) // One for validator, one for actual mapper + + _, err := s.historyEngine.RespondWorkflowTaskCompleted(metrics.AddMetricsContext(context.Background()), &historyservice.RespondWorkflowTaskCompletedRequest{ + NamespaceId: tests.NamespaceID.String(), + CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + TaskToken: serializedTaskToken, + Commands: commands, + Identity: identity, + }, + }) + s.Nil(err) +} + func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() { namespaceID := tests.NamespaceID workflowID := "workflowID" @@ -999,11 +1066,17 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_SearchAttributes() { taskQueue := "testTaskQueue" identity := "testIdentity" - s.mockExecutionMgr.EXPECT().CreateWorkflowExecution(gomock.Any()).Return(tests.CreateWorkflowExecutionResponse, nil) - s.mockShard.Resource.SearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), gomock.Any()).DoAndReturn( - func(alias string, namespace string) (string, error) { - return strings.TrimPrefix(alias, "AliasFor"), nil - }).Times(2) + s.mockExecutionMgr.EXPECT().CreateWorkflowExecution(gomock.Any()).DoAndReturn(func(request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) { + eventsToSave := request.NewWorkflowEvents[0].Events + s.Len(eventsToSave, 2) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, eventsToSave[0].GetEventType()) + startEventAttributes := eventsToSave[0].GetWorkflowExecutionStartedEventAttributes() + // Search attribute name was mapped and saved under field name. + s.Equal( + payload.EncodeString("test"), + startEventAttributes.GetSearchAttributes().GetIndexedFields()["CustomKeywordField"]) + return tests.CreateWorkflowExecutionResponse, nil + }) requestID := uuid.New() resp, err := s.historyEngine.StartWorkflowExecution(metrics.AddMetricsContext(context.Background()), &historyservice.StartWorkflowExecutionRequest{ @@ -1020,7 +1093,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_SearchAttributes() { Identity: identity, RequestId: requestID, SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{ - "AliasForCustomKeywordField": payload.EncodeString("test"), + "CustomKeywordField": payload.EncodeString("test"), }}}, }) s.Nil(err)