Skip to content

Commit

Permalink
change emit.Callback signature to accept a slice of tokens
Browse files Browse the repository at this point in the history
For now, the file input still sends each entry one by one to the next consumer.
Next step is to change Stanza operators to accept batches.
  • Loading branch information
andrzej-stencel committed Nov 8, 2024
1 parent a584e0a commit 6391f55
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 60 deletions.
9 changes: 6 additions & 3 deletions pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ func BenchmarkFileInput(b *testing.B) {
cfg.PollInterval = time.Microsecond

doneChan := make(chan bool, len(files))
callback := func(_ context.Context, token emit.Token) error {
if len(token.Body) == 0 {
doneChan <- true
callback := func(_ context.Context, tokens []emit.Token) error {
for _, token := range tokens {
if len(token.Body) == 0 {
doneChan <- true
break
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/emit/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"
)

type Callback func(ctx context.Context, token Token) error
type Callback func(ctx context.Context, tokens []Token) error

type Token struct {
Body []byte
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/emittest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
)

func Nop(_ context.Context, _ emit.Token) error {
func Nop(_ context.Context, _ []emit.Token) error {
return nil
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/emittest/nop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ import (
)

func TestNop(t *testing.T) {
require.NoError(t, Nop(context.Background(), emit.Token{}))
require.NoError(t, Nop(context.Background(), []emit.Token{}))
}
16 changes: 9 additions & 7 deletions pkg/stanza/fileconsumer/internal/emittest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ func NewSink(opts ...SinkOpt) *Sink {
return &Sink{
emitChan: emitChan,
timeout: cfg.timeout,
Callback: func(ctx context.Context, token emit.Token) error {
copied := make([]byte, len(token.Body))
copy(copied, token.Body)
select {
case <-ctx.Done():
return ctx.Err()
case emitChan <- &Call{copied, token.Attributes}:
Callback: func(ctx context.Context, tokens []emit.Token) error {
for _, token := range tokens {
copied := make([]byte, len(token.Body))
copy(copied, token.Body)
select {
case <-ctx.Done():
return ctx.Err()
case emitChan <- &Call{copied, token.Attributes}:
}
}
return nil
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/emittest/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
}
go func() {
for _, c := range testCalls {
assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs)))
assert.NoError(t, s.Callback(context.Background(), []emit.Token{emit.NewToken(c.Token, c.Attrs)}))
}
}()
return s, testCalls
Expand Down
16 changes: 6 additions & 10 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,23 +215,19 @@ func (r *Reader) readContents(ctx context.Context) {
tokens = append(tokens, emit.NewToken(copyBody(token), copyAttributes(r.FileAttributes)))

if r.maxBatchSize > 0 && len(tokens) >= r.maxBatchSize {
for _, t := range tokens {
err := r.emitFunc(ctx, t)
if err != nil {
r.set.Logger.Error("failed to process token", zap.Error(err))
}
err := r.emitFunc(ctx, tokens)
if err != nil {
r.set.Logger.Error("failed to process tokens", zap.Error(err))
}
tokens = tokens[:0]
r.Offset = s.Pos()
}
}

if len(tokens) > 0 {
for _, t := range tokens {
err := r.emitFunc(ctx, t)
if err != nil {
r.set.Logger.Error("failed to process token", zap.Error(err))
}
err := r.emitFunc(ctx, tokens)
if err != nil {
r.set.Logger.Error("failed to process tokens", zap.Error(err))
}
r.Offset = s.Pos()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
toBody: toBody,
}

input.fileConsumer, err = c.Config.Build(set, input.emit)
input.fileConsumer, err = c.Config.Build(set, input.emitBatch)
if err != nil {
return nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/stanza/operator/input/file/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package file // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"
Expand Down Expand Up @@ -37,6 +38,20 @@ func (i *Input) Stop() error {
return i.fileConsumer.Stop()
}

// emitBatch forwards each token in the batch to emit individually,
// collecting per-token errors rather than stopping at the first failure.
// It returns the collected errors joined together, or nil if every
// token was emitted successfully.
func (i *Input) emitBatch(ctx context.Context, tokens []emit.Token) error {
	var errs []error
	for _, token := range tokens {
		if err := i.emit(ctx, token); err != nil {
			errs = append(errs, err)
		}
	}
	// errors.Join returns nil when errs is empty or contains only nil
	// values, so no explicit len(errs) > 0 guard is needed.
	return errors.Join(errs...)
}

func (i *Input) emit(ctx context.Context, token emit.Token) error {
if len(token.Body) == 0 {
return nil
Expand Down
78 changes: 43 additions & 35 deletions receiver/otlpjsonfilereceiver/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,20 @@ func createLogsReceiver(_ context.Context, settings receiver.Settings, configura
if cfg.ReplayFile {
opts = append(opts, fileconsumer.WithNoTracking())
}
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
ctx = obsrecv.StartLogsOp(ctx)
var l plog.Logs
l, err = logsUnmarshaler.UnmarshalLogs(token.Body)
if err != nil {
obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err)
} else {
logRecordCount := l.LogRecordCount()
if logRecordCount != 0 {
err = logs.ConsumeLogs(ctx, l)
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
for _, token := range tokens {
ctx = obsrecv.StartLogsOp(ctx)
var l plog.Logs
l, err = logsUnmarshaler.UnmarshalLogs(token.Body)
if err != nil {
obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err)
} else {
logRecordCount := l.LogRecordCount()
if logRecordCount != 0 {
err = logs.ConsumeLogs(ctx, l)
}
obsrecv.EndLogsOp(ctx, metadata.Type.String(), logRecordCount, err)
}
obsrecv.EndLogsOp(ctx, metadata.Type.String(), logRecordCount, err)
}
return nil
}, opts...)
Expand All @@ -120,17 +122,19 @@ func createMetricsReceiver(_ context.Context, settings receiver.Settings, config
if cfg.ReplayFile {
opts = append(opts, fileconsumer.WithNoTracking())
}
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
ctx = obsrecv.StartMetricsOp(ctx)
var m pmetric.Metrics
m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body)
if err != nil {
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err)
} else {
if m.ResourceMetrics().Len() != 0 {
err = metrics.ConsumeMetrics(ctx, m)
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
for _, token := range tokens {
ctx = obsrecv.StartMetricsOp(ctx)
var m pmetric.Metrics
m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body)
if err != nil {
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err)
} else {
if m.ResourceMetrics().Len() != 0 {
err = metrics.ConsumeMetrics(ctx, m)
}
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), m.MetricCount(), err)
}
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), m.MetricCount(), err)
}
return nil
}, opts...)
Expand All @@ -156,17 +160,19 @@ func createTracesReceiver(_ context.Context, settings receiver.Settings, configu
if cfg.ReplayFile {
opts = append(opts, fileconsumer.WithNoTracking())
}
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
ctx = obsrecv.StartTracesOp(ctx)
var t ptrace.Traces
t, err = tracesUnmarshaler.UnmarshalTraces(token.Body)
if err != nil {
obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err)
} else {
if t.ResourceSpans().Len() != 0 {
err = traces.ConsumeTraces(ctx, t)
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
for _, token := range tokens {
ctx = obsrecv.StartTracesOp(ctx)
var t ptrace.Traces
t, err = tracesUnmarshaler.UnmarshalTraces(token.Body)
if err != nil {
obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err)
} else {
if t.ResourceSpans().Len() != 0 {
err = traces.ConsumeTraces(ctx, t)
}
obsrecv.EndTracesOp(ctx, metadata.Type.String(), t.SpanCount(), err)
}
obsrecv.EndTracesOp(ctx, metadata.Type.String(), t.SpanCount(), err)
}
return nil
}, opts...)
Expand All @@ -184,10 +190,12 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi
if cfg.ReplayFile {
opts = append(opts, fileconsumer.WithNoTracking())
}
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body)
if p.ResourceProfiles().Len() != 0 {
_ = profiles.ConsumeProfiles(ctx, p)
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
for _, token := range tokens {
p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body)
if p.ResourceProfiles().Len() != 0 {
_ = profiles.ConsumeProfiles(ctx, p)
}
}
return nil
}, opts...)
Expand Down

0 comments on commit 6391f55

Please sign in to comment.