Skip to content

Commit

Permalink
[processor/logstransformprocessor] Re-enable flaky test and capture errors better (open-telemetry#9776)
Browse files Browse the repository at this point in the history

* Re-enable flaky test and capture errors better for logstransformprocessor

* Add additional log to processor simple test

* Cleanup & lint

* trigger new test run

* Add sort in while awaiting change to otel-log-collection

* Update changelog
  • Loading branch information
Sam DeHaan authored and djaglowski committed May 10, 2022
1 parent acd11f1 commit 5cfc697
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- `prometheusreceiver`: Handle the condition where `up` metric value is NaN (#9253)
- `tanzuobservabilityexporter`: Make metrics stanza in config be optional (#9098)
- `filelogreceiver`: Update Kubernetes examples to fix native OTel logs collection issue where 0 length logs cause errors (#9754)
- `logstransformprocessor`: Resolve node ordering to fix intermittent failures (#9761)

## v0.50.0

Expand Down
8 changes: 4 additions & 4 deletions processor/logstransformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func TestLoadConfig(t *testing.T) {
Operators: stanza.OperatorConfigs{
map[string]interface{}{
"type": "regex_parser",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"severity": map[string]interface{}{
"parse_from": "body.sev",
"parse_from": "attributes.sev",
},
"timestamp": map[string]interface{}{
"layout": "%Y-%m-%d",
"parse_from": "body.time",
"layout": "%Y-%m-%d %H:%M:%S",
"parse_from": "attributes.time",
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion processor/logstransformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/zap v1.21.0
gonum.org/v1/gonum v0.11.0
)

require (
Expand All @@ -36,7 +37,6 @@ require (
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
Expand Down
47 changes: 34 additions & 13 deletions processor/logstransformprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,39 @@ package logstransformprocessor // import "github.com/open-telemetry/opentelemetr
import (
"context"
"errors"
"fmt"
"math"
"runtime"
"sync"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
"gonum.org/v1/gonum/graph/topo"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanza"
)

// outputType bundles a batch of converted logs with any error raised while
// producing it, so a single output channel can report both success and failure.
type outputType struct {
logs pdata.Logs
err error
}

type logsTransformProcessor struct {
logger *zap.Logger
config *Config
id config.ComponentID

pipe pipeline.Pipeline
pipe *pipeline.DirectedPipeline
firstOperator operator.Operator
emitter *stanza.LogEmitter
converter *stanza.Converter
fromConverter *stanza.FromPdataConverter
wg sync.WaitGroup
outputChannel chan pdata.Logs
outputChannel chan outputType
}

func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -90,6 +99,15 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos

ltp.pipe = pipe

orderedNodes, err := topo.Sort(pipe.Graph)
if err != nil {
return err
}
if len(orderedNodes) == 0 {
return errors.New("processor requires at least one operator to be configured")
}
ltp.firstOperator = orderedNodes[0].(pipeline.OperatorNode).Operator()

wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
if baseCfg.Converter.WorkerCount > 0 {
wkrCount = baseCfg.Converter.WorkerCount
Expand All @@ -104,7 +122,7 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos
ltp.fromConverter = stanza.NewFromPdataConverter(wkrCount, ltp.logger)
ltp.fromConverter.Start()

ltp.outputChannel = make(chan pdata.Logs)
ltp.outputChannel = make(chan outputType)

// Below we're starting 3 loops:
// * first which reads all the logs translated by the fromConverter and then forwards
Expand Down Expand Up @@ -141,12 +159,15 @@ func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld pdata.Log
case <-doneChan:
ltp.logger.Debug("loop stopped")
return ld, errors.New("processor interrupted")
case pLogs, ok := <-ltp.outputChannel:
case output, ok := <-ltp.outputChannel:
if !ok {
return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline")
}
if output.err != nil {
return ld, err
}

return pLogs, nil
return output.logs, nil
}
}
}
Expand All @@ -165,13 +186,14 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) {
case entries, ok := <-ltp.fromConverter.OutChannel():
if !ok {
ltp.logger.Debug("fromConverter channel got closed")
continue
return
}

for _, e := range entries {
// Add item to the first operator of the pipeline manually
if err := ltp.pipe.Operators()[0].Process(ctx, e); err != nil {
ltp.logger.Error("unexpected error encountered adding entries to pipeline", zap.Error(err))
if err := ltp.firstOperator.Process(ctx, e); err != nil {
ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the pipeline: %w", err)}
break
}
}
}
Expand All @@ -188,15 +210,14 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) {
case <-ctx.Done():
ltp.logger.Debug("emitter loop stopped")
return

case e, ok := <-ltp.emitter.OutChannel():
if !ok {
ltp.logger.Debug("emitter channel got closed")
continue
return
}

if err := ltp.converter.Batch(e); err != nil {
ltp.logger.Error("unexpected error encountered batching logs to converter", zap.Error(err))
ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the converter: %w", err)}
}
}
}
Expand All @@ -215,10 +236,10 @@ func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context) {
case pLogs, ok := <-ltp.converter.OutChannel():
if !ok {
ltp.logger.Debug("converter channel got closed")
continue
return
}

ltp.outputChannel <- pLogs
ltp.outputChannel <- outputType{logs: pLogs, err: nil}
}
}
}
47 changes: 33 additions & 14 deletions processor/logstransformprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ var (
Operators: stanza.OperatorConfigs{
map[string]interface{}{
"type": "regex_parser",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"severity": map[string]interface{}{
"parse_from": "attributes.sev",
},
"timestamp": map[string]interface{}{
"layout": "%Y-%m-%d",
"layout": "%Y-%m-%d %H:%M:%S",
"parse_from": "attributes.time",
},
},
Expand Down Expand Up @@ -74,14 +74,8 @@ type testLogMessage struct {
attributes *map[string]pdata.Value
}

// Skips test without applying unused rule: https://github.com/dominikh/go-tools/issues/633#issuecomment-606560616
var skip = func(t *testing.T) {
t.Skip("Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9761")
}

func TestLogsTransformProcessor(t *testing.T) {
skip(t)
baseMessage := pcommon.NewValueString("2022-01-01 INFO this is a test")
baseMessage := pcommon.NewValueString("2022-01-01 01:02:03 INFO this is a test message")
spanID := pcommon.NewSpanID([8]byte{0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff})
traceID := pcommon.NewTraceID([16]byte{0x48, 0x01, 0x40, 0xf3, 0xd7, 0x70, 0xa5, 0xae, 0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff})
infoSeverityText := "Info"
Expand All @@ -103,22 +97,44 @@ func TestLogsTransformProcessor(t *testing.T) {
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
},
{
body: &baseMessage,
spanID: &spanID,
traceID: &traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
},
},
parsedMessages: []testLogMessage{
{
body: &baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
"msg": pcommon.NewValueString("this is a test"),
"time": pcommon.NewValueString("2022-01-01"),
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
time: parseTime("2006-01-02", "2022-01-01"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
},
{
body: &baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
},
},
},
Expand All @@ -142,16 +158,19 @@ func TestLogsTransformProcessor(t *testing.T) {
logs := tln.AllLogs()
require.Len(t, logs, 1)

logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Sort()
for i := 0; i < logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len(); i++ {
logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().Sort()
}
assert.EqualValues(t, wantLogData, logs[0])
})
}
}

func generateLogData(messages []testLogMessage) pdata.Logs {
ld := testdata.GenerateLogsOneEmptyResourceLogs()
scope := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
for _, content := range messages {
log := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
log := scope.LogRecords().AppendEmpty()
if content.body != nil {
content.body.CopyTo(log.Body())
}
Expand Down
8 changes: 4 additions & 4 deletions processor/logstransformprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ processors:
logstransform:
operators:
- type: regex_parser
regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
regex: '^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
timestamp:
parse_from: body.time
layout: '%Y-%m-%d'
parse_from: attributes.time
layout: '%Y-%m-%d %H:%M:%S'
severity:
parse_from: body.sev
parse_from: attributes.sev


exporters:
Expand Down

0 comments on commit 5cfc697

Please sign in to comment.