Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clone request if search attributes are mapped #3299

Merged
merged 7 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -953,10 +953,14 @@ func (s *visibilityStore) parseESDoc(hit *elastic.SearchHit, saTypeMap searchatt
s.metricsClient.IncCounter(metrics.ElasticsearchVisibility, metrics.ElasticsearchDocumentParseFailuresCount)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to encode custom search attributes of Elasticsearch document(%s): %v", hit.Id, err))
}
err = searchattribute.ApplyAliases(s.searchAttributesMapper, record.SearchAttributes, namespace.String())
aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapper, record.SearchAttributes, namespace.String())
if err != nil {
return nil, err
}

if aliasedSas != nil {
record.SearchAttributes = aliasedSas
}
}

if memoEncoding != "" {
Expand Down
43 changes: 30 additions & 13 deletions common/searchattribute/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ type (
}
)

// ApplyAliases replaces field names with alias names for custom search attributes.
func ApplyAliases(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) error {
// AliasFields returns SearchAttributes struct where each search attribute name is replaced with alias.
// If no replacement where made, it returns nil which means that original SearchAttributes struct should be used.
func AliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) (*commonpb.SearchAttributes, error) {
if len(searchAttributes.GetIndexedFields()) == 0 || mapper == nil {
return nil
return nil, nil
}

newIndexedFields := make(map[string]*commonpb.Payload, len(searchAttributes.GetIndexedFields()))
mapped := false
for saName, saPayload := range searchAttributes.GetIndexedFields() {
if !IsMappable(saName) {
newIndexedFields[saName] = saPayload
Expand All @@ -58,26 +60,34 @@ func ApplyAliases(mapper Mapper, searchAttributes *commonpb.SearchAttributes, na
if err != nil {
if _, isInvalidArgument := err.(*serviceerror.InvalidArgument); isInvalidArgument {
// Silently ignore serviceerror.InvalidArgument because it indicates unmapped field (alias was deleted, for example).
// IMPORTANT: ApplyAliases should never return serviceerror.InvalidArgument because it is used by Poll API and the error
// IMPORTANT: AliasFields should never return serviceerror.InvalidArgument because it is used by Poll API and the error
// goes through up to SDK, which shutdowns worker when it receives serviceerror.InvalidArgument as poll response.
continue
}
return err
return nil, err
}
if aliasName != saName {
mapped = true
}
newIndexedFields[aliasName] = saPayload
}

searchAttributes.IndexedFields = newIndexedFields
return nil
// If no field name was mapped, return nil to save on clone operation on caller side.
if !mapped {
return nil, nil
}
return &commonpb.SearchAttributes{IndexedFields: newIndexedFields}, nil
}

// SubstituteAliases replaces aliases with actual field names for custom search attributes.
func SubstituteAliases(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) error {
// UnaliasFields returns SearchAttributes struct where each search attribute alias is replaced with field name.
// If no replacement where made, it returns nil which means that original SearchAttributes struct should be used.
func UnaliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) (*commonpb.SearchAttributes, error) {
if len(searchAttributes.GetIndexedFields()) == 0 || mapper == nil {
return nil
return nil, nil
}

newIndexedFields := make(map[string]*commonpb.Payload, len(searchAttributes.GetIndexedFields()))
mapped := false
for saName, saPayload := range searchAttributes.GetIndexedFields() {
if !IsMappable(saName) {
newIndexedFields[saName] = saPayload
Expand All @@ -86,11 +96,18 @@ func SubstituteAliases(mapper Mapper, searchAttributes *commonpb.SearchAttribute

fieldName, err := mapper.GetFieldName(saName, namespace)
if err != nil {
return err
return nil, err
}
if fieldName != saName {
mapped = true
}
newIndexedFields[fieldName] = saPayload
}

searchAttributes.IndexedFields = newIndexedFields
return nil
// If no alias was mapped, return nil to save on clone operation on caller side.
if !mapped {
return nil, nil
}

return &commonpb.SearchAttributes{IndexedFields: newIndexedFields}, nil
}
61 changes: 47 additions & 14 deletions common/searchattribute/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (t *TestMapper) GetAlias(fieldName string, namespace string) (string, error
if namespace == "error-namespace" {
return "", serviceerror.NewInternal("mapper error")
} else if namespace == "test-namespace" {
if fieldName == "pass-through" {
return fieldName, nil
}

return "alias_of_" + fieldName, nil
}

Expand All @@ -61,19 +65,22 @@ func (t *TestMapper) GetFieldName(alias string, namespace string) (string, error
if namespace == "error-namespace" {
return "", serviceerror.NewInternal("mapper error")
} else if namespace == "test-namespace" {
if alias == "pass-through" {
return alias, nil
}
return strings.TrimPrefix(alias, "alias_of_"), nil
}
return "", serviceerror.NewInvalidArgument("unknown namespace")
}

func Test_ApplyAliases(t *testing.T) {
func Test_AliasFields(t *testing.T) {
sa := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"field1": {Data: []byte("data1")},
"wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored.
},
}
err := ApplyAliases(&TestMapper{}, sa, "error-namespace")
_, err := AliasFields(&TestMapper{}, sa, "error-namespace")
assert.Error(t, err)
var internalErr *serviceerror.Internal
assert.ErrorAs(t, err, &internalErr)
Expand All @@ -84,9 +91,9 @@ func Test_ApplyAliases(t *testing.T) {
"wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored.
},
}
err = ApplyAliases(&TestMapper{}, sa, "unknown-namespace")
sa, err = AliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.NoError(t, err)
assert.Len(t, sa.GetIndexedFields(), 0)
assert.Nil(t, sa)

sa = &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
Expand All @@ -95,8 +102,9 @@ func Test_ApplyAliases(t *testing.T) {
"wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored.
},
}
err = ApplyAliases(&TestMapper{}, sa, "test-namespace")
sa, err = AliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.NotNil(t, sa)
assert.Len(t, sa.GetIndexedFields(), 2)
assert.EqualValues(t, "data1", sa.GetIndexedFields()["alias_of_field1"].GetData())
assert.EqualValues(t, "data2", sa.GetIndexedFields()["alias_of_field2"].GetData())
Expand All @@ -105,19 +113,31 @@ func Test_ApplyAliases(t *testing.T) {
sa = &commonpb.SearchAttributes{
IndexedFields: nil,
}
err = ApplyAliases(&TestMapper{}, sa, "error-namespace")
sa, err = AliasFields(&TestMapper{}, sa, "error-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)
sa, err = AliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.NoError(t, err)
err = ApplyAliases(&TestMapper{}, sa, "unknown-namespace")
assert.Nil(t, sa)

// Pass through search attributes are not mapped.
sa = &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"pass-through": {Data: []byte("data1")},
},
}
sa, err = AliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)
}

func Test_SubstituteAliases(t *testing.T) {
func Test_UnaliasFields(t *testing.T) {
sa := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"alias_of_field1": {Data: []byte("data1")},
},
}
err := SubstituteAliases(&TestMapper{}, sa, "error-namespace")
_, err := UnaliasFields(&TestMapper{}, sa, "error-namespace")
assert.Error(t, err)
var internalErr *serviceerror.Internal
assert.ErrorAs(t, err, &internalErr)
Expand All @@ -128,7 +148,7 @@ func Test_SubstituteAliases(t *testing.T) {
"alias_of_field2": {Data: []byte("data2")},
},
}
err = SubstituteAliases(&TestMapper{}, sa, "unknown-namespace")
_, err = UnaliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.Error(t, err)
var invalidArgumentErr *serviceerror.InvalidArgument
assert.ErrorAs(t, err, &invalidArgumentErr)
Expand All @@ -139,8 +159,9 @@ func Test_SubstituteAliases(t *testing.T) {
"alias_of_field2": {Data: []byte("data2")},
},
}
err = SubstituteAliases(&TestMapper{}, sa, "test-namespace")
sa, err = UnaliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.NotNil(t, sa)
assert.Len(t, sa.GetIndexedFields(), 2)
assert.EqualValues(t, "data1", sa.GetIndexedFields()["field1"].GetData())
assert.EqualValues(t, "data2", sa.GetIndexedFields()["field2"].GetData())
Expand All @@ -152,16 +173,28 @@ func Test_SubstituteAliases(t *testing.T) {
"wrong_alias": {Data: []byte("data3")},
},
}
err = SubstituteAliases(&TestMapper{}, sa, "test-namespace")
_, err = UnaliasFields(&TestMapper{}, sa, "test-namespace")
assert.Error(t, err)
assert.ErrorAs(t, err, &invalidArgumentErr)

// Empty search attributes are not validated with mapper.
sa = &commonpb.SearchAttributes{
IndexedFields: nil,
}
err = SubstituteAliases(&TestMapper{}, sa, "error-namespace")
sa, err = UnaliasFields(&TestMapper{}, sa, "error-namespace")
assert.NoError(t, err)
err = SubstituteAliases(&TestMapper{}, sa, "unknown-namespace")
assert.Nil(t, sa)
sa, err = UnaliasFields(&TestMapper{}, sa, "unknown-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)

// Pass through aliases are not substituted.
sa = &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"pass-through": {Data: []byte("data1")},
},
}
sa, err = UnaliasFields(&TestMapper{}, sa, "test-namespace")
assert.NoError(t, err)
assert.Nil(t, sa)
}
Loading