Skip to content

Commit

Permalink
Add traces
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed May 9, 2024
1 parent 7fdf150 commit 97d5c19
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 32 deletions.
52 changes: 46 additions & 6 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewFactory() exporter.Factory {
metadata.Type,
createDefaultConfig,
exporter.WithLogs(createLogsRequestExporter, metadata.LogsStability),
exporter.WithTraces(createTracesExporter, metadata.TracesStability),
exporter.WithTraces(createTracesRequestExporter, metadata.TracesStability),
)
}

Expand Down Expand Up @@ -144,7 +144,7 @@ func createLogsRequestExporter(
)
}

func createTracesExporter(ctx context.Context,
func createTracesRequestExporter(ctx context.Context,
set exporter.CreateSettings,
cfg component.Config) (exporter.Traces, error) {

Expand All @@ -156,13 +156,53 @@ func createTracesExporter(ctx context.Context,
if err != nil {
return nil, fmt.Errorf("cannot configure Elasticsearch tracesExporter: %w", err)
}
return exporterhelper.NewTracesExporter(

batchMergeFunc := func(ctx context.Context, r1, r2 exporterhelper.Request) (exporterhelper.Request, error) {
rr1 := r1.(*request)
rr2 := r2.(*request)
req := newRequest(tracesExporter.bulkIndexer)
req.Items = append(rr1.Items, rr2.Items...)
return req, nil
}

batchMergeSplitFunc := func(ctx context.Context, conf exporterbatcher.MaxSizeConfig, optReq, req exporterhelper.Request) ([]exporterhelper.Request, error) {
// FIXME: implement merge split func
panic("not implemented")
return nil, nil
}

marshalRequest := func(req exporterhelper.Request) ([]byte, error) {
b, err := json.Marshal(*req.(*request))
return b, err
}

unmarshalRequest := func(b []byte) (exporterhelper.Request, error) {
var req request
err := json.Unmarshal(b, &req)
req.bulkIndexer = tracesExporter.bulkIndexer
return &req, err
}

batcherCfg := exporterbatcher.NewDefaultConfig()

// FIXME: is this right?
queueCfg := exporterqueue.NewDefaultConfig()
queueCfg.Enabled = cf.QueueSettings.Enabled
queueCfg.NumConsumers = cf.QueueSettings.NumConsumers
queueCfg.QueueSize = cf.QueueSettings.QueueSize

return exporterhelper.NewTracesRequestExporter(
ctx,
set,
cfg,
tracesExporter.pushTraceData,
tracesExporter.traceDataToRequest,
exporterhelper.WithBatcher(batcherCfg, exporterhelper.WithRequestBatchFuncs(batchMergeFunc, batchMergeSplitFunc)),
exporterhelper.WithShutdown(tracesExporter.Shutdown),
exporterhelper.WithQueue(cf.QueueSettings))
exporterhelper.WithRequestQueue(queueCfg,
exporterqueue.NewPersistentQueueFactory[exporterhelper.Request](cf.QueueSettings.StorageID, exporterqueue.PersistentQueueSettings[exporterhelper.Request]{
Marshaler: marshalRequest,
Unmarshaler: unmarshalRequest,
})),
)
}

// set default User-Agent header with BuildInfo if User-Agent is empty
Expand Down
10 changes: 4 additions & 6 deletions exporter/elasticsearchexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ func (e *elasticsearchLogsExporter) Shutdown(ctx context.Context) error {
}

func (e *elasticsearchLogsExporter) logsDataToRequest(ctx context.Context, ld plog.Logs) (exporterhelper.Request, error) {
request := newRequest(e.bulkIndexer)

req := newRequest(e.bulkIndexer)
var errs []error

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
Expand All @@ -88,18 +86,18 @@ func (e *elasticsearchLogsExporter) logsDataToRequest(ctx context.Context, ld pl
item, err := e.logRecordToItem(ctx, resource, logs.At(k), scope)
if err != nil {
if cerr := ctx.Err(); cerr != nil {
return request, cerr
return req, cerr
}

errs = append(errs, err)
continue
}
request.Add(item)
req.Add(item)
}
}
}

return request, errors.Join(errs...)
return req, errors.Join(errs...)
}

func (e *elasticsearchLogsExporter) logRecordToItem(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) (bulkIndexerItem, error) {
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestExporter_PushEvent(t *testing.T) {

exporter := newTestExporter(t, server.URL)
err := send(t, exporter, `{"message": "test1"}`)
assert.ErrorContains(t, err, "flush failed")
assert.ErrorContains(t, err, "flush failed: [400 Bad Request] oops")

assert.Equal(t, int64(1), attempts.Load())
})
Expand Down
26 changes: 17 additions & 9 deletions exporter/elasticsearchexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -65,10 +66,11 @@ func (e *elasticsearchTracesExporter) Shutdown(ctx context.Context) error {
return e.bulkIndexer.Close(ctx)
}

