diff --git a/server/modules/elastic/converter.go b/server/modules/elastic/converter.go index 9ab3513e..3c44e1fc 100644 --- a/server/modules/elastic/converter.go +++ b/server/modules/elastic/converter.go @@ -42,7 +42,7 @@ func makeTimeline(interval string) map[string]interface{} { timeline := make(map[string]interface{}) timelineFields := make(map[string]interface{}) timelineFields["field"] = "@timestamp" - timelineFields["interval"] = interval + timelineFields["fixed_interval"] = interval timelineFields["min_doc_count"] = 1 timeline["date_histogram"] = timelineFields return timeline @@ -114,6 +114,62 @@ func makeQuery(store *ElasticEventstore, parsedQuery *model.Query, beginTime tim return clause } +func calcTimelineInterval(intervals int, beginTime time.Time, endTime time.Time) string { + difference := endTime.Sub(beginTime) + intervalSeconds := difference.Seconds() / float64(intervals) + + // Find a common interval nearest the calculated interval + if intervalSeconds <= 3 { + return "1s" + } + if intervalSeconds <= 7 { + return "5s" + } + if intervalSeconds <= 13 { + return "10s" + } + if intervalSeconds <= 23 { + return "15s" + } + if intervalSeconds <= 45 { + return "30s" + } + if intervalSeconds <= 180 { + return "1m" + } + if intervalSeconds <= 420 { + return "5m" + } + if intervalSeconds <= 780 { + return "10m" + } + if intervalSeconds <= 1380 { + return "15m" + } + if intervalSeconds <= 2700 { + return "30m" + } + if intervalSeconds <= 5400 { + return "1h" + } + if intervalSeconds <= 25200 { + return "5h" + } + if intervalSeconds <= 54000 { + return "10h" + } + if intervalSeconds <= 259200 { + return "1d" + } + if intervalSeconds <= 604800 { + return "5d" + } + if intervalSeconds <= 1296000 { + return "10d" + } + return "30d" +} + func convertToElasticRequest(store *ElasticEventstore, criteria *model.EventSearchCriteria) (string, error) { var err error var esJson string @@ -124,7 +180,7 @@ func convertToElasticRequest(store *ElasticEventstore, criteria *model.EventSear aggregations := make(map[string]interface{}) esMap["aggs"] = aggregations - aggregations["timeline"] = makeTimeline("30m") + aggregations["timeline"] = makeTimeline(calcTimelineInterval(store.intervals, criteria.BeginTime, criteria.EndTime)) segment := criteria.ParsedQuery.NamedSegment(model.SegmentKind_GroupBy) if segment != nil { diff --git a/server/modules/elastic/converter_test.go b/server/modules/elastic/converter_test.go index cceb9ac8..a747a806 100644 --- a/server/modules/elastic/converter_test.go +++ b/server/modules/elastic/converter_test.go @@ -20,6 +20,7 @@ import ( func NewTestStore() *ElasticEventstore { return &ElasticEventstore{ fieldDefs: make(map[string]*FieldDefinition), + intervals: DEFAULT_INTERVALS, } } @@ -69,14 +70,38 @@ func TestMakeTimeline(tester *testing.T) { if terms["field"] != "@timestamp" { tester.Errorf("Expected %s, Actual %s", "@timestamp", terms["field"]) } - if terms["interval"] != "30m" { - tester.Errorf("Expected %s, Actual %s", "30m", terms["interval"]) + if terms["fixed_interval"] != "30m" { + tester.Errorf("Expected %s, Actual %s", "30m", terms["fixed_interval"]) } if terms["min_doc_count"] != 1 { tester.Errorf("Expected %d, Actual %d", 1, terms["min_doc_count"]) } } +func TestCalcTimelineInterval(tester *testing.T) { + start, _ := time.Parse(time.RFC3339, "2021-01-02T05:00:00Z") + end, _ := time.Parse(time.RFC3339, "2021-01-02T13:00:00Z") + interval := calcTimelineInterval(25, start, end) + if interval != "15m" { + tester.Errorf("Expected 15m interval but got %s", interval) + } + + // Boundaries + start, _ = time.Parse(time.RFC3339, "2021-01-02T13:00:00Z") + end, _ = time.Parse(time.RFC3339, "2021-01-02T13:00:01Z") + interval = calcTimelineInterval(25, start, end) + if interval != "1s" { + tester.Errorf("Expected 1s interval but got %s", interval) + } + + start, _ = time.Parse(time.RFC3339, "1990-01-02T05:00:00Z") + end, _ = time.Parse(time.RFC3339, "2021-01-02T13:00:00Z") + interval = calcTimelineInterval(25, start, end) + if interval != "30d" { + tester.Errorf("Expected 30d interval but got %s", interval) + } +} + func TestConvertToElasticRequestEmptyCriteria(tester *testing.T) { criteria := model.NewEventSearchCriteria() actualJson, err := convertToElasticRequest(NewTestStore(), criteria) @@ -84,7 +109,7 @@ func TestConvertToElasticRequestEmptyCriteria(tester *testing.T) { tester.Errorf("unexpected conversion error: %s", err) } - expectedJson := `{"aggs":{"timeline":{"date_histogram":{"field":"@timestamp","interval":"30m","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"*"}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"0001-01-01T00:00:00Z","lte":"0001-01-01T00:00:00Z"}}}],"must_not":[],"should":[]}},"size":25}` + expectedJson := `{"aggs":{"timeline":{"date_histogram":{"field":"@timestamp","fixed_interval":"1s","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"*"}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"0001-01-01T00:00:00Z","lte":"0001-01-01T00:00:00Z"}}}],"must_not":[],"should":[]}},"size":25}` if actualJson != expectedJson { tester.Errorf("Mismatched ES request conversion; actual='%s' vs expected='%s'", actualJson, expectedJson) } @@ -98,7 +123,7 @@ func TestConvertToElasticRequestGroupByCriteria(tester *testing.T) { tester.Errorf("unexpected conversion error: %s", err) } - expectedJson := `{"aggs":{"bottom":{"terms":{"field":"ghi","order":{"_count":"asc"},"size":10}},"groupby|ghi":{"aggs":{"groupby|ghi|jkl":{"terms":{"field":"jkl","order":{"_count":"desc"},"size":10}}},"terms":{"field":"ghi","order":{"_count":"desc"},"size":10}},"timeline":{"date_histogram":{"field":"@timestamp","interval":"30m","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"abc AND def AND q: \"\\\\\\\\file\\\\path\""}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"2020-01-02T12:13:14Z","lte":"2020-01-02T13:13:14Z"}}}],"must_not":[],"should":[]}},"size":25}` + expectedJson := `{"aggs":{"bottom":{"terms":{"field":"ghi","order":{"_count":"asc"},"size":10}},"groupby|ghi":{"aggs":{"groupby|ghi|jkl":{"terms":{"field":"jkl","order":{"_count":"desc"},"size":10}}},"terms":{"field":"ghi","order":{"_count":"desc"},"size":10}},"timeline":{"date_histogram":{"field":"@timestamp","fixed_interval":"1m","min_doc_count":1}}},"query":{"bool":{"filter":[],"must":[{"query_string":{"analyze_wildcard":true,"default_field":"*","query":"abc AND def AND q: \"\\\\\\\\file\\\\path\""}},{"range":{"@timestamp":{"format":"strict_date_optional_time","gte":"2020-01-02T12:13:14Z","lte":"2020-01-02T13:13:14Z"}}}],"must_not":[],"should":[]}},"size":25}` if actualJson != expectedJson { tester.Errorf("Mismatched ES request conversion; actual='%s' vs expected='%s'", actualJson, expectedJson) } diff --git a/server/modules/elastic/elastic.go b/server/modules/elastic/elastic.go index 28a22278..1254e1be 100644 --- a/server/modules/elastic/elastic.go +++ b/server/modules/elastic/elastic.go @@ -22,6 +22,7 @@ const DEFAULT_TIMEOUT_MS = 120000 const DEFAULT_CACHE_MS = 86400000 const DEFAULT_INDEX = "*:so-*" const DEFAULT_ASYNC_THRESHOLD = 10 +const DEFAULT_INTERVALS = 25 type Elastic struct { config module.ModuleConfig @@ -54,7 +55,8 @@ func (elastic *Elastic) Init(cfg module.ModuleConfig) error { cacheMs := module.GetIntDefault(cfg, "cacheMs", DEFAULT_CACHE_MS) index := module.GetStringDefault(cfg, "index", DEFAULT_INDEX) asyncThreshold := module.GetIntDefault(cfg, "asyncThreshold", DEFAULT_ASYNC_THRESHOLD) - err := elastic.store.Init(host, remoteHosts, username, password, verifyCert, timeShiftMs, defaultDurationMs, esSearchOffsetMs, timeoutMs, cacheMs, index, asyncThreshold) + intervals := module.GetIntDefault(cfg, "intervals", DEFAULT_INTERVALS) + err := elastic.store.Init(host, remoteHosts, username, password, verifyCert, timeShiftMs, defaultDurationMs, esSearchOffsetMs, timeoutMs, cacheMs, index, asyncThreshold, intervals) if err == nil && elastic.server != nil { elastic.server.Eventstore = elastic.store } diff --git a/server/modules/elastic/elastic_test.go b/server/modules/elastic/elastic_test.go index 90060897..d78ae894 100644 --- a/server/modules/elastic/elastic_test.go +++ b/server/modules/elastic/elastic_test.go @@ -50,4 +50,7 @@ func TestElasticInit(tester *testing.T) { if elastic.store.index != DEFAULT_INDEX { tester.Errorf("expected index %s but got %s", DEFAULT_INDEX, elastic.store.index) } + if elastic.store.intervals != DEFAULT_INTERVALS { + tester.Errorf("expected interval %d but got %d", DEFAULT_INTERVALS, elastic.store.intervals) + } } diff --git a/server/modules/elastic/elasticeventstore.go b/server/modules/elastic/elasticeventstore.go index 6fd860bc..6479e005 100644 --- a/server/modules/elastic/elasticeventstore.go +++ b/server/modules/elastic/elasticeventstore.go @@ -54,6 +54,7 @@ type ElasticEventstore struct { cacheTime time.Time cacheLock sync.Mutex fieldDefs map[string]*FieldDefinition + intervals int asyncThreshold int } @@ -76,7 +77,8 @@ func (store *ElasticEventstore) Init(hostUrl string, timeoutMs int, cacheMs int, index string, - asyncThreshold int) error { + asyncThreshold int, + intervals int) error { store.timeShiftMs = timeShiftMs store.defaultDurationMs = defaultDurationMs store.esSearchOffsetMs = esSearchOffsetMs @@ -84,6 +86,7 @@ func (store *ElasticEventstore) Init(hostUrl string, store.asyncThreshold = asyncThreshold store.timeoutMs = time.Duration(timeoutMs) * time.Millisecond store.cacheMs = time.Duration(cacheMs) * time.Millisecond + store.intervals = intervals var err error store.esClient, err = store.makeEsClient(hostUrl, user, pass, verifyCert)