Skip to content

Commit

Permalink
Make selector string casing configurable (elastic#18854)
Browse files Browse the repository at this point in the history
Add support for configuring the string casing in the index/pipeline/key/topic 'Selector'.

Elasticsearch pipeline and index names are required to be lower case only. When used with fields from events this was not always guaranteed, leading us to enforce lower case always (elastic#16081. elastic#6342).
As the code is reused for Kafka topic selection, this unfortunately did lead to a Regression as some users expect strings to allow mixed case (elastic#18640).
With this PR Elasticsearch related resources (e.g. index or pipeline names) are set to lowercase only, while not touching the strings in other outputs.

(cherry picked from commit 28f7aca)
  • Loading branch information
Steffen Siering committed Jun 12, 2020
1 parent bb1e840 commit b71e0c1
Show file tree
Hide file tree
Showing 16 changed files with 527 additions and 137 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818]
- Fix the `translate_sid` processor's handling of unconfigured target fields. {issue}18990[18990] {pull}18991[18991]
- Fixed a service restart failure under Windows. {issue}18914[18914] {pull}18916[18916]
- The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945]
- Fix kafka topic setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix redis key setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]

*Auditbeat*

Expand Down
8 changes: 5 additions & 3 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package idxmgmt
import (
"errors"
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -197,6 +198,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector,
MultiKey: "indices",
EnableSingleOnly: true,
FailEmpty: mode != ilm.ModeEnabled,
Case: outil.SelectorLowerCase,
}

indexSel, err := outil.BuildSelectorFromConfig(selCfg, buildSettings)
Expand Down Expand Up @@ -354,15 +356,15 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {

if tmp := evt.Meta["alias"]; tmp != nil {
if alias, ok := tmp.(string); ok {
return alias
return strings.ToLower(alias)
}
}

if tmp := evt.Meta["index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
strings.ToLower(idx), ts.Year(), ts.Month(), ts.Day())
}
}

Expand All @@ -372,7 +374,7 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
// which are then expanded by a processor to the "raw_index" field.
if tmp := evt.Meta["raw_index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
return idx
return strings.ToLower(idx)
}
}

Expand Down
44 changes: 44 additions & 0 deletions libbeat/idxmgmt/std_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "TeSt-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -147,6 +152,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "test",
},
},
"event alias without ilm must be lowercae": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test"),
meta: common.MapStr{
"alias": "Test",
},
},
"event index without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -155,11 +168,24 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"index": "test",
},
},
"event index without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: dateIdx("test"),
meta: common.MapStr{
"index": "Test",
},
},
"with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"with ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("Test-9.9.9", "Test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias wit ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -168,6 +194,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "event-alias",
},
},
"event alias wit ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("event-alias"),
meta: common.MapStr{
"alias": "Event-alias",
},
},
"event index with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -186,6 +220,16 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
},
want: stable("myindex"),
},
"use indices settings must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{
"index": "test-%{[agent.version]}",
"indices": []map[string]interface{}{
{"index": "MyIndex"},
},
},
want: stable("myindex"),
},
}
for name, test := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -327,7 +328,7 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
if event.Meta != nil {
if pipeline, exists := event.Meta["pipeline"]; exists {
if p, ok := pipeline.(string); ok {
return p, nil
return strings.ToLower(p), nil
}
return "", errors.New("pipeline metadata is no string")
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func doClientPing(t *testing.T) {
Headers: map[string]string{headerTestField: headerTestValue},
ProxyDisable: proxyDisable != "",
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}
if proxy != "" {
proxyURL, err := url.Parse(proxy)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestClientWithHeaders(t *testing.T) {
"X-Test": "testing value",
},
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}, nil)
assert.NoError(t, err)

Expand Down
17 changes: 11 additions & 6 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,7 @@ func buildSelectors(
return index, pipeline, err
}

pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
})
pipelineSel, err := buildPipelineSelector(cfg)
if err != nil {
return index, pipeline, err
}
Expand All @@ -148,3 +143,13 @@ func buildSelectors(

return index, pipeline, err
}

func buildPipelineSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
Case: outil.SelectorLowerCase,
})
}
58 changes: 58 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
)

Expand Down Expand Up @@ -73,3 +75,59 @@ func TestGlobalConnectCallbacksManagement(t *testing.T) {
t.Fatalf("third callback cannot be retrieved")
}
}

func TestPipelineSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"no pipline configured": {},
"pipeline configured": {
cfg: map[string]interface{}{"pipeline": "test"},
want: "test",
},
"pipeline must be lowercase": {
cfg: map[string]interface{}{"pipeline": "Test"},
want: "test",
},
"pipeline via event meta": {
event: beat.Event{Meta: common.MapStr{"pipeline": "test"}},
want: "test",
},
"pipeline via event meta must be lowercase": {
event: beat.Event{Meta: common.MapStr{"pipeline": "Test"}},
want: "test",
},
"pipelines setting": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "test"}},
},
want: "test",
},
"pipelines setting must be lowercase": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "Test"}},
},
want: "test",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildPipelineSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := getPipeline(&test.event, &selector)
if err != nil {
t.Fatalf("Failed to create pipeline name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
62 changes: 62 additions & 0 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka
import (
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -97,3 +98,64 @@ func TestConfigInvalid(t *testing.T) {
})
}
}

func TestTopicSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"topic configured": {
cfg: map[string]interface{}{"topic": "test"},
want: "test",
},
"topic must keep case": {
cfg: map[string]interface{}{"topic": "Test"},
want: "Test",
},
"topics setting": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "test"}},
},
want: "test",
},
"topics setting must keep case": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "Test"}},
},
want: "Test",
},
"use event field": {
cfg: map[string]interface{}{"topic": "test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "from-event"},
},
want: "test-from-event",
},
"use event field must keep case": {
cfg: map[string]interface{}{"topic": "Test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "From-Event"},
},
want: "Test-From-Event",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildTopicSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := selector.Select(&test.event)
if err != nil {
t.Fatalf("Failed to create topic name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
17 changes: 11 additions & 6 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@ func makeKafka(
return outputs.Fail(err)
}

topic, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
})
topic, err := buildTopicSelector(cfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down Expand Up @@ -102,3 +97,13 @@ func makeKafka(
}
return outputs.Success(config.BulkMaxSize, retry, client)
}

func buildTopicSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
Case: outil.SelectorKeepCase,
})
}
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func esConnect(t *testing.T, index string) *esConnection {

host := getElasticsearchHost()
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "")
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "", outil.SelectorLowerCase)
indexSel := outil.MakeSelector(indexFmtExpr)
index, _ = indexSel.Select(&beat.Event{
Timestamp: ts,
Expand Down
Loading

0 comments on commit b71e0c1

Please sign in to comment.