From 9c0ddd4663d1521cbda9d4c04d31b4ffd6a35d91 Mon Sep 17 00:00:00 2001 From: Raphael Philipe Mendes da Silva Date: Wed, 11 Oct 2023 08:06:38 -0700 Subject: [PATCH] [exporter/awscloudwatchlogsexporter] Improve performance of the awscloudwatchlogs exporter (#26692) Adds support to the to parallelism in the awscloudwatchlogs exporter by leveraging the [exporter helper](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md). In this PR, we are adding support to the `num_consumers` configuration in the `sending_queue`. This will allow users to specify the number of consumers that will consume from the sending_queue in parallel. It is possible and straightforward to use this approach because CloudWatch logs [no longer requires that you use a token to control access to the stream that you are writing to](https://aws.amazon.com/about-aws/whats-new/2023/01/amazon-cloudwatch-logs-log-stream-transaction-quota-sequencetoken-requirement/). You can write to the same stream in parallel. To achieve this, this PR does the following: * Create Pusher that is able to push to multiple streams at the same time. * Move lifecycle of the Pusher to the function that is used to consume from the sending queue. This allows you to safely send to multiple streams at the same time without any resource contention since each call to consume logs will not share resources with others that are happening in parallel (one exception is the creation of log streams). Besides that I analyzed the code and removed other limitations: * locks that were not necessary * Limiter that was used to limit the number of requests per stream to 5 per second. [The TPS is much higher now and is per account.](https://aws.amazon.com/about-aws/whats-new/2023/01/amazon-cloudwatch-logs-log-stream-transaction-quota-sequencetoken-requirement/) ** How to review this PR: ** The first 3 commits in this PR were used to refactor the code before making the real changes. Please use the commits to simplify the review process. **Link to tracking Issue:** #26360 **Testing:** - Unit tests were added. - Tested locally sending logs to cloudwatch logs. **Documentation:** Documentation was added describing the new parameters. --------- Signed-off-by: Raphael Silva Co-authored-by: Anthony Mirabella --- .../parallel-awscloudwatchlogsexporter.yaml | 22 +++ exporter/awscloudwatchlogsexporter/README.md | 7 +- exporter/awscloudwatchlogsexporter/config.go | 32 ++-- .../awscloudwatchlogsexporter/config_test.go | 37 ++-- .../awscloudwatchlogsexporter/exporter.go | 106 ++++------- .../exporter_test.go | 147 +++++++++++---- exporter/awscloudwatchlogsexporter/factory.go | 8 +- .../awscloudwatchlogsexporter/factory_test.go | 6 +- .../testdata/config.yaml | 6 + exporter/awsemfexporter/emf_exporter.go | 19 +- exporter/awsemfexporter/emf_exporter_test.go | 12 +- exporter/awsemfexporter/metric_translator.go | 22 +++ internal/aws/cwlogs/cwlog_client.go | 56 +++--- internal/aws/cwlogs/cwlog_client_test.go | 109 +++-------- internal/aws/cwlogs/pusher.go | 177 +++++++++++++----- internal/aws/cwlogs/pusher_test.go | 146 +++++++++------ 16 files changed, 522 insertions(+), 390 deletions(-) create mode 100755 .chloggen/parallel-awscloudwatchlogsexporter.yaml diff --git a/.chloggen/parallel-awscloudwatchlogsexporter.yaml b/.chloggen/parallel-awscloudwatchlogsexporter.yaml new file mode 100755 index 000000000000..e10b71ad5cea --- /dev/null +++ b/.chloggen/parallel-awscloudwatchlogsexporter.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awscloudwatchlogsexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Improve the performance of the awscloudwatchlogsexporter" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26692] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Improve the performance by adding support to multiple consumers and removing locks and limiters that are no longer + necessary. diff --git a/exporter/awscloudwatchlogsexporter/README.md b/exporter/awscloudwatchlogsexporter/README.md index 17c6ff78c165..ce85109deb58 100644 --- a/exporter/awscloudwatchlogsexporter/README.md +++ b/exporter/awscloudwatchlogsexporter/README.md @@ -31,7 +31,10 @@ The following settings can be optionally configured: - `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list. - `log_retention`: LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653. - `tags`: Tags is the option to set tags for the CloudWatch Log Group. If specified, please add at most 50 tags. Input is a string to string map like so: { 'key': 'value' }. Keys must be between 1-128 characters and follow the regex pattern: `^([\p{L}\p{Z}\p{N}_.:/=+\-@]+)$`(alphanumerics, whitespace, and _.:/=+-!). Values must be between 1-256 characters and follow the regex pattern: `^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$`(alphanumerics, whitespace, and _.:/=+-!). [Link to tagging restrictions](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html#:~:text=Required%3A%20Yes-,tags,-The%20key%2Dvalue) -- `raw_log`: Boolean default false. If set to true, only the log message will be exported to CloudWatch Logs. This needs to be set to true for [EMF logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html). +- `raw_log`: Boolean default false. If set to true, only the log message will be exported to CloudWatch Logs. This needs to be set to true for [EMF logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html). +- `sending_queue`: [Parameters for the sending queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), where you can control parallelism and the size of the sending buffer. Obs.: this component will always have a sending queue enabled. + - `num_consumers`: Number of consumers that will consume from the sending queue. This parameter controls how many consumers will consume from the sending queue in parallel. + - `queue_size`: Maximum number of batches kept in memory before dropping; ignored if enabled is false ### Examples @@ -63,7 +66,7 @@ exporters: - If the log group and/or log stream are specified in an EMF log, that EMF log will be exported to that log group and/or log stream (i.e. ignores the log group and log stream defined in the configuration) - The log group and log stream will also be created automatically if they do not already exist. - Example of an EMF log with log group and log stream: - ```json +```json {"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo", "LogStreamName": "Bar", "CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100} ``` - Resource ARNs (Amazon Resource Name (ARN) of the AWS resource running the collector) are currently not supported with the CloudWatch Logs Exporter. diff --git a/exporter/awscloudwatchlogsexporter/config.go b/exporter/awscloudwatchlogsexporter/config.go index 393c98ed7117..76ac501e6d22 100644 --- a/exporter/awscloudwatchlogsexporter/config.go +++ b/exporter/awscloudwatchlogsexporter/config.go @@ -41,9 +41,8 @@ type Config struct { // Values must be between 1-256 characters and follow the regex pattern: ^([\p{L}\p{Z}\p{N}_.:/=+\-@]*)$ Tags map[string]*string `mapstructure:"tags"` - // QueueSettings is a subset of exporterhelper.QueueSettings, - // because only QueueSize is user-settable due to how AWS CloudWatch API works - QueueSettings QueueSettings `mapstructure:"sending_queue"` + // Queue settings frm the exporterhelper + exporterhelper.QueueSettings `mapstructure:"sending_queue"` logger *zap.Logger @@ -54,11 +53,6 @@ type Config struct { RawLog bool `mapstructure:"raw_log,omitempty"` } -type QueueSettings struct { - // QueueSize set the length of the sending queue - QueueSize int `mapstructure:"queue_size"` -} - var _ component.Config = (*Config)(nil) // Validate config @@ -69,9 +63,18 @@ func (config *Config) Validate() error { if config.LogStreamName == "" { return errors.New("'log_stream_name' must be set") } - if config.QueueSettings.QueueSize < 1 { - return errors.New("'sending_queue.queue_size' must be 1 or greater") + + if err := config.QueueSettings.Validate(); err != nil { + return err + } + + // TODO: once QueueSettings.Validate validate the number of consumers remove the next + // verification + + if config.QueueSettings.NumConsumers < 1 { + return errors.New("'sending_queue.num_consumers' must be 1 or greater") } + if retErr := cwlogs.ValidateRetentionValue(config.LogRetention); retErr != nil { return retErr } @@ -79,13 +82,4 @@ func (config *Config) Validate() error { } -func (config *Config) enforcedQueueSettings() exporterhelper.QueueSettings { - return exporterhelper.QueueSettings{ - Enabled: true, - // due to the sequence token, there can be only one request in flight - NumConsumers: 1, - QueueSize: config.QueueSettings.QueueSize, - } -} - // TODO(jbd): Add ARN role to config. diff --git a/exporter/awscloudwatchlogsexporter/config_test.go b/exporter/awscloudwatchlogsexporter/config_test.go index 9b9ffd1da008..b372e8032666 100644 --- a/exporter/awscloudwatchlogsexporter/config_test.go +++ b/exporter/awscloudwatchlogsexporter/config_test.go @@ -42,8 +42,10 @@ func TestLoadConfig(t *testing.T) { LogStreamName: "testing", Endpoint: "", AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), - QueueSettings: QueueSettings{ - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 1, + QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, }, }, @@ -61,14 +63,20 @@ func TestLoadConfig(t *testing.T) { AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), LogGroupName: "test-2", LogStreamName: "testing", - QueueSettings: QueueSettings{ - QueueSize: 2, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 1, + QueueSize: 2, }, }, }, { id: component.NewIDWithName(metadata.Type, "invalid_queue_size"), - errorMessage: "'sending_queue.queue_size' must be 1 or greater", + errorMessage: "queue size must be positive", + }, + { + id: component.NewIDWithName(metadata.Type, "invalid_num_consumers"), + errorMessage: "'sending_queue.num_consumers' must be 1 or greater", }, { id: component.NewIDWithName(metadata.Type, "invalid_required_field_stream"), @@ -78,10 +86,6 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "invalid_required_field_group"), errorMessage: "'log_group_name' must be set", }, - { - id: component.NewIDWithName(metadata.Type, "invalid_queue_setting"), - errorMessage: `'sending_queue' has invalid keys: enabled, num_consumers`, - }, } for _, tt := range tests { @@ -113,8 +117,10 @@ func TestRetentionValidateCorrect(t *testing.T) { Endpoint: "", LogRetention: 365, AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), - QueueSettings: QueueSettings{ - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 1, + QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, } assert.NoError(t, component.ValidateConfig(cfg)) @@ -130,7 +136,8 @@ func TestRetentionValidateWrong(t *testing.T) { Endpoint: "", LogRetention: 366, AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), - QueueSettings: QueueSettings{ + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, } @@ -213,8 +220,10 @@ func TestValidateTags(t *testing.T) { Endpoint: "", Tags: tt.tags, AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), - QueueSettings: QueueSettings{ - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 1, + QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, } if tt.errorMessage != "" { diff --git a/exporter/awscloudwatchlogsexporter/exporter.go b/exporter/awscloudwatchlogsexporter/exporter.go index 2951b02a5977..95040ecab3a4 100644 --- a/exporter/awscloudwatchlogsexporter/exporter.go +++ b/exporter/awscloudwatchlogsexporter/exporter.go @@ -8,7 +8,7 @@ import ( "encoding/hex" "encoding/json" "errors" - "sync" + "fmt" "time" "github.com/aws/aws-sdk-go/aws" @@ -32,8 +32,7 @@ type cwlExporter struct { retryCount int collectorID string svcStructuredLog *cwlogs.Client - pusherMap map[cwlogs.PusherKey]cwlogs.Pusher - pusherMapLock sync.RWMutex + pusherFactory cwlogs.MultiStreamPusherFactory } type awsMetadata struct { @@ -68,16 +67,8 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*cwlExporter return nil, err } - pusherKey := cwlogs.PusherKey{ - LogGroupName: expConfig.LogGroupName, - LogStreamName: expConfig.LogStreamName, - } - - pusher := cwlogs.NewPusher(pusherKey, *awsConfig.MaxRetries, *svcStructuredLog, params.Logger) - - pusherMap := make(map[cwlogs.PusherKey]cwlogs.Pusher) - - pusherMap[pusherKey] = pusher + logStreamManager := cwlogs.NewLogStreamManager(*svcStructuredLog) + multiStreamPusherFactory := cwlogs.NewMultiStreamPusherFactory(logStreamManager, *svcStructuredLog, params.Logger) logsExporter := &cwlExporter{ svcStructuredLog: svcStructuredLog, @@ -85,7 +76,7 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*cwlExporter logger: params.Logger, retryCount: *awsConfig.MaxRetries, collectorID: collectorIdentifier.String(), - pusherMap: pusherMap, + pusherFactory: multiStreamPusherFactory, } return logsExporter, nil } @@ -101,7 +92,7 @@ func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp. params, config, logsPusher.consumeLogs, - exporterhelper.WithQueue(expConfig.enforcedQueueSettings()), + exporterhelper.WithQueue(expConfig.QueueSettings), exporterhelper.WithRetry(expConfig.RetrySettings), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithShutdown(logsPusher.shutdown), @@ -109,74 +100,36 @@ func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp. } func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error { - logEvents, _ := logsToCWLogs(e.logger, ld, e.Config) - if len(logEvents) == 0 { - return nil - } + pusher := e.pusherFactory.CreateMultiStreamPusher() + var errs error - logPushersUsed := make(map[cwlogs.PusherKey]cwlogs.Pusher) - for _, logEvent := range logEvents { - pusherKey := cwlogs.PusherKey{ - LogGroupName: logEvent.LogGroupName, - LogStreamName: logEvent.LogStreamName, - } - cwLogsPusher := e.getLogPusher(logEvent) - e.logger.Debug("Adding log event", zap.Any("event", logEvent)) - err := cwLogsPusher.AddLogEntry(logEvent) - if err != nil { - e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents))) - } - logPushersUsed[pusherKey] = cwLogsPusher - } - var flushErrArray []error - for _, pusher := range logPushersUsed { - flushErr := pusher.ForceFlush() - if flushErr != nil { - e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr)) - flushErrArray = append(flushErrArray, flushErr) - } - } - if len(flushErrArray) != 0 { - errorString := "" - for _, err := range flushErrArray { - errorString += err.Error() - } - return errors.New(errorString) - } - return nil -} + err := pushLogsToCWLogs(e.logger, ld, e.Config, pusher) -func (e *cwlExporter) getLogPusher(logEvent *cwlogs.Event) cwlogs.Pusher { - e.pusherMapLock.Lock() - defer e.pusherMapLock.Unlock() - pusherKey := cwlogs.PusherKey{ - LogGroupName: logEvent.LogGroupName, - LogStreamName: logEvent.LogStreamName, + if err != nil { + errs = errors.Join(errs, fmt.Errorf("Error pushing logs: %w", err)) } - if e.pusherMap[pusherKey] == nil { - pusher := cwlogs.NewPusher(pusherKey, e.retryCount, *e.svcStructuredLog, e.logger) - e.pusherMap[pusherKey] = pusher + + err = pusher.ForceFlush() + + if err != nil { + errs = errors.Join(errs, fmt.Errorf("Error flushing logs: %w", err)) } - return e.pusherMap[pusherKey] + + return errs } func (e *cwlExporter) shutdown(_ context.Context) error { - if e.pusherMap != nil { - for _, pusher := range e.pusherMap { - pusher.ForceFlush() - } - } return nil } -func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.Event, int) { +func pushLogsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config, pusher cwlogs.Pusher) error { n := ld.ResourceLogs().Len() + if n == 0 { - return []*cwlogs.Event{}, 0 + return nil } - var dropped int - var out []*cwlogs.Event + var errs error rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { @@ -192,14 +145,17 @@ func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.E event, err := logToCWLog(resourceAttrs, log, config) if err != nil { logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err)) - dropped++ } else { - out = append(out, event) + err := pusher.AddLogEntry(event) + if err != nil { + errs = errors.Join(errs, err) + } } } } } - return out, dropped + + return errs } type cwLogBody struct { @@ -268,8 +224,10 @@ func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord, config Timestamp: aws.Int64(int64(log.Timestamp()) / int64(time.Millisecond)), // in milliseconds Message: aws.String(string(bodyJSON)), }, - LogGroupName: logGroupName, - LogStreamName: logStreamName, + StreamKey: cwlogs.StreamKey{ + LogGroupName: logGroupName, + LogStreamName: logStreamName, + }, GeneratedTime: time.Now(), }, nil } diff --git a/exporter/awscloudwatchlogsexporter/exporter_test.go b/exporter/awscloudwatchlogsexporter/exporter_test.go index acaded5b6c39..9d96bc2068d1 100644 --- a/exporter/awscloudwatchlogsexporter/exporter_test.go +++ b/exporter/awscloudwatchlogsexporter/exporter_test.go @@ -5,11 +5,11 @@ package awscloudwatchlogsexporter import ( "context" + "errors" "testing" "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -29,7 +29,7 @@ func (p *mockPusher) AddLogEntry(_ *cwlogs.Event) error { args := p.Called(nil) errorStr := args.String(0) if errorStr != "" { - return awserr.NewRequestFailure(nil, 400, "").(error) + return errors.New("Add log entry Error") } return nil } @@ -38,7 +38,7 @@ func (p *mockPusher) ForceFlush() error { args := p.Called(nil) errorStr := args.String(0) if errorStr != "" { - return awserr.NewRequestFailure(nil, 400, "").(error) + return errors.New("Push error") } return nil } @@ -63,8 +63,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`), }, - LogGroupName: "", - LogStreamName: "", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "", + LogStreamName: "", + }, }, }, { @@ -78,8 +80,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"}}`), }, - LogGroupName: "", - LogStreamName: "", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "", + LogStreamName: "", + }, }, }, { @@ -96,8 +100,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`), }, - LogGroupName: "tLogGroup", - LogStreamName: "tStreamName", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + }, }, }, { @@ -115,8 +121,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`hello world`), }, - LogGroupName: "tLogGroup", - LogStreamName: "tStreamName", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + }, }, }, { @@ -134,8 +142,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`), }, - LogGroupName: "Foo", - LogStreamName: "tStreamName", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "Foo", + LogStreamName: "tStreamName", + }, }, }, { @@ -153,8 +163,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","LogStreamName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`), }, - LogGroupName: "Foo", - LogStreamName: "Foo", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "Foo", + LogStreamName: "Foo", + }, }, }, { @@ -172,8 +184,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`{"Timestamp":1574109732004,"log_group_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`), }, - LogGroupName: "Foo", - LogStreamName: "tStreamName", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "Foo", + LogStreamName: "tStreamName", + }, }, }, { @@ -191,8 +205,10 @@ func TestLogToCWLog(t *testing.T) { Timestamp: aws.Int64(1609719139), Message: aws.String(`{"Timestamp":1574109732004,"log_group_name":"Foo","log_stream_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`), }, - LogGroupName: "Foo", - LogStreamName: "Foo", + StreamKey: cwlogs.StreamKey{ + LogGroupName: "Foo", + LogStreamName: "Foo", + }, }, }, } @@ -268,6 +284,14 @@ func createPLog(log string) plog.LogRecord { return pLog } +type mockFactory struct { + *mockPusher +} + +func (mf *mockFactory) CreateMultiStreamPusher() cwlogs.Pusher { + return mf.mockPusher +} + func TestConsumeLogs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -278,25 +302,74 @@ func TestConsumeLogs(t *testing.T) { expCfg.LogStreamName = "testStream" expCfg.MaxRetries = 0 exp, err := newCwLogsPusher(expCfg, exportertest.NewNopCreateSettings()) - assert.Nil(t, err) - assert.NotNil(t, exp) - ld := plog.NewLogs() - r := ld.ResourceLogs().AppendEmpty() - r.Resource().Attributes().PutStr("hello", "test") - logRecords := r.ScopeLogs().AppendEmpty().LogRecords() - logRecords.EnsureCapacity(5) - logRecords.AppendEmpty() - assert.Equal(t, 1, ld.LogRecordCount()) - logPusher := new(mockPusher) - logPusher.On("AddLogEntry", nil).Return("").Once() - logPusher.On("ForceFlush", nil).Return("").Twice() - exp.pusherMap[cwlogs.PusherKey{ - LogGroupName: expCfg.LogGroupName, - LogStreamName: expCfg.LogStreamName, - }] = logPusher - require.NoError(t, exp.consumeLogs(ctx, ld)) - require.NoError(t, exp.shutdown(ctx)) + testcases := []struct { + id string + setupLogPusherFunc func(pusher *mockPusher) + shouldError bool + }{ + { + id: "push has no errors", + setupLogPusherFunc: func(pusher *mockPusher) { + pusher.On("AddLogEntry", nil).Return("").Times(3) + pusher.On("ForceFlush", nil).Return("").Once() + }, + }, + { + id: "AddLogEntry has error", + setupLogPusherFunc: func(pusher *mockPusher) { + pusher.On("AddLogEntry", nil).Return("").Once(). + On("AddLogEntry", nil).Return("error").Once(). + On("AddLogEntry", nil).Return("").Once() + pusher.On("ForceFlush", nil).Return("").Once() + }, + shouldError: true, + }, + { + id: "ForceFlush has error", + setupLogPusherFunc: func(pusher *mockPusher) { + pusher.On("AddLogEntry", nil).Return("").Times(3) + pusher.On("ForceFlush", nil).Return("error").Once() + }, + shouldError: true, + }, + } + + for _, testcase := range testcases { + t.Run(testcase.id, func(t *testing.T) { + logPusher := new(mockPusher) + exp.pusherFactory = &mockFactory{logPusher} + assert.Nil(t, err) + assert.NotNil(t, exp) + ld := plog.NewLogs() + r := ld.ResourceLogs().AppendEmpty() + r.Resource().Attributes().PutStr("hello", "test") + logRecords := r.ScopeLogs().AppendEmpty().LogRecords() + + record1 := logRecords.AppendEmpty() + record2 := logRecords.AppendEmpty() + record3 := logRecords.AppendEmpty() + + record1.Body().SetStr("Hello world") + record2.Body().SetStr("Hello world") + record3.Body().SetStr("Hello world") + + require.Equal(t, 3, ld.LogRecordCount()) + + testcase.setupLogPusherFunc(logPusher) + + if !testcase.shouldError { + require.NoError(t, exp.consumeLogs(ctx, ld)) + } else { + require.Error(t, exp.consumeLogs(ctx, ld)) + } + + require.NoError(t, exp.shutdown(ctx)) + + logPusher.AssertNumberOfCalls(t, "ForceFlush", 1) + logPusher.AssertNumberOfCalls(t, "AddLogEntry", 3) + }) + } } func TestNewExporterWithoutRegionErr(t *testing.T) { diff --git a/exporter/awscloudwatchlogsexporter/factory.go b/exporter/awscloudwatchlogsexporter/factory.go index 5973a088b39e..0b8f5b030c49 100644 --- a/exporter/awscloudwatchlogsexporter/factory.go +++ b/exporter/awscloudwatchlogsexporter/factory.go @@ -27,12 +27,14 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + queueSettings := exporterhelper.NewDefaultQueueSettings() + // For backwards compatibilitiy, we default to 1 consumer + queueSettings.NumConsumers = 1 + return &Config{ RetrySettings: exporterhelper.NewDefaultRetrySettings(), AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), - QueueSettings: QueueSettings{ - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, - }, + QueueSettings: queueSettings, } } diff --git a/exporter/awscloudwatchlogsexporter/factory_test.go b/exporter/awscloudwatchlogsexporter/factory_test.go index ff34bccb98ad..f647bcd7e0c2 100644 --- a/exporter/awscloudwatchlogsexporter/factory_test.go +++ b/exporter/awscloudwatchlogsexporter/factory_test.go @@ -16,8 +16,10 @@ func TestDefaultConfig_exporterSettings(t *testing.T) { want := &Config{ RetrySettings: exporterhelper.NewDefaultRetrySettings(), AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), - QueueSettings: QueueSettings{ - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 1, + QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, } assert.Equal(t, want, createDefaultConfig()) diff --git a/exporter/awscloudwatchlogsexporter/testdata/config.yaml b/exporter/awscloudwatchlogsexporter/testdata/config.yaml index 1e7a98361250..4cbff3ee015b 100644 --- a/exporter/awscloudwatchlogsexporter/testdata/config.yaml +++ b/exporter/awscloudwatchlogsexporter/testdata/config.yaml @@ -23,6 +23,12 @@ awscloudwatchlogs/invalid_queue_size: sending_queue: queue_size: 0 +awscloudwatchlogs/invalid_num_consumers: + log_group_name: "test-3" + log_stream_name: "testing" + sending_queue: + num_consumers: 0 + awscloudwatchlogs/invalid_required_field_stream: log_group_name: "test-1" diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 7609e56dff2b..b80dcf12c0e0 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -29,7 +29,7 @@ const ( ) type emfExporter struct { - pusherMap map[cwlogs.PusherKey]cwlogs.Pusher + pusherMap map[cwlogs.StreamKey]cwlogs.Pusher svcStructuredLog *cwlogs.Client config *Config @@ -68,7 +68,7 @@ func newEmfExporter(config *Config, set exporter.CreateSettings) (*emfExporter, metricTranslator: newMetricTranslator(*config), retryCnt: *awsConfig.MaxRetries, collectorID: collectorIdentifier.String(), - pusherMap: map[cwlogs.PusherKey]cwlogs.Pusher{}, + pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{}, } config.logger.Warn("the default value for DimensionRollupOption will be changing to NoDimensionRollup" + @@ -105,8 +105,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e } for _, groupedMetric := range groupedMetrics { - cWMetric := translateGroupedMetricToCWMetric(groupedMetric, emf.config) - putLogEvent, err := translateCWMetricToEMF(cWMetric, emf.config) + putLogEvent, err := translateGroupedMetricToEmf(groupedMetric, emf.config, defaultLogStream) if err != nil { return err } @@ -118,16 +117,8 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e fmt.Println(*putLogEvent.InputLogEvent.Message) } } else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) { - logGroup := groupedMetric.metadata.logGroup - logStream := groupedMetric.metadata.logStream - if logStream == "" { - logStream = defaultLogStream - } - emfPusher := emf.getPusher(cwlogs.PusherKey{ - LogGroupName: logGroup, - LogStreamName: logStream, - }) + emfPusher := emf.getPusher(putLogEvent.StreamKey) if emfPusher != nil { returnError := emfPusher.AddLogEntry(putLogEvent) if returnError != nil { @@ -156,7 +147,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e return nil } -func (emf *emfExporter) getPusher(key cwlogs.PusherKey) cwlogs.Pusher { +func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher { var ok bool if _, ok = emf.pusherMap[key]; !ok { diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index 9d4426c2b819..c9ae63c56697 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -144,7 +144,7 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: expCfg.LogStreamName, }] @@ -175,7 +175,7 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance", LogStreamName: "test-task-id", }] @@ -206,7 +206,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: "test-task-id", }] @@ -237,7 +237,7 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: expCfg.LogStreamName, }] @@ -264,8 +264,8 @@ func TestPushMetricsDataWithErr(t *testing.T) { logPusher.On("ForceFlush", nil).Return("some error").Once() logPusher.On("ForceFlush", nil).Return("").Once() logPusher.On("ForceFlush", nil).Return("some error").Once() - exp.pusherMap = map[cwlogs.PusherKey]cwlogs.Pusher{} - exp.pusherMap[cwlogs.PusherKey{ + exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{} + exp.pusherMap[cwlogs.StreamKey{ LogGroupName: "test-logGroupName", LogStreamName: "test-logStreamName", }] = logPusher diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index 3b97f4627f6c..8ee0687585d1 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -445,3 +445,25 @@ func translateCWMetricToEMF(cWMetric *cWMetrics, config *Config) (*cwlogs.Event, return logEvent, nil } + +// Utility function that converts from groupedMetric to a cloudwatch event +func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, defaultLogStream string) (*cwlogs.Event, error) { + cWMetric := translateGroupedMetricToCWMetric(groupedMetric, config) + event, err := translateCWMetricToEMF(cWMetric, config) + + if err != nil { + return nil, err + } + + logGroup := groupedMetric.metadata.logGroup + logStream := groupedMetric.metadata.logStream + + if logStream == "" { + logStream = defaultLogStream + } + + event.LogGroupName = logGroup + event.LogStreamName = logStream + + return event, nil +} diff --git a/internal/aws/cwlogs/cwlog_client.go b/internal/aws/cwlogs/cwlog_client.go index aa0da76adf97..4c58fe80065d 100644 --- a/internal/aws/cwlogs/cwlog_client.go +++ b/internal/aws/cwlogs/cwlog_client.go @@ -54,44 +54,37 @@ func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.Bu // PutLogEvents mainly handles different possible error could be returned from server side, and retries them // if necessary. -func (client *Client) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retryCnt int) (*string, error) { +func (client *Client) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retryCnt int) error { var response *cloudwatchlogs.PutLogEventsOutput var err error - var token = input.SequenceToken - + // CloudWatch Logs API was changed to ignore the sequenceToken + // PutLogEvents actions are now accepted and never return + // InvalidSequenceTokenException or DataAlreadyAcceptedException even + // if the sequence token is not valid. + // Finally InvalidSequenceTokenException and DataAlreadyAcceptedException are + // never returned by the PutLogEvents action. for i := 0; i <= retryCnt; i++ { - input.SequenceToken = token response, err = client.svc.PutLogEvents(input) if err != nil { var awsErr awserr.Error if !errors.As(err, &awsErr) { client.logger.Error("Cannot cast PutLogEvents error into awserr.Error.", zap.Error(err)) - return token, err + return err } switch e := awsErr.(type) { case *cloudwatchlogs.InvalidParameterException: client.logger.Error("cwlog_client: Error occurs in PutLogEvents, will not retry the request", zap.Error(e), zap.String("LogGroupName", *input.LogGroupName), zap.String("LogStreamName", *input.LogStreamName)) - return token, err - case *cloudwatchlogs.InvalidSequenceTokenException: // Resend log events with new sequence token when InvalidSequenceTokenException happens - client.logger.Warn("cwlog_client: Error occurs in PutLogEvents, will search the next token and retry the request", zap.Error(e)) - token = e.ExpectedSequenceToken - continue - case *cloudwatchlogs.DataAlreadyAcceptedException: // Skip batch if DataAlreadyAcceptedException happens - client.logger.Warn("cwlog_client: Error occurs in PutLogEvents, drop this request and continue to the next request", zap.Error(e)) - token = e.ExpectedSequenceToken - return token, err + return err case *cloudwatchlogs.OperationAbortedException: // Retry request if OperationAbortedException happens client.logger.Warn("cwlog_client: Error occurs in PutLogEvents, will retry the request", zap.Error(e)) - return token, err + return err case *cloudwatchlogs.ServiceUnavailableException: // Retry request if ServiceUnavailableException happens client.logger.Warn("cwlog_client: Error occurs in PutLogEvents, will retry the request", zap.Error(e)) - return token, err + return err case *cloudwatchlogs.ResourceNotFoundException: - tmpToken, tmpErr := client.CreateStream(input.LogGroupName, input.LogStreamName) - if tmpErr == nil { - if tmpToken == "" { - token = nil - } + tmpErr := client.CreateStream(input.LogGroupName, input.LogStreamName) + if tmpErr != nil { + return tmpErr } continue default: @@ -99,10 +92,10 @@ func (client *Client) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retr // Drop request if ThrottlingException happens if awsErr.Code() == errCodeThrottlingException { client.logger.Warn("cwlog_client: Error occurs in PutLogEvents, will not retry the request", zap.Error(awsErr), zap.String("LogGroupName", *input.LogGroupName), zap.String("LogStreamName", *input.LogStreamName)) - return token, err + return err } client.logger.Error("cwlog_client: Error occurs in PutLogEvents", zap.Error(awsErr)) - return token, err + return err } } @@ -123,7 +116,6 @@ func (client *Client) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retr } if response.NextSequenceToken != nil { - token = response.NextSequenceToken break } } @@ -131,11 +123,11 @@ func (client *Client) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput, retr if err != nil { client.logger.Error("All retries failed for PutLogEvents. Drop this request.", zap.Error(err)) } - return token, err + return err } // Prepare the readiness for the log group and log stream. -func (client *Client) CreateStream(logGroup, streamName *string) (token string, e error) { +func (client *Client) CreateStream(logGroup, streamName *string) error { // CreateLogStream / CreateLogGroup _, err := client.svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: logGroup, @@ -157,8 +149,8 @@ func (client *Client) CreateStream(logGroup, streamName *string) (token string, if err != nil { var awsErr awserr.Error if errors.As(err, &awsErr) { - client.logger.Debug("CreateLogStream / CreateLogGroup has errors related to log retention policy.", zap.String("LogGroupName", *logGroup), zap.String("LogStreamName", *streamName), zap.Error(e)) - return token, err + client.logger.Debug("CreateLogStream / CreateLogGroup has errors related to log retention policy.", zap.String("LogGroupName", *logGroup), zap.String("LogStreamName", *streamName), zap.Error(err)) + return err } } } @@ -173,14 +165,14 @@ func (client *Client) CreateStream(logGroup, streamName *string) (token string, if err != nil { var awsErr awserr.Error if errors.As(err, &awsErr) && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException { - return "", nil + return nil } - client.logger.Debug("CreateLogStream / CreateLogGroup has errors.", zap.String("LogGroupName", *logGroup), zap.String("LogStreamName", *streamName), zap.Error(e)) - return token, err + client.logger.Debug("CreateLogStream / CreateLogGroup has errors.", zap.String("LogGroupName", *logGroup), zap.String("LogStreamName", *streamName), zap.Error(err)) + return err } // After a log stream is created the token is always empty. - return "", nil + return nil } func newCollectorUserAgentHandler(buildInfo component.BuildInfo, logGroupName string) request.NamedHandler { diff --git a/internal/aws/cwlogs/cwlog_client_test.go b/internal/aws/cwlogs/cwlog_client_test.go index 12de74020df6..8694799822a4 100644 --- a/internal/aws/cwlogs/cwlog_client_test.go +++ b/internal/aws/cwlogs/cwlog_client_test.go @@ -97,10 +97,10 @@ func TestPutLogEvents_HappyCase(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil) client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.NoError(t, err) } func TestPutLogEvents_HappyCase_SomeRejectedInfo(t *testing.T) { @@ -123,10 +123,10 @@ func TestPutLogEvents_HappyCase_SomeRejectedInfo(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil) client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.NoError(t, err) } func TestPutLogEvents_NonAWSError(t *testing.T) { @@ -143,10 +143,10 @@ func TestPutLogEvents_NonAWSError(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, errors.New("some random error")).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, previousSequenceToken, *tokenP) + assert.Error(t, err) } func TestPutLogEvents_InvalidParameterException(t *testing.T) { @@ -164,54 +164,10 @@ func TestPutLogEvents_InvalidParameterException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, invalidParameterException).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) - - svc.AssertExpectations(t) - assert.Equal(t, previousSequenceToken, *tokenP) -} - -func TestPutLogEvents_InvalidSequenceTokenException(t *testing.T) { - logger := zap.NewNop() - svc := new(mockCloudWatchLogsClient) - putLogEventsInput := &cloudwatchlogs.PutLogEventsInput{ - LogGroupName: &logGroup, - LogStreamName: &logStreamName, - SequenceToken: &previousSequenceToken, - } - putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: &expectedNextSequenceToken} - awsErr := &cloudwatchlogs.InvalidSequenceTokenException{ExpectedSequenceToken: &expectedNextSequenceToken} - - // the test framework does not support return different result sequentially for the same method call. - svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, awsErr).Once() - svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() - - client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) - - svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) -} - -func TestPutLogEvents_DataAlreadyAcceptedException(t *testing.T) { - logger := zap.NewNop() - svc := new(mockCloudWatchLogsClient) - putLogEventsInput := &cloudwatchlogs.PutLogEventsInput{ - LogGroupName: &logGroup, - LogStreamName: &logStreamName, - SequenceToken: &previousSequenceToken, - } - putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: &expectedNextSequenceToken} - awsErr := &cloudwatchlogs.DataAlreadyAcceptedException{ExpectedSequenceToken: &expectedNextSequenceToken} - - svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, awsErr).Once() - - client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.Error(t, err) } func TestPutLogEvents_OperationAbortedException(t *testing.T) { @@ -229,10 +185,10 @@ func TestPutLogEvents_OperationAbortedException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, operationAbortedException).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, previousSequenceToken, *tokenP) + assert.Error(t, err) } func TestPutLogEvents_ServiceUnavailableException(t *testing.T) { @@ -250,10 +206,10 @@ func TestPutLogEvents_ServiceUnavailableException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, serviceUnavailableException).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, previousSequenceToken, *tokenP) + assert.Error(t, err) } func TestPutLogEvents_UnknownException(t *testing.T) { @@ -271,10 +227,10 @@ func TestPutLogEvents_UnknownException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, unknownException).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, previousSequenceToken, *tokenP) + assert.Error(t, err) } func TestPutLogEvents_ThrottlingException(t *testing.T) { @@ -292,10 +248,10 @@ func TestPutLogEvents_ThrottlingException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, throttlingException).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, previousSequenceToken, *tokenP) + assert.Error(t, err) } func TestPutLogEvents_ResourceNotFoundException(t *testing.T) { @@ -319,10 +275,10 @@ func TestPutLogEvents_ResourceNotFoundException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.NoError(t, err) } func TestLogRetention_NeverExpire(t *testing.T) { @@ -354,10 +310,10 @@ func TestLogRetention_NeverExpire(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.NoError(t, err) } func TestLogRetention_RetentionDaysInputted(t *testing.T) { @@ -390,10 +346,10 @@ func TestLogRetention_RetentionDaysInputted(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() client := newCloudWatchLogClient(svc, 365, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.NoError(t, err) } func TestSetTags_NotCalled(t *testing.T) { @@ -425,10 +381,10 @@ func TestSetTags_NotCalled(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.NoError(t, err) } func TestSetTags_Called(t *testing.T) { @@ -461,10 +417,10 @@ func TestSetTags_Called(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() client := newCloudWatchLogClient(svc, 0, sampleTags, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Equal(t, expectedNextSequenceToken, *tokenP) + assert.NoError(t, err) } func TestPutLogEvents_AllRetriesFail(t *testing.T) { @@ -486,10 +442,10 @@ func TestPutLogEvents_AllRetriesFail(t *testing.T) { &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), nil).Twice() client := newCloudWatchLogClient(svc, 0, nil, logger) - tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + err := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) - assert.Nil(t, tokenP) + assert.Error(t, err) } func TestCreateStream_HappyCase(t *testing.T) { @@ -500,11 +456,10 @@ func TestCreateStream_HappyCase(t *testing.T) { &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), nil) client := newCloudWatchLogClient(svc, 0, nil, logger) - token, err := client.CreateStream(&logGroup, &logStreamName) + err := client.CreateStream(&logGroup, &logStreamName) svc.AssertExpectations(t) assert.NoError(t, err) - assert.Equal(t, emptySequenceToken, token) } func TestCreateStream_CreateLogStream_ResourceAlreadyExists(t *testing.T) { @@ -517,11 +472,10 @@ func TestCreateStream_CreateLogStream_ResourceAlreadyExists(t *testing.T) { new(cloudwatchlogs.CreateLogStreamOutput), resourceAlreadyExistsException) client := newCloudWatchLogClient(svc, 0, nil, logger) - token, err := client.CreateStream(&logGroup, &logStreamName) + err := client.CreateStream(&logGroup, &logStreamName) svc.AssertExpectations(t) assert.NoError(t, err) - assert.Equal(t, emptySequenceToken, token) } func TestCreateStream_CreateLogStream_ResourceNotFound(t *testing.T) { @@ -542,11 +496,10 @@ func TestCreateStream_CreateLogStream_ResourceNotFound(t *testing.T) { new(cloudwatchlogs.CreateLogStreamOutput), nil).Once() client := newCloudWatchLogClient(svc, 0, nil, logger) - token, err := client.CreateStream(&logGroup, &logStreamName) + err := client.CreateStream(&logGroup, &logStreamName) svc.AssertExpectations(t) assert.NoError(t, err) - assert.Equal(t, emptySequenceToken, token) } type UnknownError struct { diff --git a/internal/aws/cwlogs/pusher.go b/internal/aws/cwlogs/pusher.go index e9e7535ccfb5..d0f940730278 100644 --- a/internal/aws/cwlogs/pusher.go +++ b/internal/aws/cwlogs/pusher.go @@ -23,8 +23,6 @@ const ( perEventHeaderBytes = 26 maxRequestPayloadBytes = 1024 * 1024 * 1 - minPusherIntervalMs = 200 // 5 TPS - truncatedSuffix = "[Truncated...]" eventTimestampLimitInPast = 14 * 24 * time.Hour // None of the log events in the batch can be older than 14 days @@ -40,8 +38,8 @@ type Event struct { InputLogEvent *cloudwatchlogs.InputLogEvent // The time which log generated. GeneratedTime time.Time - LogGroupName string - LogStreamName string + // Identify what is the stream of destination of this event + StreamKey } // NewEvent creates a new log event @@ -55,7 +53,8 @@ func NewEvent(timestampMs int64, message string) *Event { return event } -type PusherKey struct { +// Uniquely identify a cloudwatch logs stream +type StreamKey struct { LogGroupName string LogStreamName string } @@ -111,7 +110,7 @@ type eventBatch struct { } // Create a new log event batch if needed. -func newEventBatch(key PusherKey) *eventBatch { +func newEventBatch(key StreamKey) *eventBatch { return &eventBatch{ putLogEventsInput: &cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String(key.LogGroupName), @@ -187,20 +186,17 @@ type logPusher struct { // log stream name of the current logPusher logStreamName *string - batchUpdateLock sync.Mutex - logEventBatch *eventBatch + logEventBatch *eventBatch - pushLock sync.Mutex - streamToken string // no init value svcStructuredLog Client retryCnt int } // NewPusher creates a logPusher instance -func NewPusher(pusherKey PusherKey, retryCnt int, +func NewPusher(streamKey StreamKey, retryCnt int, svcStructuredLog Client, logger *zap.Logger) Pusher { - pusher := newLogPusher(pusherKey, svcStructuredLog, logger) + pusher := newLogPusher(streamKey, svcStructuredLog, logger) pusher.retryCnt = defaultRetryCount if retryCnt > 0 { @@ -211,15 +207,15 @@ func NewPusher(pusherKey PusherKey, retryCnt int, } // Only create a logPusher, but not start the instance. -func newLogPusher(pusherKey PusherKey, +func newLogPusher(streamKey StreamKey, svcStructuredLog Client, logger *zap.Logger) *logPusher { pusher := &logPusher{ - logGroupName: aws.String(pusherKey.LogGroupName), - logStreamName: aws.String(pusherKey.LogStreamName), + logGroupName: aws.String(streamKey.LogGroupName), + logStreamName: aws.String(streamKey.LogStreamName), svcStructuredLog: svcStructuredLog, logger: logger, } - pusher.logEventBatch = newEventBatch(pusherKey) + pusher.logEventBatch = newEventBatch(streamKey) return pusher } @@ -254,8 +250,6 @@ func (p *logPusher) ForceFlush() error { } func (p *logPusher) pushEventBatch(req interface{}) error { - p.pushLock.Lock() - defer p.pushLock.Unlock() // http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents // The log events in the batch must be in chronological ordered by their @@ -265,27 +259,9 @@ func (p *logPusher) pushEventBatch(req interface{}) error { logEventBatch.sortLogEvents() putLogEventsInput := logEventBatch.putLogEventsInput - if p.streamToken == "" { - var err error - // log part and retry logic are already done inside the CreateStream - // when the error is not nil, the stream token is "", which is handled in the below logic. - p.streamToken, err = p.svcStructuredLog.CreateStream(p.logGroupName, p.logStreamName) - // TODO Known issue: createStream will fail if the corresponding logGroup and logStream has been created. - // The retry mechanism helps get the first stream token, yet the first batch will be sent twice in this situation. - if err != nil { - p.logger.Warn("Failed to create stream token", zap.Error(err)) - } - } - - if p.streamToken != "" { - putLogEventsInput.SequenceToken = aws.String(p.streamToken) - } - startTime := time.Now() - var tmpToken *string - var err error - tmpToken, err = p.svcStructuredLog.PutLogEvents(putLogEventsInput, p.retryCnt) + err := p.svcStructuredLog.PutLogEvents(putLogEventsInput, p.retryCnt) if err != nil { return err @@ -296,13 +272,6 @@ func (p *logPusher) pushEventBatch(req interface{}) error { zap.Float64("LogEventsSize", float64(logEventBatch.byteTotal)/float64(1024)), zap.Int64("Time", time.Since(startTime).Nanoseconds()/int64(time.Millisecond))) - if tmpToken != nil { - p.streamToken = *tmpToken - } - diff := time.Since(startTime) - if timeLeft := minPusherIntervalMs*time.Millisecond - diff; timeLeft > 0 { - time.Sleep(timeLeft) - } return nil } @@ -311,14 +280,11 @@ func (p *logPusher) addLogEvent(logEvent *Event) *eventBatch { return nil } - p.batchUpdateLock.Lock() - defer p.batchUpdateLock.Unlock() - var prevBatch *eventBatch currentBatch := p.logEventBatch if currentBatch.exceedsLimit(logEvent.eventPayloadBytes()) || !currentBatch.isActive(logEvent.InputLogEvent.Timestamp) { prevBatch = currentBatch - currentBatch = newEventBatch(PusherKey{ + currentBatch = newEventBatch(StreamKey{ LogGroupName: *p.logGroupName, LogStreamName: *p.logStreamName, }) @@ -330,13 +296,11 @@ func (p *logPusher) addLogEvent(logEvent *Event) *eventBatch { } func (p *logPusher) renewEventBatch() *eventBatch { - p.batchUpdateLock.Lock() - defer p.batchUpdateLock.Unlock() var prevBatch *eventBatch if len(p.logEventBatch.putLogEventsInput.LogEvents) > 0 { prevBatch = p.logEventBatch - p.logEventBatch = newEventBatch(PusherKey{ + p.logEventBatch = newEventBatch(StreamKey{ LogGroupName: *p.logGroupName, LogStreamName: *p.logStreamName, }) @@ -344,3 +308,114 @@ func (p *logPusher) renewEventBatch() *eventBatch { return prevBatch } + +// A Pusher that is able to send events to multiple streams. +type multiStreamPusher struct { + logStreamManager LogStreamManager + client Client + pusherMap map[StreamKey]Pusher + logger *zap.Logger +} + +func newMultiStreamPusher(logStreamManager LogStreamManager, client Client, logger *zap.Logger) *multiStreamPusher { + return &multiStreamPusher{ + logStreamManager: logStreamManager, + client: client, + logger: logger, + pusherMap: make(map[StreamKey]Pusher), + } +} + +func (m *multiStreamPusher) AddLogEntry(event *Event) error { + if err := m.logStreamManager.InitStream(event.StreamKey); err != nil { + return err + } + + var pusher Pusher + var ok bool + + if pusher, ok = m.pusherMap[event.StreamKey]; !ok { + pusher = NewPusher(event.StreamKey, 1, m.client, m.logger) + m.pusherMap[event.StreamKey] = pusher + } + + return pusher.AddLogEntry(event) +} + +func (m *multiStreamPusher) ForceFlush() error { + var errs []error + + for _, val := range m.pusherMap { + err := val.ForceFlush() + if err != nil { + errs = append(errs, err) + } + } + + if len(errs) != 0 { + return errors.Join(errs...) + } + + return nil +} + +// Factory for a Pusher that has capability of sending events to multiple log streams +type MultiStreamPusherFactory interface { + CreateMultiStreamPusher() Pusher +} + +type multiStreamPusherFactory struct { + logStreamManager LogStreamManager + logger *zap.Logger + client Client +} + +// Creates a new MultiStreamPusherFactory +func NewMultiStreamPusherFactory(logStreamManager LogStreamManager, client Client, logger *zap.Logger) MultiStreamPusherFactory { + return &multiStreamPusherFactory{ + logStreamManager: logStreamManager, + client: client, + logger: logger, + } +} + +// Factory method to create a Pusher that has support to sending events to multiple log streams +func (msf *multiStreamPusherFactory) CreateMultiStreamPusher() Pusher { + return newMultiStreamPusher(msf.logStreamManager, msf.client, msf.logger) +} + +// Manages the creation of streams +type LogStreamManager interface { + // Initialize a stream so that it can receive logs + // This will make sure that the stream exists and if it does not exist, + // It will create one. Implementations of this method MUST be safe for concurrent use. + InitStream(streamKey StreamKey) error +} + +type logStreamManager struct { + logStreamMutex sync.Mutex + streams map[StreamKey]bool + client Client +} + +func NewLogStreamManager(svcStructuredLog Client) LogStreamManager { + return &logStreamManager{ + client: svcStructuredLog, + streams: make(map[StreamKey]bool), + } +} + +func (lsm *logStreamManager) InitStream(streamKey StreamKey) error { + if _, ok := lsm.streams[streamKey]; !ok { + lsm.logStreamMutex.Lock() + defer lsm.logStreamMutex.Unlock() + + if _, ok := lsm.streams[streamKey]; !ok { + err := lsm.client.CreateStream(&streamKey.LogGroupName, &streamKey.LogStreamName) + lsm.streams[streamKey] = true + return err + } + } + return nil + // does not do anything if stream already exists +} diff --git a/internal/aws/cwlogs/pusher_test.go b/internal/aws/cwlogs/pusher_test.go index c82ecf4a0009..d90ff21c4456 100644 --- a/internal/aws/cwlogs/pusher_test.go +++ b/internal/aws/cwlogs/pusher_test.go @@ -7,7 +7,6 @@ import ( "fmt" "math/rand" "strings" - "sync" "testing" "time" @@ -18,61 +17,6 @@ import ( "go.uber.org/zap" ) -func TestConcurrentPushAndFlush(t *testing.T) { - maxEventPayloadBytes = 128 - - concurrency := 10 - current := time.Now().UnixNano() / 1e6 - collection := map[string]interface{}{} - - emfPusher := newMockPusherWithEventCheck(func(msg string) { - if _, ok := collection[msg]; ok { - t.Errorf("Sending duplicated event message %s", msg) - } else { - collection[msg] = struct{}{} - } - }) - - wg := sync.WaitGroup{} - wg.Add(concurrency) - for i := 0; i < concurrency; i++ { - go func(ii int) { - for j := 0; j < 10; j++ { - err := emfPusher.AddLogEntry(NewEvent(current, fmt.Sprintf("batch-%d-%d", ii, j))) - if err != nil { - t.Errorf("Error adding log entry: %v", err) - } - } - time.Sleep(1000 * time.Millisecond) - err := emfPusher.ForceFlush() - if err != nil { - t.Errorf("Error flushing: %v", err) - - } - wg.Done() - }(i) - } - wg.Wait() - assert.Equal(t, concurrency*10, len(collection)) - - maxEventPayloadBytes = defaultMaxEventPayloadBytes -} - -func newMockPusherWithEventCheck(check func(msg string)) Pusher { - svc := newAlwaysPassMockLogClient(func(args mock.Arguments) { - input := args.Get(0).(*cloudwatchlogs.PutLogEventsInput) - for _, event := range input.LogEvents { - eventMsg := *event.Message - check(eventMsg) - } - }) - p := newLogPusher(PusherKey{ - LogGroupName: logGroup, - LogStreamName: logStreamName, - }, *svc, zap.NewNop()) - return p -} - // logEvent Tests func TestLogEvent_eventPayloadBytes(t *testing.T) { testMessage := "test message" @@ -166,7 +110,7 @@ func TestLogEventBatch_sortLogEvents(t *testing.T) { // Need to remove the tmp state folder after testing. func newMockPusher() *logPusher { svc := newAlwaysPassMockLogClient(func(args mock.Arguments) {}) - return newLogPusher(PusherKey{ + return newLogPusher(StreamKey{ LogGroupName: logGroup, LogStreamName: logStreamName, }, *svc, zap.NewNop()) @@ -182,7 +126,7 @@ var msg = "test log message" func TestPusher_newLogEventBatch(t *testing.T) { p := newMockPusher() - logEventBatch := newEventBatch(PusherKey{ + logEventBatch := newEventBatch(StreamKey{ LogGroupName: logGroup, LogStreamName: logStreamName, }) @@ -247,3 +191,89 @@ func TestAddLogEventWithValidation(t *testing.T) { logEvent = NewEvent(timestampMs, "") assert.NotNil(t, p.addLogEvent(logEvent)) } + +func TestStreamManager(t *testing.T) { + svc := newAlwaysPassMockLogClient(func(args mock.Arguments) {}) + mockCwAPI := svc.svc.(*mockCloudWatchLogsClient) + manager := NewLogStreamManager(*svc) + + // Verify that the stream is created in the first time + assert.Nil(t, manager.InitStream(StreamKey{ + LogGroupName: "foo", + LogStreamName: "bar", + })) + + mockCwAPI.AssertCalled(t, "CreateLogStream", mock.Anything) + mockCwAPI.AssertNumberOfCalls(t, "CreateLogStream", 1) + + // Verify that the stream is not created in the second time + assert.Nil(t, manager.InitStream(StreamKey{ + LogGroupName: "foo", + LogStreamName: "bar", + })) + + mockCwAPI.AssertNumberOfCalls(t, "CreateLogStream", 1) + + // Verify that a different stream is created + assert.Nil(t, manager.InitStream(StreamKey{ + LogGroupName: "foo", + LogStreamName: "bar2", + })) + + mockCwAPI.AssertNumberOfCalls(t, "CreateLogStream", 2) +} + +func TestMultiStreamFactory(t *testing.T) { + svc := newAlwaysPassMockLogClient(func(args mock.Arguments) {}) + logStreamManager := NewLogStreamManager(*svc) + factory := NewMultiStreamPusherFactory(logStreamManager, *svc, nil) + + pusher := factory.CreateMultiStreamPusher() + + assert.IsType(t, &multiStreamPusher{}, pusher) +} + +func TestMultiStreamPusher(t *testing.T) { + inputs := make([]*cloudwatchlogs.PutLogEventsInput, 0) + svc := newAlwaysPassMockLogClient(func(args mock.Arguments) { + input := args.Get(0).(*cloudwatchlogs.PutLogEventsInput) + inputs = append(inputs, input) + }) + mockCwAPI := svc.svc.(*mockCloudWatchLogsClient) + manager := NewLogStreamManager(*svc) + zap := zap.NewNop() + pusher := newMultiStreamPusher(manager, *svc, zap) + event := NewEvent(time.Now().UnixMilli(), "testing") + event.StreamKey.LogGroupName = "foo" + event.StreamKey.LogStreamName = "bar" + event.GeneratedTime = time.Now() + + assert.Nil(t, pusher.AddLogEntry(event)) + assert.Nil(t, pusher.AddLogEntry(event)) + mockCwAPI.AssertNumberOfCalls(t, "PutLogEvents", 0) + assert.Nil(t, pusher.ForceFlush()) + + mockCwAPI.AssertNumberOfCalls(t, "CreateLogStream", 1) + mockCwAPI.AssertNumberOfCalls(t, "PutLogEvents", 1) + + assert.Equal(t, 1, len(inputs)) + assert.Equal(t, 2, len(inputs[0].LogEvents)) + assert.Equal(t, "foo", *inputs[0].LogGroupName) + assert.Equal(t, "bar", *inputs[0].LogStreamName) + + event2 := NewEvent(time.Now().UnixMilli(), "testing") + event2.StreamKey.LogGroupName = "foo" + event2.StreamKey.LogStreamName = "bar2" + event2.GeneratedTime = time.Now() + + assert.Nil(t, pusher.AddLogEntry(event2)) + assert.Nil(t, pusher.ForceFlush()) + + mockCwAPI.AssertNumberOfCalls(t, "CreateLogStream", 2) + mockCwAPI.AssertNumberOfCalls(t, "PutLogEvents", 2) + + assert.Equal(t, 2, len(inputs)) + assert.Equal(t, 1, len(inputs[1].LogEvents)) + assert.Equal(t, "foo", *inputs[1].LogGroupName) + assert.Equal(t, "bar2", *inputs[1].LogStreamName) +}