Skip to content

Commit

Permalink
Adjust the histogram intervals based on the search period to accommod…
Browse files Browse the repository at this point in the history
…ate very large and very small search windows
  • Loading branch information
jertel committed Feb 23, 2021
1 parent 00ca15a commit 328fae7
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 8 deletions.
60 changes: 58 additions & 2 deletions server/modules/elastic/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func makeTimeline(interval string) map[string]interface{} {
timeline := make(map[string]interface{})
timelineFields := make(map[string]interface{})
timelineFields["field"] = "@timestamp"
timelineFields["interval"] = interval
timelineFields["fixed_interval"] = interval
timelineFields["min_doc_count"] = 1
timeline["date_histogram"] = timelineFields
return timeline
Expand Down Expand Up @@ -114,6 +114,62 @@ func makeQuery(store *ElasticEventstore, parsedQuery *model.Query, beginTime tim
return clause
}

func calcTimelineInterval(intervals int, beginTime time.Time, endTime time.Time) string {
difference := endTime.Sub(beginTime)
intervalSeconds := difference.Seconds() / float64(intervals)

// Find a common interval nearest the calculated interval
if intervalSeconds <= 3 {
return "1s"
}
if intervalSeconds <= 7 {
return "5s"
}
if intervalSeconds <= 13 {
return "10s"
}
if intervalSeconds <= 23 {
return "15s"
}
if intervalSeconds <= 45 {
return "30s"
}
if intervalSeconds <= 180 {
return "1m"
}
if intervalSeconds <= 420 {
return "5m"
}
if intervalSeconds <= 780 {
return "10m"
}
if intervalSeconds <= 1380 {
return "15m"
}
if intervalSeconds <= 2700 {
return "30m"
}
if intervalSeconds <= 5400 {
return "1h"
}
if intervalSeconds <= 25200 {
return "5h"
}
if intervalSeconds <= 54000 {
return "10h"
}
if intervalSeconds <= 259200 {
return "1d"
}
if intervalSeconds <= 604800 {
return "5d"
}
if intervalSeconds <= 1296000 {
return "10d"
}
return "30d"
}

func convertToElasticRequest(store *ElasticEventstore, criteria *model.EventSearchCriteria) (string, error) {
var err error
var esJson string
Expand All @@ -124,7 +180,7 @@ func convertToElasticRequest(store *ElasticEventstore, criteria *model.EventSear

aggregations := make(map[string]interface{})
esMap["aggs"] = aggregations
aggregations["timeline"] = makeTimeline("30m")
aggregations["timeline"] = makeTimeline(calcTimelineInterval(store.intervals, criteria.BeginTime, criteria.EndTime))

segment := criteria.ParsedQuery.NamedSegment(model.SegmentKind_GroupBy)
if segment != nil {
Expand Down
33 changes: 29 additions & 4 deletions server/modules/elastic/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
func NewTestStore() *ElasticEventstore {
return &ElasticEventstore{
fieldDefs: make(map[string]*FieldDefinition),
intervals: DEFAULT_INTERVALS,
}
}

Expand Down Expand Up @@ -69,22 +70,46 @@ func TestMakeTimeline(tester *testing.T) {
if terms["field"] != "@timestamp" {
tester.Errorf("Expected %s, Actual %s", "@timestamp", terms["field"])
}
if terms["interval"] != "30m" {
tester.Errorf("Expected %s, Actual %s", "30m", terms["interval"])
if terms["fixed_interval"] != "30m" {
tester.Errorf("Expected %s, Actual %s", "30m", terms["fixed_interval"])
}
if terms["min_doc_count"] != 1 {
tester.Errorf("Expected %d, Actual %d", 1, terms["min_doc_count"])
}
}

func TestCalcTimelineInterval(tester *testing.T) {
start, _ := time.Parse(time.RFC3339, "2021-01-02T05:00:00Z")
end, _ := time.Parse(time.RFC3339, "2021-01-02T13:00:00Z")
interval := calcTimelineInterval(25, start, end)
if interval != "15m" {
tester.Errorf("Expected 15m interval but got %s", interval)
}

// Boundaries
start, _ = time.Parse(time.RFC3339, "2021-01-02T13:00:00Z")
end, _ = time.Parse(time.RFC3339, "2021-01-02T13:00:01Z")
interval = calcTimelineInterval(25, start, end)
if interval != "1s" {
tester.Errorf("Expected 1s interval but got %s", interval)
}

start, _ = time.Parse(time.RFC3339, "1990-01-02T05:00:00Z")
end, _ = time.Parse(time.RFC3339, "2021-01-02T13:00:00Z")
interval = calcTimelineInterval(25, start, end)
if interval != "30d" {
tester.Errorf("Expected 30d interval but got %s", interval)
}
}

func TestConvertToElasticRequestEmptyCriteria(tester *testing.T) {
criteria := model.NewEventSearchCriteria()
actualJson, err := convertToElasticRequest(NewTestStore(), criteria)
if err != nil {
tester.Errorf("unexpected conversion error: %s", err)
}

expectedJson := `{"aggs":{"timeline":{"date_histogram":{"field":"@timestamp","interval":"30m","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"*"}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"0001-01-01T00:00:00Z","lte":"0001-01-01T00:00:00Z"}}}],"must_not":[],"should":[]}},"size":25}`
expectedJson := `{"aggs":{"timeline":{"date_histogram":{"field":"@timestamp","fixed_interval":"1s","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"*"}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"0001-01-01T00:00:00Z","lte":"0001-01-01T00:00:00Z"}}}],"must_not":[],"should":[]}},"size":25}`
if actualJson != expectedJson {
tester.Errorf("Mismatched ES request conversion; actual='%s' vs expected='%s'", actualJson, expectedJson)
}
Expand All @@ -98,7 +123,7 @@ func TestConvertToElasticRequestGroupByCriteria(tester *testing.T) {
tester.Errorf("unexpected conversion error: %s", err)
}

expectedJson := `{"aggs":{"bottom":{"terms":{"field":"ghi","order":{"_count":"asc"},"size":10}},"groupby|ghi":{"aggs":{"groupby|ghi|jkl":{"terms":{"field":"jkl","order":{"_count":"desc"},"size":10}}},"terms":{"field":"ghi","order":{"_count":"desc"},"size":10}},"timeline":{"date_histogram":{"field":"@timestamp","interval":"30m","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"abc AND def AND q: \"\\\\\\\\file\\\\path\""}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"2020-01-02T12:13:14Z","lte":"2020-01-02T13:13:14Z"}}}],"must_not":[],"should":[]}},"size":25}`
expectedJson := `{"aggs":{"bottom":{"terms":{"field":"ghi","order":{"_count":"asc"},"size":10}},"groupby|ghi":{"aggs":{"groupby|ghi|jkl":{"terms":{"field":"jkl","order":{"_count":"desc"},"size":10}}},"terms":{"field":"ghi","order":{"_count":"desc"},"size":10}},"timeline":{"date_histogram":{"field":"@timestamp","fixed_interval":"1m","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"abc AND def AND q: \"\\\\\\\\file\\\\path\""}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"2020-01-02T12:13:14Z","lte":"2020-01-02T13:13:14Z"}}}],"must_not":[],"should":[]}},"size":25}`
if actualJson != expectedJson {
tester.Errorf("Mismatched ES request conversion; actual='%s' vs expected='%s'", actualJson, expectedJson)
}
Expand Down
4 changes: 3 additions & 1 deletion server/modules/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const DEFAULT_TIMEOUT_MS = 120000
const DEFAULT_CACHE_MS = 86400000
const DEFAULT_INDEX = "*:so-*"
const DEFAULT_ASYNC_THRESHOLD = 10
const DEFAULT_INTERVALS = 25

type Elastic struct {
config module.ModuleConfig
Expand Down Expand Up @@ -54,7 +55,8 @@ func (elastic *Elastic) Init(cfg module.ModuleConfig) error {
cacheMs := module.GetIntDefault(cfg, "cacheMs", DEFAULT_CACHE_MS)
index := module.GetStringDefault(cfg, "index", DEFAULT_INDEX)
asyncThreshold := module.GetIntDefault(cfg, "asyncThreshold", DEFAULT_ASYNC_THRESHOLD)
err := elastic.store.Init(host, remoteHosts, username, password, verifyCert, timeShiftMs, defaultDurationMs, esSearchOffsetMs, timeoutMs, cacheMs, index, asyncThreshold)
intervals := module.GetIntDefault(cfg, "intervals", DEFAULT_INTERVALS)
err := elastic.store.Init(host, remoteHosts, username, password, verifyCert, timeShiftMs, defaultDurationMs, esSearchOffsetMs, timeoutMs, cacheMs, index, asyncThreshold, intervals)
if err == nil && elastic.server != nil {
elastic.server.Eventstore = elastic.store
}
Expand Down
3 changes: 3 additions & 0 deletions server/modules/elastic/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ func TestElasticInit(tester *testing.T) {
if elastic.store.index != DEFAULT_INDEX {
tester.Errorf("expected index %s but got %s", DEFAULT_INDEX, elastic.store.index)
}
if elastic.store.intervals != DEFAULT_INTERVALS {
tester.Errorf("expected interval %d but got %d", DEFAULT_INTERVALS, elastic.store.intervals)
}
}
5 changes: 4 additions & 1 deletion server/modules/elastic/elasticeventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ElasticEventstore struct {
cacheTime time.Time
cacheLock sync.Mutex
fieldDefs map[string]*FieldDefinition
intervals int
asyncThreshold int
}

Expand All @@ -76,14 +77,16 @@ func (store *ElasticEventstore) Init(hostUrl string,
timeoutMs int,
cacheMs int,
index string,
asyncThreshold int) error {
asyncThreshold int,
intervals int) error {
store.timeShiftMs = timeShiftMs
store.defaultDurationMs = defaultDurationMs
store.esSearchOffsetMs = esSearchOffsetMs
store.index = index
store.asyncThreshold = asyncThreshold
store.timeoutMs = time.Duration(timeoutMs) * time.Millisecond
store.cacheMs = time.Duration(cacheMs) * time.Millisecond
store.intervals = intervals

var err error
store.esClient, err = store.makeEsClient(hostUrl, user, pass, verifyCert)
Expand Down

0 comments on commit 328fae7

Please sign in to comment.