Skip to content

Commit

Permalink
Move search attribute validator and mapper calls to frontend service (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Feb 9, 2022
1 parent cdb4fa0 commit 670d99c
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 33 deletions.
1 change: 1 addition & 0 deletions common/searchattribute/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp

fieldName := saName
if IsMappable(saName) && v.searchAttributesMapper != nil {
// Call mapper here because original saName is needed to generate a proper error message (it should contain an alias but not a field name).
fieldName, err = v.searchAttributesMapper.GetFieldName(saName, namespace)
if err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions host/child_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
tlParent := "integration-child-workflow-test-parent-taskqueue"
tlChild := "integration-child-workflow-test-child-taskqueue"
identity := "worker1"
saName := "CustomKeywordField"
// Uncomment this line to test with mapper.
// saName = "AliasForCustomKeywordField"

parentWorkflowType := &commonpb.WorkflowType{}
parentWorkflowType.Name = wtParent
Expand Down Expand Up @@ -101,7 +104,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
attrValPayload := payload.EncodeString("attrVal")
searchAttr := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"CustomKeywordField": attrValPayload,
saName: attrValPayload,
},
}

Expand Down Expand Up @@ -217,8 +220,8 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
s.Equal(header, startedEvent.GetChildWorkflowExecutionStartedEventAttributes().Header)
s.Equal(header, childStartedEvent.GetWorkflowExecutionStartedEventAttributes().Header)
s.Equal(memo, childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetMemo())
s.Equal(searchAttr.GetIndexedFields()["CustomKeywordField"].GetData(), childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetData())
s.Equal([]byte("Keyword"), childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetMetadata()["type"])
s.Equal(searchAttr.GetIndexedFields()[saName].GetData(), childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetData())
s.Equal("Keyword", string(childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetMetadata()["type"]))
s.Equal(time.Duration(0), timestamp.DurationValue(childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetWorkflowExecutionTimeout()))
s.Equal(200*time.Second, timestamp.DurationValue(childStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetWorkflowRunTimeout()))

Expand Down
10 changes: 6 additions & 4 deletions host/continue_as_new_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
wt := "integration-continue-as-new-workflow-test-type"
tl := "integration-continue-as-new-workflow-test-taskqueue"
identity := "worker1"
saName := "CustomKeywordField"
// Uncomment this line to test with mapper.
// saName = "AliasForCustomKeywordField"

workflowType := &commonpb.WorkflowType{Name: wt}

Expand All @@ -64,10 +67,9 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
memo := &commonpb.Memo{
Fields: map[string]*commonpb.Payload{"memoKey": payload.EncodeString("memoVal")},
}
searchAttrPayload := payload.EncodeString("random keyword")
searchAttr := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"CustomKeywordField": searchAttrPayload,
saName: payload.EncodeString("random"),
},
}

Expand Down Expand Up @@ -152,8 +154,8 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
s.Equal(previousRunID, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetContinuedExecutionRunId())
s.Equal(header, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().Header)
s.Equal(memo, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().Memo)
s.Equal(searchAttr.GetIndexedFields()["CustomKeywordField"].GetData(), lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetData())
s.Equal([]byte("Keyword"), lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["CustomKeywordField"].GetMetadata()["type"])
s.Equal(searchAttr.GetIndexedFields()[saName].GetData(), lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetData())
s.Equal("Keyword", string(lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()[saName].GetMetadata()["type"]))
}

func (s *integrationSuite) TestContinueAsNewRun_Timeout() {
Expand Down
4 changes: 4 additions & 0 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ func (c *temporalImpl) startFrontend(hosts map[string][]string, startWG *sync.Wa
fx.Provide(func() authorization.JWTAudienceMapper { return nil }),
fx.Provide(func() client.FactoryProvider { return client.NewFactoryProvider() }),
fx.Provide(func() searchattribute.Mapper { return nil }),
// Comment the line above and uncomment the line bellow to test with search attributes mapper.
// fx.Provide(func() searchattribute.Mapper { return NewSearchAttributeTestMapper() }),
fx.Provide(func() resolver.ServiceResolver { return resolver.NewNoopResolver() }),
fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }),
fx.Provide(func() dynamicconfig.Client { return newIntegrationConfigClient(dynamicconfig.NewNoopClient()) }),
Expand Down Expand Up @@ -516,6 +518,8 @@ func (c *temporalImpl) startHistory(
),
fx.Provide(func() client.FactoryProvider { return client.NewFactoryProvider() }),
fx.Provide(func() searchattribute.Mapper { return nil }),
// Comment the line above and uncomment the line bellow to test with search attributes mapper.
// fx.Provide(func() searchattribute.Mapper { return NewSearchAttributeTestMapper() }),
fx.Provide(func() resolver.ServiceResolver { return resolver.NewNoopResolver() }),
fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }),
fx.Provide(func() dynamicconfig.Client { return integrationClient }),
Expand Down
59 changes: 59 additions & 0 deletions host/test_search_attribute_mapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package host

import (
"fmt"
"strings"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/searchattribute"
)

type (
SearchAttributeTestMapper struct{}
)

