diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go index d44f09d6a04..7fcb671bf84 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go @@ -384,15 +384,18 @@ func (l *logStream) collectBatch() { eventBufferNegative := eventBufferAge < 0 if eventBufferExpired || eventBufferNegative { events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + eventBuffer = eventBuffer[:0] } } l.publishBatch(events) events = events[:0] case msg, more := <-l.messages: if !more { - // Flush event buffer + // Flush event buffer and release resources events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + eventBuffer = eventBuffer[:0] l.publishBatch(events) + events = events[:0] return } if eventBufferTimestamp == 0 { @@ -400,17 +403,13 @@ func (l *logStream) collectBatch() { } unprocessedLine := msg.Line if l.multilinePattern != nil { - if l.multilinePattern.Match(unprocessedLine) { - // This is a new log event so flush the current eventBuffer to events + if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent { + // This is a new log event or we will exceed max bytes per event + // so flush the current eventBuffer to events and reset timestamp events = l.processEvent(events, eventBuffer, eventBufferTimestamp) eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) eventBuffer = eventBuffer[:0] } - // If we will exceed max bytes per event flush the current event buffer before appending - if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent { - events = l.processEvent(events, eventBuffer, eventBufferTimestamp) - eventBuffer = eventBuffer[:0] - } // Append new line processedLine := append(unprocessedLine, "\n"...) eventBuffer = append(eventBuffer, processedLine...) diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go index e3862ffebe5..8688026a0da 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -641,7 +641,7 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { }) // Fire ticker batchPublishFrequency seconds later - ticks <- time.Now().Add(batchPublishFrequency * time.Second) + ticks <- time.Now().Add(batchPublishFrequency + time.Second) // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency) argument := <-mockClient.putLogEventsArgument @@ -649,6 +649,20 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event") assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message") + // Log an event 1 second later + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now().Add(time.Second), + }) + + // Fire ticker another batchPublishFrequency seconds later + ticks <- time.Now().Add(2*batchPublishFrequency + time.Second) + + // Verify the event buffer is truly flushed - we should only receive a single event + argument = <-mockClient.putLogEventsArgument + assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") + assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event") + assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message") stream.Close() }