Skip to content

Commit

Permalink
Incorporate comments from PR
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-majlis-s1 committed Nov 20, 2023
1 parent 8d45344 commit 8eebfd0
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 43 deletions.
2 changes: 1 addition & 1 deletion pkg/api/add_events/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const (
AttrBundleKey = "bundle_key"
AttrSessionKey = "session_key"
AttrServerHost = "serverHost"
AttrOrigServerHost = "__origServerHost"
AttrLogFile = "logfile"
Expand Down
80 changes: 41 additions & 39 deletions pkg/client/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,51 @@ func NewEventWithMeta(
bundle *add_events.EventBundle,
groupBy []string,
serverHost string,
debug bool,
) EventWithMeta {
// initialise
key := ""
info := make(add_events.SessionInfo)

// adjust server host attribute
adjustServerHost(bundle, serverHost)

// construct Key
bundleKey := extractKeyAndUpdateInfo(bundle, groupBy, key, info)

// in debug mode include bundleKey
if debug {
info[add_events.AttrSessionKey] = bundleKey
}

return EventWithMeta{
EventBundle: bundle,
Key: bundleKey,
SessionInfo: info,
}
}

func extractKeyAndUpdateInfo(bundle *add_events.EventBundle, groupBy []string, key string, info add_events.SessionInfo) string {
// construct key and move attributes from attrs to sessionInfo
for _, k := range groupBy {
val, ok := bundle.Event.Attrs[k]
key += k + ":"
if ok {
key += fmt.Sprintf("%s", val)

// move to session info and remove from attributes
info[k] = val
delete(bundle.Event.Attrs, k)
}
key += "___DELIM___"
}

// use md5 to shorten the key
hash := md5.Sum([]byte(key))
return hex.EncodeToString(hash[:])
}

func adjustServerHost(bundle *add_events.EventBundle, serverHost string) {
// figure out serverHost value
usedServerHost := serverHost
// if event's ServerHost is set, use it
Expand Down Expand Up @@ -84,35 +124,6 @@ func NewEventWithMeta(
// so that it can be in the next step moved to sessionInfo
delete(bundle.Event.Attrs, add_events.AttrOrigServerHost)
bundle.Event.Attrs[add_events.AttrServerHost] = usedServerHost

// construct Key
// move attributes from attrs to sessionInfo
for _, k := range groupBy {
val, ok := bundle.Event.Attrs[k]
key += k + ":"
if ok {
key += fmt.Sprintf("%s", val)

// move to session info and remove from attributes
info[k] = val
delete(bundle.Event.Attrs, k)
}
key += "___DELIM___"
}

// use md5 to shorten the key
hash := md5.Sum([]byte(key))
bundleKey := hex.EncodeToString(hash[:])

// during the development we can include the key
// so that we can see this in UI
// info[add_events.AttrBundleKey] = bundleKey

return EventWithMeta{
EventBundle: bundle,
Key: bundleKey,
SessionInfo: info,
}
}

// AddEvents enqueues given events for processing (sending to Dataset).
Expand All @@ -130,16 +141,7 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
// store there information about the host
bundlesWithMeta := make(map[string][]EventWithMeta)
for _, bundle := range bundles {
bef := len(bundle.Event.Attrs)
bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy, client.serverHost)
aftB := len(bundle.Event.Attrs)
aftM := len(bWM.EventBundle.Event.Attrs)
client.Logger.Warn("Amount of attributes:",
zap.String("Key", bWM.Key),
zap.Int("AttributesBefore", bef),
zap.Int("AttributesAfter", aftB),
zap.Int("AttributesWithMetaAfter", aftM),
)
bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy, client.serverHost, client.Config.Debug)

list, found := bundlesWithMeta[bWM.Key]
if !found {
Expand Down
23 changes: 22 additions & 1 deletion pkg/client/add_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func TestNewEventWithMeta(t *testing.T) {
tests := []struct {
name string
groupBy []string
debug bool
expKey string
expAttrs add_events.EventAttrs
expSessionInfo add_events.SessionInfo
Expand All @@ -130,17 +131,33 @@ func TestNewEventWithMeta(t *testing.T) {
{
name: "empty group by",
groupBy: []string{},
debug: false,
expKey: "d41d8cd98f00b204e9800998ecf8427e",
expAttrs: add_events.EventAttrs{
k1: v1, k2: v2, k3: v3,
},
expSessionInfo: add_events.SessionInfo{},
},

// when no grouping is used, then attributes are kept
{
name: "empty group by",
groupBy: []string{},
debug: true,
expKey: "d41d8cd98f00b204e9800998ecf8427e",
expAttrs: add_events.EventAttrs{
k1: v1, k2: v2, k3: v3,
},
expSessionInfo: add_events.SessionInfo{
add_events.AttrSessionKey: "d41d8cd98f00b204e9800998ecf8427e",
},
},

// group by not specified attribute - 1
{
name: "group by unused attribute - 1",
groupBy: []string{k4},
debug: false,
expKey: "746ea36093d470e90a9c2fbf07c8ed17",
expAttrs: add_events.EventAttrs{
k1: v1, k2: v2, k3: v3,
Expand All @@ -152,6 +169,7 @@ func TestNewEventWithMeta(t *testing.T) {
{
name: "group by unused attribute - 2",
groupBy: []string{k5},
debug: false,
expKey: "d9b675aac82295ce36dad72278888308",
expAttrs: add_events.EventAttrs{
k1: v1, k2: v2, k3: v3,
Expand All @@ -163,6 +181,7 @@ func TestNewEventWithMeta(t *testing.T) {
{
name: "group by two attributes - 1",
groupBy: []string{k1, k2},
debug: false,
expKey: "4410d57b15f30fb22e92fc3e2338f288",
expAttrs: add_events.EventAttrs{
k3: v3,
Expand All @@ -176,6 +195,7 @@ func TestNewEventWithMeta(t *testing.T) {
{
name: "group by two attributes - 2",
groupBy: []string{k2, k1},
debug: false,
expKey: "ce28b69e77b27f012501095cb343e2ae",
expAttrs: add_events.EventAttrs{
k3: v3,
Expand Down Expand Up @@ -203,6 +223,7 @@ func TestNewEventWithMeta(t *testing.T) {
&add_events.EventBundle{Event: event},
tt.groupBy,
"serverHost",
tt.debug,
)

tt.expAttrs[add_events.AttrServerHost] = "serverHost"
Expand Down Expand Up @@ -1250,7 +1271,7 @@ func TestAddEventsServerHostLogic(t *testing.T) {

extractBundles := func(req add_events.AddEventsRequest) []tBundle {
bundles := make([]tBundle, 0)
delete(*req.SessionInfo, add_events.AttrBundleKey)
delete(*req.SessionInfo, add_events.AttrSessionKey)
for _, ev := range req.Events {
bundles = append(bundles, tBundle{attrs: ev.Attrs, info: *req.SessionInfo})
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/scalyr/dataset-go/pkg/buffer_config"
)

const DebugModeDefault = false

// DataSetTokens wrap DataSet access tokens
type DataSetTokens struct {
WriteLog string
Expand All @@ -49,6 +51,7 @@ type DataSetConfig struct {
Tokens DataSetTokens
BufferSettings buffer_config.DataSetBufferSettings
ServerHostSettings server_host_config.DataSetServerHostSettings
Debug bool
}

func NewDefaultDataSetConfig() DataSetConfig {
Expand All @@ -57,6 +60,7 @@ func NewDefaultDataSetConfig() DataSetConfig {
Tokens: DataSetTokens{},
BufferSettings: buffer_config.NewDefaultDataSetBufferSettings(),
ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(),
Debug: DebugModeDefault,
}
}

Expand Down Expand Up @@ -90,6 +94,13 @@ func WithServerHostSettings(serverHostSettings server_host_config.DataSetServerH
}
}

func WithDebug(debug bool) DataSetConfigOption {
return func(c *DataSetConfig) error {
c.Debug = debug
return nil
}
}

func FromEnv() DataSetConfigOption {
return func(c *DataSetConfig) error {
if c.Tokens.WriteLog == "" {
Expand All @@ -109,6 +120,7 @@ func FromEnv() DataSetConfigOption {
}
c.BufferSettings = buffer_config.NewDefaultDataSetBufferSettings()
c.ServerHostSettings = server_host_config.NewDefaultDataSetServerHostSettings()
c.Debug = DebugModeDefault
return nil
}
}
Expand All @@ -135,11 +147,12 @@ func (cfg *DataSetConfig) WithOptions(opts ...DataSetConfigOption) (*DataSetConf

func (cfg *DataSetConfig) String() string {
return fmt.Sprintf(
"Endpoint: %s, Tokens: (%s), BufferSettings: (%s), ServerHostSettings: (%s)",
"Endpoint: %s, Tokens: (%s), BufferSettings: (%s), ServerHostSettings: (%s), Debug: (%t)",
cfg.Endpoint,
cfg.Tokens.String(),
cfg.BufferSettings.String(),
cfg.ServerHostSettings.String(),
cfg.Debug,
)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func TestNewConfigBasedOnExistingWithNewConfigOptions(t *testing.T) {
WithTokens(DataSetTokens{WriteLog: "writeLogOpt1"}),
WithBufferSettings(*bufCfg),
WithServerHostSettings(*hostCfg),
WithDebug(true),
)
// AND
bufCfg2, _ := buffer_config.New(
Expand Down Expand Up @@ -141,6 +142,7 @@ func TestNewConfigBasedOnExistingWithNewConfigOptions(t *testing.T) {
ServerHost: "AAA",
UseHostName: false,
}, cfg5.ServerHostSettings)
assert.Equal(t, true, cfg5.Debug)

// AND new config is changed
assert.Equal(t, "https://fooOpt2", cfg6.Endpoint)
Expand All @@ -157,11 +159,12 @@ func TestNewConfigBasedOnExistingWithNewConfigOptions(t *testing.T) {
ServerHost: "BBB",
UseHostName: true,
}, cfg6.ServerHostSettings)
assert.Equal(t, true, cfg6.Debug)
}

func TestNewDefaultDataSetConfigToString(t *testing.T) {
cfg := NewDefaultDataSetConfig()
assert.Equal(t, cfg.String(), "Endpoint: https://app.scalyr.com, Tokens: (WriteLog: false, ReadLog: false, WriteConfig: false, ReadConfig: false), BufferSettings: (MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s, RetryShutdownTimeout: 30s), ServerHostSettings: (UseHostName: true, ServerHost: )")
assert.Equal(t, cfg.String(), "Endpoint: https://app.scalyr.com, Tokens: (WriteLog: false, ReadLog: false, WriteConfig: false, ReadConfig: false), BufferSettings: (MaxLifetime: 5s, MaxSize: 6225920, GroupBy: [], RetryRandomizationFactor: 0.500000, RetryMultiplier: 1.500000, RetryInitialInterval: 5s, RetryMaxInterval: 30s, RetryMaxElapsedTime: 5m0s, RetryShutdownTimeout: 30s), ServerHostSettings: (UseHostName: true, ServerHost: ), Debug: (false)")
}

func TestNewDefaultDataSetConfigIsValid(t *testing.T) {
Expand Down

0 comments on commit 8eebfd0

Please sign in to comment.