func NewSearchAttributeTestMapper() *SearchAttributeTestMapper {
return &SearchAttributeTestMapper{}
}

func (t *SearchAttributeTestMapper) GetAlias(fieldName string, namespace string) (string, error) {
if _, err := searchattribute.TestNameTypeMap.GetType(fieldName); err == nil {
return "AliasFor" + fieldName, nil
}
return "", serviceerror.NewInvalidArgument(fmt.Sprintf("fieldname '%s' has no search-attribute defined for '%s' namespace", fieldName, namespace))
}

func (t *SearchAttributeTestMapper) GetFieldName(alias string, namespace string) (string, error) {
if strings.HasPrefix(alias, "AliasFor") {
fieldName := strings.TrimPrefix(alias, "AliasFor")
if _, err := searchattribute.TestNameTypeMap.GetType(fieldName); err == nil {
return fieldName, nil
}
}
return "", serviceerror.NewInvalidArgument(fmt.Sprintf("search-attribute '%s' not found for '%s' namespace", alias, namespace))
}
37 changes: 34 additions & 3 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type (
namespaceRegistry namespace.Registry
saMapper searchattribute.Mapper
saProvider searchattribute.Provider
saValidator *searchattribute.Validator
archivalMetadata archiver.ArchivalMetadata
}

Expand Down Expand Up @@ -167,7 +168,13 @@ func NewWorkflowHandler(
namespaceRegistry: namespaceRegistry,
saProvider: saProvider,
saMapper: saMapper,
archivalMetadata: archivalMetadata,
saValidator: searchattribute.NewValidator(
saProvider,
saMapper,
config.SearchAttributesNumberOfKeysLimit,
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit),
archivalMetadata: archivalMetadata,
}

return handler
Expand Down Expand Up @@ -429,6 +436,11 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
}
wh.logger.Debug("Start workflow execution request namespaceID.", tag.WorkflowNamespaceID(namespaceID.String()))

err = wh.processIncomingSearchAttributes(request.GetSearchAttributes(), namespaceName)
if err != nil {
return nil, err
}

resp, err := wh.historyClient.StartWorkflowExecution(ctx, common.CreateHistoryStartWorkflowRequest(namespaceID.String(), request, nil, time.Now().UTC()))

if err != nil {
Expand Down Expand Up @@ -1913,6 +1925,11 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
return nil, err
}

err = wh.processIncomingSearchAttributes(request.GetSearchAttributes(), namespaceName)
if err != nil {
return nil, err
}

