Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Sep 6, 2022
1 parent 2fc5d02 commit 35a3ef1
Showing 1 changed file with 55 additions and 30 deletions.
85 changes: 55 additions & 30 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2908,12 +2908,12 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
return nil, err
}

request, err = wh.mapCreateScheduleRequestSearchAttributes(request, namespaceName)
if err != nil {
if err = wh.validateStartWorkflowArgsForSchedule(namespaceName, request.GetSchedule().GetAction().GetStartWorkflow()); err != nil {
return nil, err
}

if err = wh.validateStartWorkflowArgsForSchedule(namespaceName, request.GetSchedule().GetAction().GetStartWorkflow()); err != nil {
request, err = wh.mapCreateScheduleRequestSearchAttributes(request, namespaceName)
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -3005,15 +3005,8 @@ func (wh *WorkflowHandler) validateStartWorkflowArgsForSchedule(
return errIDReusePolicyNotAllowed
}

err = wh.validateSearchAttributes(startWorkflow.GetSearchAttributes(), namespaceName)
if err != nil {
return nil, err
}

// map search attributes to aliases here, since we don't go through the frontend when starting later
request, err = wh.substituteSearchAttributesAliasesInCreateScheduleRequest(request, namespaceName)
if err != nil {
return nil, err
if err := wh.validateSearchAttributes(startWorkflow.GetSearchAttributes(), namespaceName); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -3110,9 +3103,13 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
}
searchattribute.ApplyTypeMap(sa, saTypeMap)
if err = searchattribute.ApplyAliases(wh.saMapper, sa, request.Namespace); err != nil {
mappedSearchAttributes, err := searchattribute.ApplyAliases(wh.saMapper, sa, request.Namespace)
if err != nil {
return err
}
if mappedSearchAttributes != nil {
response.Schedule.Action.GetStartWorkflow().SearchAttributes = mappedSearchAttributes
}
}

// for all running workflows started by the schedule, we should check that they're
Expand Down Expand Up @@ -3230,6 +3227,11 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
return nil, err
}

request, err = wh.mapUpdateScheduleRequestStartWorkflowSearchAttributes(request, namespaceName)
if err != nil {
return nil, err
}

input := &schedspb.FullUpdateRequest{
Schedule: request.Schedule,
}
Expand Down Expand Up @@ -4753,36 +4755,59 @@ func (wh *WorkflowHandler) mapCreateScheduleRequestSearchAttributes(request *wor
if err != nil {
return nil, err
}
if mappedSearchAttributes == nil {

startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow()
mappedStartWorkflowSearchAttributes, err := searchattribute.SubstituteAliases(wh.saMapper, startWorkflow.GetSearchAttributes(), namespaceName.String())
if err != nil {
return nil, err
}

if mappedSearchAttributes == nil && mappedStartWorkflowSearchAttributes == nil {
return request, nil
}

// Shallow copy request and replace SearchAttributes fields only.
newRequest := *request
newRequest.SearchAttributes = mappedSearchAttributes
return &newRequest, nil
}

func (wh *WorkflowHandler) mapCreateScheduleRequestStartWorkflowSearchAttributes(request *workflowservice.CreateScheduleRequest, namespaceName namespace.Name) (*workflowservice.CreateScheduleRequest, error) {
if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil {
mappedSearchAttributes, err := searchattribute.SubstituteAliases(wh.saMapper, startWorkflow.GetSearchAttributes(), namespaceName.String())
if err != nil {
return nil, err
}
if mappedSearchAttributes == nil {
return request, nil
}
if mappedSearchAttributes != nil {
newRequest.SearchAttributes = mappedSearchAttributes
}

if mappedStartWorkflowSearchAttributes != nil && startWorkflow != nil {
newStartWorkflow := *startWorkflow
newStartWorkflow.SearchAttributes = mappedSearchAttributes
newStartWorkflow.SearchAttributes = mappedStartWorkflowSearchAttributes
newSchedule := *request.GetSchedule()
newSchedule.Action = &schedpb.ScheduleAction{
Action: &schedpb.ScheduleAction_StartWorkflow{
StartWorkflow: &newStartWorkflow,
}}
newRequest := *request
newRequest.Schedule = &newSchedule
return &newRequest, nil
}

return request, nil
return &newRequest, nil
}

func (wh *WorkflowHandler) mapUpdateScheduleRequestStartWorkflowSearchAttributes(request *workflowservice.UpdateScheduleRequest, namespaceName namespace.Name) (*workflowservice.UpdateScheduleRequest, error) {
startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow()
if startWorkflow == nil {
return request, nil
}

mappedSearchAttributes, err := searchattribute.SubstituteAliases(wh.saMapper, startWorkflow.GetSearchAttributes(), namespaceName.String())
if err != nil {
return nil, err
}
if mappedSearchAttributes == nil {
return request, nil
}
newStartWorkflow := *startWorkflow
newStartWorkflow.SearchAttributes = mappedSearchAttributes
newSchedule := *request.GetSchedule()
newSchedule.Action = &schedpb.ScheduleAction{
Action: &schedpb.ScheduleAction_StartWorkflow{
StartWorkflow: &newStartWorkflow,
}}
newRequest := *request
newRequest.Schedule = &newSchedule
return &newRequest, nil
}

0 comments on commit 35a3ef1

Please sign in to comment.