Skip to content

Commit

Permalink
Clone request if search attributes are mapped (#3299)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Sep 7, 2022
1 parent a91898d commit 3c2ddea
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -953,10 +953,14 @@ func (s *visibilityStore) parseESDoc(hit *elastic.SearchHit, saTypeMap searchatt
s.metricsClient.IncCounter(metrics.ElasticsearchVisibility, metrics.ElasticsearchDocumentParseFailuresCount)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to encode custom search attributes of Elasticsearch document(%s): %v", hit.Id, err))
}
err = searchattribute.ApplyAliases(s.searchAttributesMapper, record.SearchAttributes, namespace.String())
aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapper, record.SearchAttributes, namespace.String())
if err != nil {
return nil, err
}

if aliasedSas != nil {
record.SearchAttributes = aliasedSas
}
}

if memoEncoding != "" {
Expand Down
43 changes: 30 additions & 13 deletions common/searchattribute/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ type (
}
)

// ApplyAliases replaces field names with alias names for custom search attributes.
func ApplyAliases(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) error {
// AliasFields returns SearchAttributes struct where each search attribute name is replaced with alias.
// If no replacement where made, it returns nil which means that original SearchAttributes struct should be used.
func AliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) (*commonpb.SearchAttributes, error) {
if len(searchAttributes.GetIndexedFields()) == 0 || mapper == nil {
return nil
return nil, nil
}

newIndexedFields := make(map[string]*commonpb.Payload, len(searchAttributes.GetIndexedFields()))
mapped := false
for saName, saPayload := range searchAttributes.GetIndexedFields() {
if !IsMappable(saName) {
newIndexedFields[saName] = saPayload
Expand All @@ -58,26 +60,34 @@ func ApplyAliases(mapper Mapper, searchAttributes *commonpb.SearchAttributes, na
if err != nil {
if _, isInvalidArgument := err.(*serviceerror.InvalidArgument); isInvalidArgument {
// Silently ignore serviceerror.InvalidArgument because it indicates unmapped field (alias was deleted, for example).
// IMPORTANT: ApplyAliases should never return serviceerror.InvalidArgument because it is used by Poll API and the error
// IMPORTANT: AliasFields should never return serviceerror.InvalidArgument because it is used by Poll API and the error
// goes through up to SDK, which shutdowns worker when it receives serviceerror.InvalidArgument as poll response.
continue
}
return err
return nil, err
}
if aliasName != saName {
mapped = true
}
newIndexedFields[aliasName] = saPayload
}

searchAttributes.IndexedFields = newIndexedFields
return nil
// If no field name was mapped, return nil to save on clone operation on caller side.
if !mapped {
return nil, nil
}
return &commonpb.SearchAttributes{IndexedFields: newIndexedFields}, nil
}

// SubstituteAliases replaces aliases with actual field names for custom search attributes.
func SubstituteAliases(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) error {
// UnaliasFields returns SearchAttributes struct where each search attribute alias is replaced with field name.
// If no replacement where made, it returns nil which means that original SearchAttributes struct should be used.
func UnaliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) (*commonpb.SearchAttributes, error) {
if len(searchAttributes.GetIndexedFields()) == 0 || mapper == nil {
return nil
return nil, nil
}

newIndexedFields := make(map[string]*commonpb.Payload, len(searchAttributes.GetIndexedFields()))
mapped := false
for saName, saPayload := range searchAttributes.GetIndexedFields() {
if !IsMappable(saName) {
newIndexedFields[saName] = saPayload
Expand All @@ -86,11 +96,18 @@ func SubstituteAliases(mapper Mapper, searchAttributes *commonpb.SearchAttribute

fieldName, err := mapper.GetFieldName(saName, namespace)
if err != nil {
return err
return nil, err
}
if fieldName != saName {
mapped = true
}
newIndexedFields[fieldName] = saPayload
}

searchAttributes.IndexedFields = newIndexedFields
return nil
// If no alias was mapped, return nil to save on clone operation on caller side.
if !mapped {
return nil, nil
}

return &commonpb.SearchAttributes{IndexedFields: newIndexedFields}, nil
}
61 changes: 47 additions & 14 deletions common/searchattribute/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (t *TestMapper) GetAlias(fieldName string, namespace string) (string, error
if namespace == "error-namespace" {
return "", serviceerror.NewInternal("mapper error")
} else if namespace == "test-namespace" {
if fieldName == "pass-through" {
return fieldName, nil
}

return "alias_of_" + fieldName, nil
}

Expand All @@ -61,19 +65,22 @@ func (t *TestMapper) GetFieldName(alias string, namespace string) (string, error
if namespace == "error-namespace" {
return "", serviceerror.NewInternal("mapper error")
} else if namespace == "test-namespace" {
if alias == "pass-through" {
return alias, nil
}
return strings.TrimPrefix(alias, "alias_of_"), nil
}
return "", serviceerror.NewInvalidArgument("unknown namespace")
}

func Test_ApplyAliases(t *testing.T) {
func Test_AliasFields(t *testing.T) {
sa := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"field1": {Data: []byte("data1")},
"wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored.
},
}
err := ApplyAliases(&TestMapper{}, sa, "error-namespace")
_, err := AliasFields(&TestMapper{}, sa, "error-namespace")
assert.Error(t, err)
var internalErr *serviceerror.Internal
assert.ErrorAs(t, err, &internalErr)
Expand All @@ -84,9 +91,9 @@ func Test_ApplyAliases(t *testing.T) {
"wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored.
},
}
err = ApplyAliases(&TestMapper{}, sa, "unknown-namespace")
sa, err = AliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.NoError(t, err)
assert.Len(t, sa.GetIndexedFields(), 0)
assert.Nil(t, sa)

sa = &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
Expand All @@ -95,8 +102,9 @@ func Test_ApplyAliases(t *testing.T) {
"wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored.
},
}
err = ApplyAliases(&TestMapper{}, sa, "test-namespace")
sa, err = AliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.NotNil(t, sa)
assert.Len(t, sa.GetIndexedFields(), 2)
assert.EqualValues(t, "data1", sa.GetIndexedFields()["alias_of_field1"].GetData())
assert.EqualValues(t, "data2", sa.GetIndexedFields()["alias_of_field2"].GetData())
Expand All @@ -105,19 +113,31 @@ func Test_ApplyAliases(t *testing.T) {
sa = &commonpb.SearchAttributes{
IndexedFields: nil,
}
err = ApplyAliases(&TestMapper{}, sa, "error-namespace")
sa, err = AliasFields(&TestMapper{}, sa, "error-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)
sa, err = AliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.NoError(t, err)
err = ApplyAliases(&TestMapper{}, sa, "unknown-namespace")
assert.Nil(t, sa)

// Pass through search attributes are not mapped.
sa = &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"pass-through": {Data: []byte("data1")},
},
}
sa, err = AliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)
}

func Test_SubstituteAliases(t *testing.T) {
func Test_UnaliasFields(t *testing.T) {
sa := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"alias_of_field1": {Data: []byte("data1")},
},
}
err := SubstituteAliases(&TestMapper{}, sa, "error-namespace")
_, err := UnaliasFields(&TestMapper{}, sa, "error-namespace")
assert.Error(t, err)
var internalErr *serviceerror.Internal
assert.ErrorAs(t, err, &internalErr)
Expand All @@ -128,7 +148,7 @@ func Test_SubstituteAliases(t *testing.T) {
"alias_of_field2": {Data: []byte("data2")},
},
}
err = SubstituteAliases(&TestMapper{}, sa, "unknown-namespace")
_, err = UnaliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.Error(t, err)
var invalidArgumentErr *serviceerror.InvalidArgument
assert.ErrorAs(t, err, &invalidArgumentErr)
Expand All @@ -139,8 +159,9 @@ func Test_SubstituteAliases(t *testing.T) {
"alias_of_field2": {Data: []byte("data2")},
},
}
err = SubstituteAliases(&TestMapper{}, sa, "test-namespace")
sa, err = UnaliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.NotNil(t, sa)
assert.Len(t, sa.GetIndexedFields(), 2)
assert.EqualValues(t, "data1", sa.GetIndexedFields()["field1"].GetData())
assert.EqualValues(t, "data2", sa.GetIndexedFields()["field2"].GetData())
Expand All @@ -152,16 +173,28 @@ func Test_SubstituteAliases(t *testing.T) {
"wrong_alias": {Data: []byte("data3")},
},
}
err = SubstituteAliases(&TestMapper{}, sa, "test-namespace")
_, err = UnaliasFields(&TestMapper{}, sa, "test-namespace")
assert.Error(t, err)
assert.ErrorAs(t, err, &invalidArgumentErr)

// Empty search attributes are not validated with mapper.
sa = &commonpb.SearchAttributes{
IndexedFields: nil,
}
err = SubstituteAliases(&TestMapper{}, sa, "error-namespace")
sa, err = UnaliasFields(&TestMapper{}, sa, "error-namespace")
assert.NoError(t, err)
err = SubstituteAliases(&TestMapper{}, sa, "unknown-namespace")
assert.Nil(t, sa)
sa, err = UnaliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)

// Pass through aliases are not substituted.
sa = &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"pass-through": {Data: []byte("data1")},
},
}
sa, err = UnaliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)
}
Loading

0 comments on commit 3c2ddea

Please sign in to comment.