From 29d47829bf82a6c18e505c35c4e5ffefc842f508 Mon Sep 17 00:00:00 2001 From: Martin Majlis <122797378+martin-majlis-s1@users.noreply.github.com> Date: Tue, 21 Nov 2023 10:39:13 +0100 Subject: [PATCH] DSET-4559: Group By fields are part of SessionInfo (#62) * DSET-4559: Group By fields are part of SessionInfo * DSET-4559: Move grouped attributes to session info * Adjust code so that it looks in the UI as expected * Incorporate comments from PR --- examples/client/go.mod | 6 +- examples/client/go.sum | 3 + examples/readme/go.mod | 6 +- examples/readme/go.sum | 3 + pkg/api/add_events/add_events.go | 14 +- pkg/api/add_events/event_bundle.go | 27 - pkg/api/add_events/event_bundle_test.go | 37 -- pkg/buffer/buffer_test.go | 12 +- pkg/client/add_events.go | 142 ++++- pkg/client/add_events_long_running_test.go | 3 - pkg/client/add_events_test.go | 577 ++++++++++++++---- pkg/client/client.go | 29 +- pkg/client/client_test.go | 7 +- pkg/config/config.go | 15 +- pkg/config/config_test.go | 5 +- pkg/version/version.go | 4 +- pkg/version/version_test.go | 4 +- test/testdata/buffer_test_payload_full.json | 4 +- .../buffer_test_payload_injection.json | 4 +- 19 files changed, 636 insertions(+), 266 deletions(-) diff --git a/examples/client/go.mod b/examples/client/go.mod index a73b127..a936d38 100644 --- a/examples/client/go.mod +++ b/examples/client/go.mod @@ -4,15 +4,15 @@ go 1.19 require ( github.com/scalyr/dataset-go v0.0.0 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.26.0 ) require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cskr/pubsub v1.0.2 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.4.0 // indirect go.uber.org/atomic v1.7.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.10.0 // indirect golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect ) diff --git a/examples/client/go.sum b/examples/client/go.sum index 63e98d5..fbe1f3a 100644 --- a/examples/client/go.sum +++ b/examples/client/go.sum @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -26,8 +27,10 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= diff --git a/examples/readme/go.mod b/examples/readme/go.mod index 0c0a1da..0159991 100644 --- a/examples/readme/go.mod +++ b/examples/readme/go.mod @@ -4,15 +4,15 @@ go 1.19 require ( github.com/scalyr/dataset-go v0.0.0 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.26.0 ) require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cskr/pubsub v1.0.2 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.4.0 // indirect go.uber.org/atomic v1.7.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.10.0 // indirect golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect ) diff --git a/examples/readme/go.sum b/examples/readme/go.sum index 63e98d5..fbe1f3a 100644 --- a/examples/readme/go.sum +++ b/examples/readme/go.sum @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -26,8 +27,10 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= diff --git a/pkg/api/add_events/add_events.go b/pkg/api/add_events/add_events.go index 782cc21..f7f9e7d 100644 --- a/pkg/api/add_events/add_events.go +++ b/pkg/api/add_events/add_events.go @@ -27,12 +27,16 @@ import ( ) const ( - AttrBundleKey = "bundle_key" + AttrSessionKey = "session_key" AttrServerHost = "serverHost" AttrOrigServerHost = "__origServerHost" + AttrLogFile = "logfile" ) -type EventAttrs = map[string]interface{} +type ( + EventAttrs = map[string]interface{} + SessionInfo = map[string]interface{} +) // Event represents DataSet REST API event structure (see https://app.scalyr.com/help/api#addEvents) type Event struct { @@ -68,12 +72,6 @@ type Log struct { Attrs map[string]interface{} `json:"attrs"` } -type SessionInfo struct { - ServerType string `json:"serverType,omitempty"` - ServerId string `json:"serverId,omitempty"` - Region string `json:"region,omitempty"` -} - // AddEventsRequestParams represents a represents a AddEvent DataSet REST API request parameters, see https://app.scalyr.com/help/api#addEvents. type AddEventsRequestParams struct { Session string `json:"session,omitempty"` diff --git a/pkg/api/add_events/event_bundle.go b/pkg/api/add_events/event_bundle.go index cba2ebf..47477ee 100644 --- a/pkg/api/add_events/event_bundle.go +++ b/pkg/api/add_events/event_bundle.go @@ -16,12 +16,6 @@ package add_events -import ( - "crypto/md5" - "encoding/hex" - "fmt" -) - // EventBundle represents a single DataSet event wrapper structure (see https://app.scalyr.com/help/api#addEvents) // Event - Zero or more events (log messages) to upload. // Thread - Optional. Lets you create a readable name for each thread in Event. @@ -32,24 +26,3 @@ type EventBundle struct { Thread *Thread Log *Log } - -func (bundle *EventBundle) Key(groupBy []string) string { - // construct key - key := "" - for _, k := range groupBy { - val, ok := bundle.Event.Attrs[k] - if ok { - key += fmt.Sprintf("%s:%s", k, val) - } - } - - // use md5 to shorten the key - hash := md5.Sum([]byte(key)) - bundleKey := hex.EncodeToString(hash[:]) - - // add the key as attribute - bundle.Event.Attrs[AttrBundleKey] = bundleKey - - // return the key - return bundleKey -} diff --git a/pkg/api/add_events/event_bundle_test.go b/pkg/api/add_events/event_bundle_test.go index 06e9d90..4abbf39 100644 --- a/pkg/api/add_events/event_bundle_test.go +++ b/pkg/api/add_events/event_bundle_test.go @@ -15,40 +15,3 @@ */ package add_events - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEventBundle(t *testing.T) { - event := &Event{ - Thread: "5", - Sev: 3, - Ts: "0", - Attrs: map[string]interface{}{ - "foo": "a", - "bar": "b", - "baz": "a", - }, - } - bundle := &EventBundle{Event: event} - - keyFoo := bundle.Key([]string{"foo"}) - keyBar := bundle.Key([]string{"bar"}) - keyBaz := bundle.Key([]string{"baz"}) - keyNotThere1 := bundle.Key([]string{"notThere1"}) - keyNotThere2 := bundle.Key([]string{"notThere2"}) - - assert.Equal(t, "ef9faec68698672038857b2647429002", keyFoo) - assert.Equal(t, "55a2f7ebf2af8927837c599131d32d07", keyBar) - assert.Equal(t, "6dd515483537f552fd5fa604cd60f0d9", keyBaz) - assert.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", keyNotThere1) - assert.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", keyNotThere2) - - // although the value is same, key should be different because attributes differ - assert.NotEqual(t, keyBaz, keyFoo) - // non-existing attributes should have the same key - assert.Equal(t, keyNotThere1, keyNotThere2) -} diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go index 93c688b..4f7c02e 100644 --- a/pkg/buffer/buffer_test.go +++ b/pkg/buffer/buffer_test.go @@ -82,9 +82,9 @@ func createTestBundle() add_events.EventBundle { func createEmptyBuffer() *Buffer { sessionInfo := &add_events.SessionInfo{ - ServerId: "serverId", - ServerType: "serverType", - Region: "region", + "serverId": "serverId", + "serverType": "serverType", + "region": "region", } session := "session" token := "token" @@ -136,9 +136,9 @@ func TestPayloadFull(t *testing.T) { func TestPayloadInjection(t *testing.T) { sessionInfo := &add_events.SessionInfo{ - ServerId: "serverId\",\"sI\":\"I", - ServerType: "serverType\",\"sT\":\"T", - Region: "region\",\"r\":\"R", + "serverId": "serverId\",\"sI\":\"I", + "serverType": "serverType\",\"sT\":\"T", + "region": "region\",\"r\":\"R", } session := "session\",\"s\":\"S" token := "token\",\"events\":[{}],\"foo\":\"bar" diff --git a/pkg/client/add_events.go b/pkg/client/add_events.go index 9b5c81e..b2c1a13 100644 --- a/pkg/client/add_events.go +++ b/pkg/client/add_events.go @@ -17,6 +17,8 @@ package client import ( + "crypto/md5" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -40,6 +42,90 @@ import ( Wrapper around: https://app.scalyr.com/help/api#addEvents */ +type EventWithMeta struct { + EventBundle *add_events.EventBundle + Key string + SessionInfo add_events.SessionInfo +} + +func NewEventWithMeta( + bundle *add_events.EventBundle, + groupBy []string, + serverHost string, + debug bool, +) EventWithMeta { + // initialise + key := "" + info := make(add_events.SessionInfo) + + // adjust server host attribute + adjustServerHost(bundle, serverHost) + + // construct Key + bundleKey := extractKeyAndUpdateInfo(bundle, groupBy, key, info) + + // in debug mode include bundleKey + if debug { + info[add_events.AttrSessionKey] = bundleKey + } + + return EventWithMeta{ + EventBundle: bundle, + Key: bundleKey, + SessionInfo: info, + } +} + +func extractKeyAndUpdateInfo(bundle *add_events.EventBundle, groupBy []string, key string, info add_events.SessionInfo) string { + // construct key and move attributes from attrs to sessionInfo + for _, k := range groupBy { + val, ok := bundle.Event.Attrs[k] + key += k + ":" + if ok { + key += fmt.Sprintf("%s", val) + + // move to session info and remove from attributes + info[k] = val + delete(bundle.Event.Attrs, k) + } + key += "___DELIM___" + } + + // use md5 to shorten the key + hash := md5.Sum([]byte(key)) + return hex.EncodeToString(hash[:]) +} + +func adjustServerHost(bundle *add_events.EventBundle, serverHost string) { + // figure out serverHost value + usedServerHost := serverHost + // if event's ServerHost is set, use it + if len(bundle.Event.ServerHost) > 0 { + usedServerHost = bundle.Event.ServerHost + } else { + // if somebody is using library directly and forget to set Event.ServerHost, + // lets check the attributes first + + // check serverHost attribute + attrHost, ok := bundle.Event.Attrs[add_events.AttrServerHost] + if ok { + usedServerHost = attrHost.(string) + } else { + // if serverHost attribute is not set, check the __origServerHost + attrOrigHost, okOrig := bundle.Event.Attrs[add_events.AttrOrigServerHost] + if okOrig { + usedServerHost = attrOrigHost.(string) + } + } + } + + // for the SessionInfo, it has to be in the serverHost + // therefore remove the orig and set the serverHost + // so that it can be in the next step moved to sessionInfo + delete(bundle.Event.Attrs, add_events.AttrOrigServerHost) + bundle.Event.Attrs[add_events.AttrServerHost] = usedServerHost +} + // AddEvents enqueues given events for processing (sending to Dataset). // It returns an error if the batch was not accepted (e.g. exporter in error state and retrying handle previous batches or client is being shutdown). func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error { @@ -53,10 +139,16 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error // then, figure out which keys are part of the batch // store there information about the host - seenKeys := make(map[string]bool) + bundlesWithMeta := make(map[string][]EventWithMeta) for _, bundle := range bundles { - key := bundle.Key(client.Config.BufferSettings.GroupBy) - seenKeys[key] = true + bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy, client.serverHost, client.Config.Debug) + + list, found := bundlesWithMeta[bWM.Key] + if !found { + bundlesWithMeta[bWM.Key] = []EventWithMeta{bWM} + } else { + bundlesWithMeta[bWM.Key] = append(list, bWM) + } } // update time when the first batch was received @@ -69,42 +161,27 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error // add subscriber for buffer by key client.addEventsMutex.Lock() defer client.addEventsMutex.Unlock() - for key := range seenKeys { + for key, list := range bundlesWithMeta { _, found := client.eventBundleSubscriptionChannels[key] if !found { // add information about the host to the sessionInfo - client.newBufferForEvents(key) + client.newBufferForEvents(key, &list[0].SessionInfo) client.newEventBundleSubscriberRoutine(key) } } // and as last step - publish them - for _, bundle := range bundles { - key := bundle.Key(client.Config.BufferSettings.GroupBy) - client.eventBundlePerKeyTopic.Pub(bundle, key) - client.eventsEnqueued.Add(1) + for key, list := range bundlesWithMeta { + for _, bundle := range list { + client.eventBundlePerKeyTopic.Pub(bundle, key) + client.eventsEnqueued.Add(1) + } } return nil } -// fixServerHostsInBundle fills the attribute __origServerHost for the event -// and removes the attribute serverHost. This is needed to properly associate -// incoming events with the correct host -func (client *DataSetClient) fixServerHostsInBundle(bundle *add_events.EventBundle) { - delete(bundle.Event.Attrs, add_events.AttrServerHost) - - // set the attribute __origServerHost to the event's ServerHost - if len(bundle.Event.ServerHost) > 0 { - bundle.Event.Attrs[add_events.AttrOrigServerHost] = bundle.Event.ServerHost - return - } - - // as fallback use the value set to the client - bundle.Event.Attrs[add_events.AttrOrigServerHost] = client.serverHost -} - func (client *DataSetClient) newEventBundleSubscriberRoutine(key string) { ch := client.eventBundlePerKeyTopic.Sub(key) client.eventBundleSubscriptionChannels[key] = ch @@ -113,11 +190,11 @@ func (client *DataSetClient) newEventBundleSubscriberRoutine(key string) { })(key, ch) } -func (client *DataSetClient) newBufferForEvents(key string) { +func (client *DataSetClient) newBufferForEvents(key string, info *add_events.SessionInfo) { session := fmt.Sprintf("%s-%s", client.Id, key) buf := buffer.NewEmptyBuffer(session, client.Config.Tokens.WriteLog) - client.initBuffer(buf, client.SessionInfo) + client.initBuffer(buf, info) client.buffersAllMutex.Lock() client.buffers[session] = buf @@ -150,7 +227,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte msg, channelReceiveSuccess := <-ch if !channelReceiveSuccess { client.Logger.Error( - "Cannot receive EventBundle from channel", + "Cannot receive EventWithMeta from channel", zap.String("key", key), zap.Any("msg", msg), ) @@ -159,12 +236,11 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte continue } - bundle, ok := msg.(*add_events.EventBundle) + bundle, ok := msg.(EventWithMeta) if ok { buf := getBuffer(key) - client.fixServerHostsInBundle(bundle) - added, err := buf.AddBundle(bundle) + added, err := buf.AddBundle(bundle.EventBundle) if err != nil { if errors.Is(err, &buffer.NotAcceptingError{}) { buf = getBuffer(key) @@ -184,7 +260,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte } if added == buffer.TooMuch { - added, err = buf.AddBundle(bundle) + added, err = buf.AddBundle(bundle.EventBundle) if err != nil { if errors.Is(err, &buffer.NotAcceptingError{}) { buf = getBuffer(key) @@ -214,7 +290,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte } } else { client.Logger.Error( - "Cannot convert message to EventBundle", + "Cannot convert message to EventWithMeta", zap.String("key", key), zap.Any("msg", msg), ) diff --git a/pkg/client/add_events_long_running_test.go b/pkg/client/add_events_long_running_test.go index a4a99e8..235e398 100644 --- a/pkg/client/add_events_long_running_test.go +++ b/pkg/client/add_events_long_running_test.go @@ -105,9 +105,6 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo - for bI := 0; bI < MaxBatchCount; bI++ { batch := make([]*add_events.EventBundle, 0) for lI := 0; lI < LogsPerBatch; lI++ { diff --git a/pkg/client/add_events_test.go b/pkg/client/add_events_test.go index 15b6c8d..fa9ca44 100644 --- a/pkg/client/add_events_test.go +++ b/pkg/client/add_events_test.go @@ -65,16 +65,21 @@ func extract(req *http.Request) (add_events.AddEventsRequest, error) { type ( tAttr = add_events.EventAttrs + tInfo = add_events.SessionInfo tEvent struct { attrs tAttr serverHost string } + tBundle struct { + attrs tAttr + info tInfo + } ) const attributeKey = "key" // byKey implement sort.Interface - https://pkg.go.dev/sort#Interface -type byKey [][]tAttr +type byKey [][]tBundle func (s byKey) Len() int { return len(s) @@ -87,12 +92,145 @@ func (s byKey) Swap(i, j int) { // Less returns true if ith element is nil or it's string representation // is before jth element. func (s byKey) Less(i, j int) bool { - if s[i][0][attributeKey] == nil { + if s[i][0].attrs[attributeKey] == nil && s[j][0].attrs[attributeKey] == nil { + if s[i][0].info[attributeKey] == nil { + return true + } else if s[j][0].info[attributeKey] == nil { + return false + } else { + return s[i][0].info[attributeKey].(string) < s[j][0].info[attributeKey].(string) + } + } else if s[i][0].attrs[attributeKey] == nil { return true - } else if s[j][0][attributeKey] == nil { + } else if s[j][0].attrs[attributeKey] == nil { return false } else { - return s[i][0][attributeKey].(string) < s[j][0][attributeKey].(string) + return s[i][0].attrs[attributeKey].(string) < s[j][0].attrs[attributeKey].(string) + } +} + +func TestNewEventWithMeta(t *testing.T) { + k1 := "k1" + k2 := "k2" + k3 := "k3" + k4 := "k4" + k5 := "k5" + v1 := "v1" + v2 := "v2" + v3 := "v3" + + tests := []struct { + name string + groupBy []string + debug bool + expKey string + expAttrs add_events.EventAttrs + expSessionInfo add_events.SessionInfo + }{ + // when no grouping is used, then attributes are kept + { + name: "empty group by", + groupBy: []string{}, + debug: false, + expKey: "d41d8cd98f00b204e9800998ecf8427e", + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{}, + }, + + // when no grouping is used, then attributes are kept + { + name: "empty group by", + groupBy: []string{}, + debug: true, + expKey: "d41d8cd98f00b204e9800998ecf8427e", + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrSessionKey: "d41d8cd98f00b204e9800998ecf8427e", + }, + }, + + // group by not specified attribute - 1 + { + name: "group by unused attribute - 1", + groupBy: []string{k4}, + debug: false, + expKey: "746ea36093d470e90a9c2fbf07c8ed17", + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{}, + }, + + // group by not specified attribute - 2 + { + name: "group by unused attribute - 2", + groupBy: []string{k5}, + debug: false, + expKey: "d9b675aac82295ce36dad72278888308", + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{}, + }, + + // group by two attributes + { + name: "group by two attributes - 1", + groupBy: []string{k1, k2}, + debug: false, + expKey: "4410d57b15f30fb22e92fc3e2338f288", + expAttrs: add_events.EventAttrs{ + k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + k1: v1, k2: v2, + }, + }, + + // group by two attributes - swapped + { + name: "group by two attributes - 2", + groupBy: []string{k2, k1}, + debug: false, + expKey: "ce28b69e77b27f012501095cb343e2ae", + expAttrs: add_events.EventAttrs{ + k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + k1: v1, k2: v2, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(*testing.T) { + event := &add_events.Event{ + Thread: "5", + Sev: 3, + Ts: "0", + Attrs: map[string]interface{}{ + k1: v1, + k2: v2, + k3: v3, + }, + } + + eWM := NewEventWithMeta( + &add_events.EventBundle{Event: event}, + tt.groupBy, + "serverHost", + tt.debug, + ) + + tt.expAttrs[add_events.AttrServerHost] = "serverHost" + assert.Equal(t, tt.expKey, eWM.Key) + assert.Equal(t, tt.expAttrs, eWM.EventBundle.Event.Attrs) + assert.Equal(t, tt.expSessionInfo, eWM.SessionInfo) + }) } } @@ -103,11 +241,11 @@ func TestAddEventsRetry(t *testing.T) { const succeedInAttempt = int32(3) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { attempt.Add(1) - cer, err := extract(req) + _, err := extract(req) assert.Nil(t, err, "Error reading request: %v", err) - assert.Equal(t, "b", cer.SessionInfo.ServerType) - assert.Equal(t, "a", cer.SessionInfo.ServerId) + // F assert.Equal(t, "b", cer.SessionInfo.ServerType) + // F assert.Equal(t, "a", cer.SessionInfo.ServerId) status := "success" if attempt.Load() < succeedInAttempt { @@ -136,8 +274,6 @@ func TestAddEventsRetry(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -228,8 +364,6 @@ func TestAddEventsRetryAfterSec(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err1 := sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -319,8 +453,6 @@ func TestAddEventsRetryAfterTime(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -349,8 +481,6 @@ func TestAddEventsLargeEvent(t *testing.T) { attempt.Add(1) cer, err := extract(req) assert.Nil(t, err, "Error reading request: %v", err) - assert.Equal(t, "b", cer.SessionInfo.ServerType) - assert.Equal(t, "a", cer.SessionInfo.ServerId) assert.Equal(t, len(cer.Events), 1) wasAttrs := (cer.Events)[0].Attrs @@ -365,30 +495,26 @@ func TestAddEventsLargeEvent(t *testing.T) { } } expectedLengths := map[string]int{ - add_events.AttrBundleKey: 32, - add_events.AttrOrigServerHost: 3, - "0": 990000, - "7": 995000, - "2": 999000, - "5": 999900, - "4": 1000000, - "3": 1000100, - "6": 241661, + "0": 990000, + "7": 995000, + "2": 999000, + "5": 999900, + "4": 1000000, + "3": 1000100, + "6": 241753, } expectedAttrs := map[string]interface{}{ - add_events.AttrBundleKey: "d41d8cd98f00b204e9800998ecf8427e", - add_events.AttrOrigServerHost: "foo", - "0": strings.Repeat("0", expectedLengths["0"]), - "7": strings.Repeat("7", expectedLengths["7"]), - "2": strings.Repeat("2", expectedLengths["2"]), - "5": strings.Repeat("5", expectedLengths["5"]), - "4": strings.Repeat("4", expectedLengths["4"]), - "3": strings.Repeat("3", expectedLengths["3"]), - "6": strings.Repeat("6", expectedLengths["6"]), + "0": strings.Repeat("0", expectedLengths["0"]), + "7": strings.Repeat("7", expectedLengths["7"]), + "2": strings.Repeat("2", expectedLengths["2"]), + "5": strings.Repeat("5", expectedLengths["5"]), + "4": strings.Repeat("4", expectedLengths["4"]), + "3": strings.Repeat("3", expectedLengths["3"]), + "6": strings.Repeat("6", expectedLengths["6"]), } - assert.Equal(t, wasLengths, expectedLengths) - assert.Equal(t, wasAttrs, expectedAttrs, wasAttrs) + assert.Equal(t, expectedLengths, wasLengths) + assert.Equal(t, expectedAttrs, wasAttrs, wasAttrs) wasSuccessful.Store(true) payload, err := json.Marshal(map[string]interface{}{ @@ -410,8 +536,6 @@ func TestAddEventsLargeEvent(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: originalAttrs} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -436,9 +560,9 @@ func TestAddEventsLargeEvent(t *testing.T) { assert.Equal(t, 1.0, stats.Buffers.SuccessRate()) assert.Equal(t, 1.0, stats.Transfer.SuccessRate()) assert.Equal(t, uint64(2), stats.Transfer.BuffersProcessed()) - assert.Equal(t, uint64(0x5f006d), stats.Transfer.BytesSent()) - assert.Equal(t, uint64(0x5f006d), stats.Transfer.BytesAccepted()) - assert.Equal(t, 3113014.5, stats.Transfer.AvgBufferBytes()) + assert.Equal(t, uint64(0x5f0073), stats.Transfer.BytesSent()) + assert.Equal(t, uint64(0x5f0073), stats.Transfer.BytesAccepted()) + assert.Equal(t, 3113017.5, stats.Transfer.AvgBufferBytes()) } func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { @@ -454,11 +578,11 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { attempt.Add(1) cer, err := extract(req) assert.Nil(t, err, "Error reading request: %v", err) - assert.Equal(t, "b", cer.SessionInfo.ServerType) - assert.Equal(t, "a", cer.SessionInfo.ServerId) assert.Equal(t, len(cer.Events), 1) wasAttrs := (cer.Events)[0].Attrs + wasSessionInfo := cer.SessionInfo + // if attributes were not modified, then we // should update test, so they are modified assert.NotEqual(t, wasAttrs, originalAttrs) @@ -470,24 +594,25 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { } } expectedLengths := map[string]int{ - add_events.AttrBundleKey: 32, - add_events.AttrOrigServerHost: 3, - "0": 990000, - "7": 995000, - "2": 999000, - "5": 6, + "0": 990000, + "7": 995000, + "2": 999000, + "5": 6, } - expectedAttrs := map[string]interface{}{ - add_events.AttrBundleKey: "d41d8cd98f00b204e9800998ecf8427e", - add_events.AttrOrigServerHost: "foo", - "0": strings.Repeat("\"", expectedLengths["0"]), - "7": strings.Repeat("\"", expectedLengths["7"]), - "2": strings.Repeat("\"", expectedLengths["2"]), - "5": strings.Repeat("\"", expectedLengths["5"]), + expectedAttrs := add_events.EventAttrs{ + "0": strings.Repeat("\"", expectedLengths["0"]), + "7": strings.Repeat("\"", expectedLengths["7"]), + "2": strings.Repeat("\"", expectedLengths["2"]), + "5": strings.Repeat("\"", expectedLengths["5"]), + } + expectedSessionInfo := add_events.SessionInfo{ + add_events.AttrServerHost: "foo", } - assert.Equal(t, wasLengths, expectedLengths) - assert.Equal(t, wasAttrs, expectedAttrs) + + assert.Equal(t, expectedLengths, wasLengths) + assert.Equal(t, expectedAttrs, wasAttrs) + assert.Equal(t, expectedSessionInfo, *wasSessionInfo) wasSuccessful.Store(true) payload, err := json.Marshal(map[string]interface{}{ @@ -510,8 +635,6 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: originalAttrs} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -582,9 +705,6 @@ func TestAddEventsWithBufferSweeper(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo - const NumEvents = 10 go func(n int) { @@ -616,8 +736,6 @@ func TestAddEventsDoNotRetryForever(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -655,8 +773,6 @@ func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -686,8 +802,6 @@ func TestShutdownFinishesWithinExpectedTimeout(t *testing.T) { sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) @@ -721,8 +835,6 @@ func TestAddEventsAreNotRejectedOncePreviousReqRetriesMaxLifetimeExpired(t *test client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - client.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} @@ -752,8 +864,6 @@ func TestAddEventsAreRejectedOncePreviousReqRetriesMaxLifetimeNotExpired(t *test client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - client.SessionInfo = sessionInfo event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} @@ -787,7 +897,7 @@ func TestAddEventsServerHostLogic(t *testing.T) { name string events []tEvent groupBy []string - expCalls [][]tAttr + expCalls [][]tBundle }{ // when nothing is specified, there is just once call { @@ -800,10 +910,16 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: configServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, }, }, }, @@ -820,10 +936,17 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: configServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, + + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, }, }, }, @@ -840,10 +963,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ + { + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev1ServerHost}, + }, + }, { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, }, }, }, @@ -860,10 +991,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev1ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, }, }, }, @@ -880,10 +1019,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev3ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev3ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, }, }, }, @@ -901,12 +1048,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, groupBy: []string{add_events.AttrServerHost}, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev3ServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev3ServerHost}, + }, }, { - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, }, }, }, @@ -937,13 +1090,36 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ + { + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev1ServerHost}, + }, + }, { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: ev2ServerHost}, - {key: ev3Value, add_events.AttrOrigServerHost: ev3ServerHost}, - {key: ev4Value, add_events.AttrOrigServerHost: ev4ServerHost}, - {key: ev5Value, add_events.AttrOrigServerHost: ev5ServerHost}, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrServerHost: ev2ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev3Value}, + info: tInfo{add_events.AttrServerHost: ev3ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev4Value}, + info: tInfo{add_events.AttrServerHost: ev4ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev5Value}, + info: tInfo{add_events.AttrServerHost: ev5ServerHost}, + }, }, }, }, @@ -974,21 +1150,36 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, groupBy: []string{key}, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev1Value, add_events.AttrServerHost: ev1ServerHost}, + }, }, { - {key: ev2Value, add_events.AttrOrigServerHost: ev2ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev2Value, add_events.AttrServerHost: ev2ServerHost}, + }, }, { - {key: ev3Value, add_events.AttrOrigServerHost: ev3ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev3Value, add_events.AttrServerHost: ev3ServerHost}, + }, }, { - {key: ev4Value, add_events.AttrOrigServerHost: ev4ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev4Value, add_events.AttrServerHost: ev4ServerHost}, + }, }, { - {key: ev5Value, add_events.AttrOrigServerHost: ev5ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev5Value, add_events.AttrServerHost: ev5ServerHost}, + }, }, }, }, @@ -1006,16 +1197,19 @@ func TestAddEventsServerHostLogic(t *testing.T) { serverHost: ev3ServerHost, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev3ServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev3ServerHost}, + }, }, }, }, - // serverHost from the config wins + // serverHost is taken from serverHost if also _origServerHost is set { - name: "serverHost from event.serverHost wins", + name: "serverHost is taken from serverHost if also _origServerHost is set", events: []tEvent{ { attrs: tAttr{ @@ -1025,38 +1219,78 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ + { + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev1ServerHost}, + }, + }, + }, + }, + // serverHost from the config wins looses if any serverHost is set + { + name: "serverHost from config looses if any __origServerHost is set", + events: []tEvent{ + { + attrs: tAttr{ + key: ev1Value, + add_events.AttrOrigServerHost: ev2ServerHost, + }, + }, + }, + expCalls: [][]tBundle{ + { + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: ev2ServerHost}, + }, + }, + }, + }, + // serverHost from config is used when it's not specified + { + name: "serverHost from config is used when it's not specified", + events: []tEvent{ + { + attrs: tAttr{ + key: ev1Value, + }, + }, + }, + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrServerHost: configServerHost}, + }, }, }, }, } - extractAttrs := func(events []*add_events.Event) []tAttr { - attrs := make([]map[string]interface{}, 0) - for _, ev := range events { - delete(ev.Attrs, add_events.AttrBundleKey) - attrs = append(attrs, ev.Attrs) + extractBundles := func(req add_events.AddEventsRequest) []tBundle { + bundles := make([]tBundle, 0) + delete(*req.SessionInfo, add_events.AttrSessionKey) + for _, ev := range req.Events { + bundles = append(bundles, tBundle{attrs: ev.Attrs, info: *req.SessionInfo}) } - return attrs + return bundles } for _, tt := range tests { t.Run(tt.name, func(*testing.T) { numCalls := atomic.Int32{} lock := sync.Mutex{} - calls := make([][]tAttr, 0) + calls := make([][]tBundle, 0) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { numCalls.Add(1) cer, err := extract(req) assert.Nil(t, err, "Error reading request: %v", err) - assert.Equal(t, "b", cer.SessionInfo.ServerType) - assert.Equal(t, "a", cer.SessionInfo.ServerId) lock.Lock() - calls = append(calls, extractAttrs(cer.Events)) + calls = append(calls, extractBundles(cer)) lock.Unlock() payload, err := json.Marshal(map[string]interface{}{ @@ -1084,8 +1318,6 @@ func TestAddEventsServerHostLogic(t *testing.T) { }) sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) require.Nil(t, err) - sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} - sc.SessionInfo = sessionInfo bundles := make([]*add_events.EventBundle, 0) for _, event := range tt.events { @@ -1115,6 +1347,109 @@ func TestAddEventsServerHostLogic(t *testing.T) { } } +func TestAddEventsGroupBy(t *testing.T) { + k1 := "k1" + k2 := "k2" + k3 := "k3" + k4 := "k4" + v1 := "v1" + v2 := "v2" + v3 := "v3" + + tests := []struct { + name string + groupBy []string + expAttrs add_events.EventAttrs + expSessionInfo add_events.SessionInfo + }{ + // when no grouping is used, then attributes are kept + { + name: "empty group by", + groupBy: []string{}, + expAttrs: add_events.EventAttrs{k1: v1, k2: v2, k3: v3}, + expSessionInfo: add_events.SessionInfo{}, + }, + + // group by not specified attribute + { + name: "group by unused attribute", + groupBy: []string{k4}, + expAttrs: add_events.EventAttrs{k1: v1, k2: v2, k3: v3}, + expSessionInfo: add_events.SessionInfo{}, + }, + + // group by two attributes + { + name: "group by two attributes", + groupBy: []string{k1, k2}, + expAttrs: add_events.EventAttrs{k3: v3}, + expSessionInfo: add_events.SessionInfo{k1: v1, k2: v2}, + }, + + // group by two attributes + { + name: "group by two attributes - swapped", + groupBy: []string{k2, k1}, + expAttrs: add_events.EventAttrs{k3: v3}, + expSessionInfo: add_events.SessionInfo{k1: v1, k2: v2}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(*testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + cer, err := extract(req) + + assert.Nil(t, err, "Error reading request: %v", err) + + assert.Equal(t, tt.expAttrs, cer.Events[0].Attrs, tt.name) + tt.expSessionInfo[add_events.AttrServerHost] = "serverHost" + assert.Equal(t, tt.expSessionInfo, *cer.SessionInfo, tt.name) + + payload, err := json.Marshal(map[string]interface{}{ + "status": "success", + "bytesCharged": 42, + }) + assert.NoError(t, err) + l, err := w.Write(payload) + assert.Greater(t, l, 1) + assert.NoError(t, err) + })) + defer server.Close() + + config := newDataSetConfig( + server.URL, + *newBufferSettings( + buffer_config.WithGroupBy(tt.groupBy), + buffer_config.WithRetryMaxElapsedTime(10*RetryBase), + buffer_config.WithRetryInitialInterval(RetryBase), + buffer_config.WithRetryMaxInterval(RetryBase), + ), + server_host_config.NewDefaultDataSetServerHostSettings(), + ) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + require.Nil(t, err) + + bundles := []*add_events.EventBundle{ + { + Event: &add_events.Event{ + Thread: "5", + Sev: 3, + Ts: "1", + Attrs: add_events.EventAttrs{k1: v1, k2: v2, k3: v3}, + ServerHost: "serverHost", + }, + }, + } + + err = sc.AddEvents(bundles) + assert.Nil(t, err) + err = sc.Shutdown() + assert.Nil(t, err) + }) + } +} + func mockServerDefaultPayload(t *testing.T, statusCode int) *httptest.Server { payload, _ := json.Marshal(map[string]interface{}{ "status": "success", diff --git a/pkg/client/client.go b/pkg/client/client.go index a8e36e5..2a28a53 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -64,10 +64,9 @@ func isRetryableStatus(status uint32) bool { // DataSetClient represent a DataSet REST API client type DataSetClient struct { - Id uuid.UUID - Config *config.DataSetConfig - Client *http.Client - SessionInfo *add_events.SessionInfo + Id uuid.UUID + Config *config.DataSetConfig + Client *http.Client // map of known Buffer //TODO introduce cleanup buffers map[string]*buffer.Buffer buffersAllMutex sync.Mutex @@ -123,7 +122,11 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge // update group by, so that logs from the same host // belong to the same session - addServerHostIntoGroupBy(cfg) + adjustGroupByWithSpecialAttributes(cfg) + logger.Info( + "Adjusted config: ", + zap.String("config", cfg.String()), + ) serverHost, err := getServerHost(cfg.ServerHostSettings) if err != nil { @@ -160,7 +163,6 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge Id: id, Config: cfg, Client: client, - SessionInfo: &add_events.SessionInfo{}, buffers: make(map[string]*buffer.Buffer), buffersEnqueued: atomic.Uint64{}, buffersProcessed: atomic.Uint64{}, @@ -229,13 +231,18 @@ func getServerHost(settings server_host_config.DataSetServerHostSettings) (strin return os.Hostname() } -// addServerHostIntoGroupBy adds attributes that indicate from which machine -// the logs are into the groupBy attribute, so that they are part of the same session -func addServerHostIntoGroupBy(cfg *config.DataSetConfig) { +// adjustGroupByWithSpecialAttributes adds attributes that have special meaning in the UI +// serverHost and logfile are used in the drop-down, so they have to be part of the +// SessionInfo. +func adjustGroupByWithSpecialAttributes(cfg *config.DataSetConfig) { groupBy := cfg.BufferSettings.GroupBy - if !slices.Contains(groupBy, add_events.AttrOrigServerHost) { - groupBy = append(groupBy, add_events.AttrOrigServerHost) + if !slices.Contains(groupBy, add_events.AttrLogFile) { + groupBy = append(groupBy, add_events.AttrLogFile) } + if !slices.Contains(groupBy, add_events.AttrServerHost) { + groupBy = append(groupBy, add_events.AttrServerHost) + } + cfg.BufferSettings.GroupBy = groupBy } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index f269dbe..d935970 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -71,10 +71,9 @@ func TestClientBuffer(t *testing.T) { require.Nil(t, err) sessionInfo := add_events.SessionInfo{ - ServerId: "serverId", - ServerType: "testing", + "ServerId": "serverId", + "ServerType": "testing", } - sc.SessionInfo = &sessionInfo event1 := &add_events.Event{ Thread: "TId", @@ -86,7 +85,7 @@ func TestClientBuffer(t *testing.T) { }, } - sc.newBufferForEvents("aaa") + sc.newBufferForEvents("aaa", &sessionInfo) buffer1 := sc.getBuffer("aaa") added, err := buffer1.AddBundle(&add_events.EventBundle{Event: event1}) assert.Nil(t, err) diff --git a/pkg/config/config.go b/pkg/config/config.go index 6f7a707..eb49c74 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -25,6 +25,8 @@ import ( "github.com/scalyr/dataset-go/pkg/buffer_config" ) +const DebugModeDefault = false + // DataSetTokens wrap DataSet access tokens type DataSetTokens struct { WriteLog string @@ -49,6 +51,7 @@ type DataSetConfig struct { Tokens DataSetTokens BufferSettings buffer_config.DataSetBufferSettings ServerHostSettings server_host_config.DataSetServerHostSettings + Debug bool } func NewDefaultDataSetConfig() DataSetConfig { @@ -57,6 +60,7 @@ func NewDefaultDataSetConfig() DataSetConfig { Tokens: DataSetTokens{}, BufferSettings: buffer_config.NewDefaultDataSetBufferSettings(), ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(), + Debug: DebugModeDefault, } } @@ -90,6 +94,13 @@ func WithServerHostSettings(serverHostSettings server_host_config.DataSetServerH } } +func WithDebug(debug bool) DataSetConfigOption { + return func(c *DataSetConfig) error { + c.Debug = debug + return nil + } +} + func FromEnv() DataSetConfigOption { return func(c *DataSetConfig) error { if c.Tokens.WriteLog == "" { @@ -109,6 +120,7 @@ func FromEnv() DataSetConfigOption { } c.BufferSettings = buffer_config.NewDefaultDataSetBufferSettings() c.ServerHostSettings = server_host_config.NewDefaultDataSetServerHostSettings() + c.Debug = DebugModeDefault return nil } } @@ -135,11 +147,12 @@ func (cfg *DataSetConfig) WithOptions(opts ...DataSetConfigOption) (*DataSetConf func (cfg *DataSetConfig) String() string { return fmt.Sprintf( - "Endpoint: %s, Tokens: (%s), BufferSettings: (%s), ServerHostSettings: (%s)", + "Endpoint: %s, Tokens: (%s), BufferSettings: (%s), ServerHostSettings: (%s), Debug: (%t)", cfg.Endpoint, cfg.Tokens.String(), cfg.BufferSettings.String(), cfg.ServerHostSettings.String(), + cfg.Debug, ) } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 28feb32..9472a92 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -105,6 +105,7 @@ func TestNewConfigBasedOnExistingWithNewConfigOptions(t *testing.T) { WithTokens(DataSetTokens{WriteLog: "writeLogOpt1"}), WithBufferSettings(*bufCfg), WithServerHostSettings(*hostCfg), + WithDebug(true), ) // AND bufCfg2, _ := buffer_config.New( @@ -141,6 +142,7 @@ func TestNewConfigBasedOnExistingWithNewConfigOptions(t *testing.T) { ServerHost: "AAA", UseHostName: false, }, cfg5.ServerHostSettings) + assert.Equal(t, true, cfg5.Debug) // AND new config is changed assert.Equal(t, "https://fooOpt2", cfg6.Endpoint) @@ -157,11 +159,12 @@ func TestNewConfigBasedOnExistingWithNewConfigOptions(t *testing.T) { ServerHost: "BBB", UseHostName: true, }, cfg6.ServerHostSettings) + assert.Equal(t, true, cfg6.Debug) } func TestNewDefaultDataSetConfigToString(t *testing.T) { cfg := NewDefaultDataSetConfig() - assert.Equal(t, cfg.String(), "Endpoint: https://app.scalyr.com, Tokens: (WriteLog: false, ReadLog: false, WriteConfig: false, ReadConfig: false), BufferSettings: (MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s, RetryShutdownTimeout: 30s), ServerHostSettings: (UseHostName: true, ServerHost: )") + assert.Equal(t, cfg.String(), "Endpoint: https://app.scalyr.com, Tokens: (WriteLog: false, ReadLog: false, WriteConfig: false, ReadConfig: false), BufferSettings: (MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s, RetryShutdownTimeout: 30s), ServerHostSettings: (UseHostName: true, ServerHost: ), Debug: (false)") } func TestNewDefaultDataSetConfigIsValid(t *testing.T) { diff --git a/pkg/version/version.go b/pkg/version/version.go index fc3bc32..06369e9 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -17,6 +17,6 @@ package version const ( - Version = "0.14.0" - ReleasedDate = "2023-08-23" + Version = "0.15.0" + ReleasedDate = "2023-11-16" ) diff --git a/pkg/version/version_test.go b/pkg/version/version_test.go index f312dd3..bdd674c 100644 --- a/pkg/version/version_test.go +++ b/pkg/version/version_test.go @@ -23,6 +23,6 @@ import ( ) func TestVersion(t *testing.T) { - assert.Equal(t, "0.14.0", Version) - assert.Equal(t, "2023-08-23", ReleasedDate) + assert.Equal(t, "0.15.0", Version) + assert.Equal(t, "2023-11-16", ReleasedDate) } diff --git a/test/testdata/buffer_test_payload_full.json b/test/testdata/buffer_test_payload_full.json index 8fdb8d2..ac59f9a 100644 --- a/test/testdata/buffer_test_payload_full.json +++ b/test/testdata/buffer_test_payload_full.json @@ -2,9 +2,9 @@ "token":"token", "session":"session", "sessionInfo":{ - "serverType":"serverType", + "region":"region", "serverId":"serverId", - "region":"region" + "serverType":"serverType" }, "events":[ { diff --git a/test/testdata/buffer_test_payload_injection.json b/test/testdata/buffer_test_payload_injection.json index 6090042..16dd929 100644 --- a/test/testdata/buffer_test_payload_injection.json +++ b/test/testdata/buffer_test_payload_injection.json @@ -2,9 +2,9 @@ "token":"token\",\"events\":[{}],\"foo\":\"bar", "session":"session\",\"s\":\"S", "sessionInfo":{ - "serverType":"serverType\",\"sT\":\"T", + "region":"region\",\"r\":\"R", "serverId":"serverId\",\"sI\":\"I", - "region":"region\",\"r\":\"R" + "serverType":"serverType\",\"sT\":\"T" }, "events":[ {