var runId string
op := func() error {
var err error
Expand Down Expand Up @@ -2964,7 +2981,7 @@ func (wh *WorkflowHandler) getHistory(
historyEvents = append(historyEvents, transientWorkflowTaskInfo.ScheduledEvent, transientWorkflowTaskInfo.StartedEvent)
}

if err := wh.processSearchAttributes(historyEvents, namespace); err != nil {
if err := wh.processOutgoingSearchAttributes(historyEvents, namespace); err != nil {
return nil, nil, err
}

Expand All @@ -2974,7 +2991,7 @@ func (wh *WorkflowHandler) getHistory(
return executionHistory, nextPageToken, nil
}

func (wh *WorkflowHandler) processSearchAttributes(events []*historypb.HistoryEvent, namespace namespace.Name) error {
func (wh *WorkflowHandler) processOutgoingSearchAttributes(events []*historypb.HistoryEvent, namespace namespace.Name) error {
saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
Expand Down Expand Up @@ -3003,6 +3020,20 @@ func (wh *WorkflowHandler) processSearchAttributes(events []*historypb.HistoryEv
return nil
}

func (wh *WorkflowHandler) processIncomingSearchAttributes(searchAttributes *commonpb.SearchAttributes, namespaceName namespace.Name) error {
// Validate search attributes before substitution because in case of error, error message should contain alias but not field name.
if err := wh.saValidator.Validate(searchAttributes, namespaceName.String(), wh.config.ESIndexName); err != nil {
return err
}
if err := wh.saValidator.ValidateSize(searchAttributes, namespaceName.String()); err != nil {
return err
}
if err := searchattribute.SubstituteAliases(wh.saMapper, searchAttributes, namespaceName.String()); err != nil {
return err
}
return nil
}

func (wh *WorkflowHandler) validateTransientWorkflowTaskEvents(
expectedNextEventID int64,
transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo,
Expand Down
16 changes: 0 additions & 16 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,6 @@ func (e *historyEngineImpl) StartWorkflowExecution(
return nil, err
}

err = searchattribute.SubstituteAliases(e.shard.GetSearchAttributesMapper(), request.GetSearchAttributes(), namespace.String())
if err != nil {
return nil, err
}

workflowID := request.GetWorkflowId()
// grab the current context as a Lock, nothing more
_, currentRelease, err := e.historyCache.GetOrCreateCurrentWorkflowExecution(
Expand Down Expand Up @@ -2033,11 +2028,6 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
return nil, err
}

err = searchattribute.SubstituteAliases(e.shard.GetSearchAttributesMapper(), request.GetSearchAttributes(), namespace.String())
if err != nil {
return nil, err
}

if err := common.CheckEventBlobSizeLimit(
sRequest.GetSignalInput().Size(),
e.config.BlobSizeLimitWarn(namespace.String()),
Expand Down Expand Up @@ -2759,12 +2749,6 @@ func (e *historyEngineImpl) validateStartWorkflowExecutionRequest(
if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
return err
}
if err := e.searchAttributesValidator.Validate(request.SearchAttributes, namespace.String(), e.config.DefaultVisibilityIndexName); err != nil {
return err
}
if err := e.searchAttributesValidator.ValidateSize(request.SearchAttributes, namespace.String()); err != nil {
return err
}

if err := common.CheckEventBlobSizeLimit(
request.GetInput().Size(),
Expand Down
87 changes: 80 additions & 7 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package history
import (
"context"
"errors"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -963,6 +962,74 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompletedRecordMarkerCommand() {
s.False(executionBuilder.HasPendingWorkflowTask())
}

func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWithSearchAttributes() {
we := commonpb.WorkflowExecution{
WorkflowId: "wId",
RunId: tests.RunID,
}
tl := "testTaskQueue"
taskToken := &tokenspb.Task{
ScheduleAttempt: 1,
WorkflowId: "wId",
RunId: we.GetRunId(),
ScheduleId: 2,
}
serializedTaskToken, _ := taskToken.Marshal()
identity := "testIdentity"

msBuilder := workflow.TestLocalMutableState(s.historyEngine.shard, s.mockEventsCache, tests.LocalNamespaceEntry,
log.NewTestLogger(), we.GetRunId())
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, nil, 100*time.Second, 50*time.Second, 200*time.Second, identity)
di := addWorkflowTaskScheduledEvent(msBuilder)
addWorkflowTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)

commands := []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: tests.Namespace.String(),
WorkflowId: tests.WorkflowID,
WorkflowType: &commonpb.WorkflowType{Name: "wType"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{
"AliasForCustomTextField": payload.EncodeString("search attribute value")},
},
}},
}}

ms := workflow.TestCloneToProto(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse, nil)
s.mockNamespaceCache.EXPECT().GetNamespace(tests.Namespace).Return(tests.LocalNamespaceEntry, nil).AnyTimes()

s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).DoAndReturn(func(request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) {
eventsToSave := request.UpdateWorkflowEvents[0].Events
s.Len(eventsToSave, 2)
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED, eventsToSave[0].GetEventType())
s.Equal(enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED, eventsToSave[1].GetEventType())
startChildEventAttributes := eventsToSave[1].GetStartChildWorkflowExecutionInitiatedEventAttributes()
// Search attribute name was mapped and saved under field name.
s.Equal(
payload.EncodeString("search attribute value"),
startChildEventAttributes.GetSearchAttributes().GetIndexedFields()["CustomTextField"])
return tests.UpdateWorkflowExecutionResponse, nil
})

s.mockShard.Resource.SearchAttributesMapper.EXPECT().
GetFieldName("AliasForCustomTextField", tests.Namespace.String()).Return("CustomTextField", nil).
Times(2) // One for validator, one for actual mapper

_, err := s.historyEngine.RespondWorkflowTaskCompleted(metrics.AddMetricsContext(context.Background()), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tests.NamespaceID.String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Commands: commands,
Identity: identity,
},
})
s.Nil(err)
}

func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {
namespaceID := tests.NamespaceID
workflowID := "workflowID"
Expand Down Expand Up @@ -999,11 +1066,17 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_SearchAttributes() {
taskQueue := "testTaskQueue"
identity := "testIdentity"

s.mockExecutionMgr.EXPECT().CreateWorkflowExecution(gomock.Any()).Return(tests.CreateWorkflowExecutionResponse, nil)
s.mockShard.Resource.SearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), gomock.Any()).DoAndReturn(
func(alias string, namespace string) (string, error) {
return strings.TrimPrefix(alias, "AliasFor"), nil
}).Times(2)
s.mockExecutionMgr.EXPECT().CreateWorkflowExecution(gomock.Any()).DoAndReturn(func(request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) {
eventsToSave := request.NewWorkflowEvents[0].Events
s.Len(eventsToSave, 2)
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, eventsToSave[0].GetEventType())
startEventAttributes := eventsToSave[0].GetWorkflowExecutionStartedEventAttributes()
// Search attribute name was mapped and saved under field name.
s.Equal(
payload.EncodeString("test"),
startEventAttributes.GetSearchAttributes().GetIndexedFields()["CustomKeywordField"])
return tests.CreateWorkflowExecutionResponse, nil
})

requestID := uuid.New()
resp, err := s.historyEngine.StartWorkflowExecution(metrics.AddMetricsContext(context.Background()), &historyservice.StartWorkflowExecutionRequest{
Expand All @@ -1020,7 +1093,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_SearchAttributes() {
Identity: identity,
RequestId: requestID,
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{
"AliasForCustomKeywordField": payload.EncodeString("test"),
"CustomKeywordField": payload.EncodeString("test"),
}}},
})
s.Nil(err)
Expand Down

0 comments on commit 670d99c

Please sign in to comment.