func (e *elasticsearchTracesExporter) pushTraceData(
func (e *elasticsearchTracesExporter) traceDataToRequest(
ctx context.Context,
td ptrace.Traces,
) error {
) (exporterhelper.Request, error) {
req := newRequest(e.bulkIndexer)
var errs []error
resourceSpans := td.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
Expand All @@ -81,20 +83,23 @@ func (e *elasticsearchTracesExporter) pushTraceData(
spans := scopeSpan.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if err := e.pushTraceRecord(ctx, resource, span, scope); err != nil {
item, err := e.traceRecordToItem(ctx, resource, span, scope)
if err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
return req, cerr
}
errs = append(errs, err)
continue
}
req.Add(item)
}
}
}

return errors.Join(errs...)
return req, errors.Join(errs...)
}

func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
func (e *elasticsearchTracesExporter) traceRecordToItem(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) (bulkIndexerItem, error) {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromAttributes(indexPrefix, resource, scope, span)
Expand All @@ -106,16 +111,19 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou
if e.logstashFormat.Enabled {
formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
if err != nil {
return err
return bulkIndexerItem{}, err
}
fIndex = formattedIndex
}

document, err := e.model.encodeSpan(resource, span, scope)
if err != nil {
return fmt.Errorf("Failed to encode trace record: %w", err)
return bulkIndexerItem{}, fmt.Errorf("Failed to encode trace record: %w", err)
}
return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
return bulkIndexerItem{
Index: fIndex,
Body: document,
}, nil
}

func pushDocuments(ctx context.Context, index string, document []byte, current *esBulkIndexerCurrent) error {
Expand Down
37 changes: 27 additions & 10 deletions exporter/elasticsearchexporter/traces_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ func TestExporter_PushTraceRecord(t *testing.T) {
exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig })
mustSendTraces(t, exporter, `{"message": "test1"}`)

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), attempts.Load())
})
}
Expand All @@ -346,9 +345,9 @@ func TestExporter_PushTraceRecord(t *testing.T) {
})

exporter := newTestTracesExporter(t, server.URL)
mustSendTraces(t, exporter, `{"message": "test1"}`)
err := sendTraces(t, exporter, `{"message": "test1"}`)
assert.ErrorContains(t, err, "flush failed: [400 Bad Request] oops")

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), attempts.Load())
})

Expand Down Expand Up @@ -382,7 +381,6 @@ func TestExporter_PushTraceRecord(t *testing.T) {
exporter := newTestTracesExporter(t, server.URL)
mustSendTraces(t, exporter, `{"message": "test1"}`)

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), attempts.Load())
})

Expand Down Expand Up @@ -420,9 +418,7 @@ func TestExporter_PushTraceRecord(t *testing.T) {
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
})
mustSendTraces(t, exporter, `{"message": "test1", "idx": 0}`)
mustSendTraces(t, exporter, `{"message": "test2", "idx": 1}`)
mustSendTraces(t, exporter, `{"message": "test3", "idx": 2}`)
mustSendTraces(t, exporter, `{"message": "test1", "idx": 0}`, `{"message": "test2", "idx": 1}`, `{"message": "test3", "idx": 2}`)

wg.Wait() // <- this blocks forever if the trace is not retried

Expand Down Expand Up @@ -462,8 +458,22 @@ func withTestTracesExporterConfig(fns ...func(*Config)) func(string) *Config {
}
}

func mustSendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents string) {
err := pushDocuments(context.TODO(), exporter.index, []byte(contents), exporter.bulkIndexer)
func sendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents ...string) error {
req := request{
bulkIndexer: exporter.bulkIndexer,
Items: nil,
}
for _, body := range contents {
req.Add(bulkIndexerItem{
Index: exporter.index,
Body: []byte(body),
})
}
return req.Export(context.TODO())
}

func mustSendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents ...string) {
err := sendTraces(t, exporter, contents...)
require.NoError(t, err)
}

Expand All @@ -474,6 +484,13 @@ func mustSendTracesWithAttributes(t *testing.T, exporter *elasticsearchTracesExp
span := resSpans.ScopeSpans().At(0).Spans().At(0)
scope := resSpans.ScopeSpans().At(0).Scope()

err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span, scope)
req := request{
bulkIndexer: exporter.bulkIndexer,
Items: nil,
}
item, err := exporter.traceRecordToItem(context.TODO(), resSpans.Resource(), span, scope)
require.NoError(t, err)
req.Add(item)
err = req.Export(context.TODO())
require.NoError(t, err)
}

0 comments on commit 97d5c19

Please sign in to comment.