diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index 9b357b685d80..7f052f0f59ad 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -54,13 +54,15 @@ func (c *client) pushMetricsData(
 	if err != nil {
 		return exporterhelper.NumTimeSeries(md), consumererror.Permanent(err)
 	}
+	if len(splunkDataPoints) == 0 {
+		return numDroppedTimeseries, nil
+	}

 	body, compressed, err := encodeBody(&c.zippers, splunkDataPoints, c.config.DisableCompression)
 	if err != nil {
 		return exporterhelper.NumTimeSeries(md), consumererror.Permanent(err)
 	}

-	// TODO the client sends synchronously data as of now. It would make more sense to buffer data coming in, and send batches as supported by Splunk. To batch effectively, a ring buffer approach would work well - based on time and number of messages queued.
 	req, err := http.NewRequest("POST", c.url.String(), body)
 	if err != nil {
 		return exporterhelper.NumTimeSeries(md), consumererror.Permanent(err)
@@ -94,6 +96,69 @@ func (c *client) pushMetricsData(
 	return numDroppedTimeseries, nil
 }

+func (c *client) pushTraceData(
+	ctx context.Context,
+	td consumerdata.TraceData,
+) (droppedSpans int, err error) {
+	c.wg.Add(1)
+	defer c.wg.Done()
+
+	splunkEvents, numDroppedSpans := traceDataToSplunk(c.logger, td, c.config)
+	if len(splunkEvents) == 0 {
+		return numDroppedSpans, nil
+	}
+
+	body, compressed, err := encodeBodyEvents(&c.zippers, splunkEvents, c.config.DisableCompression)
+	if err != nil {
+		return len(td.Spans), consumererror.Permanent(err)
+	}
+
+	req, err := http.NewRequest("POST", c.url.String(), body)
+	if err != nil {
+		return len(td.Spans), consumererror.Permanent(err)
+	}
+
+	for k, v := range c.headers {
+		req.Header.Set(k, v)
+	}
+
+	if compressed {
+		req.Header.Set("Content-Encoding", "gzip")
+	}
+
+	resp, err := c.client.Do(req)
+	if err != nil {
+		return len(td.Spans), err
+	}
+
+	io.Copy(ioutil.Discard, resp.Body)
+	resp.Body.Close()
+
+	// Splunk accepts all 2XX codes.
+	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
+		err = fmt.Errorf(
+			"HTTP %d %q",
+			resp.StatusCode,
+			http.StatusText(resp.StatusCode))
+		return len(td.Spans), err
+	}
+
+	return numDroppedSpans, nil
+}
+
+func encodeBodyEvents(zippers *sync.Pool, evs []*splunkEvent, disableCompression bool) (bodyReader io.Reader, compressed bool, err error) {
+	buf := new(bytes.Buffer)
+	encoder := json.NewEncoder(buf)
+	for _, e := range evs {
+		err := encoder.Encode(e)
+		if err != nil {
+			return nil, false, err
+		}
+		buf.WriteString("\r\n\r\n")
+	}
+	return getReader(zippers, buf, disableCompression)
+}
+
 func encodeBody(zippers *sync.Pool, dps []*splunkMetric, disableCompression bool) (bodyReader io.Reader, compressed bool, err error) {
 	buf := new(bytes.Buffer)
 	encoder := json.NewEncoder(buf)
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
new file mode 100644
index 000000000000..eb4a6e588171
--- /dev/null
+++ b/exporter/splunkhecexporter/client_test.go
@@ -0,0 +1,306 @@
+// Copyright 2020, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package splunkhecexporter
+
+import (
+	"compress/gzip"
+	"context"
+	"errors"
+	"io/ioutil"
+	"math"
+	"net"
+	"net/http"
+	"sync"
+	"testing"
+	"time"
+
+	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
+	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
+	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
+	tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
+	"github.com/golang/protobuf/ptypes/timestamp"
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/consumer/consumerdata"
+	"go.opentelemetry.io/collector/testutils/metricstestutils"
+	"go.uber.org/zap"
+)
+
+func createMetricsData(numberOfDataPoints int) consumerdata.MetricsData {
+	keys := []string{"k0", "k1"}
+	values := []string{"v0", "v1"}
+
+	unixSecs := int64(1574092046)
+	unixNSecs := int64(11 * time.Millisecond)
+	tsUnix := time.Unix(unixSecs, unixNSecs)
+	doubleVal := 1234.5678
+	doublePt := metricstestutils.Double(tsUnix, doubleVal)
+	var metrics []*metricspb.Metric
+	for i := 0; i < numberOfDataPoints; i++ {
+		metric := metricstestutils.Gauge("gauge_double_with_dims", keys, metricstestutils.Timeseries(tsUnix, values, doublePt))
+		metrics = append(metrics, metric)
+	}
+
+	return consumerdata.MetricsData{
+		Node: &commonpb.Node{
+			Attributes: map[string]string{
+				"k/n0": "vn0",
+				"k/n1": "vn1",
+			},
+		},
+		Resource: &resourcepb.Resource{
+			Labels: map[string]string{
+				"k/r0": "vr0",
+				"k/r1": "vr1",
+			},
+		},
+		Metrics: metrics,
+	}
+}
+
+func createTraceData(numberOfTraces int) consumerdata.TraceData {
+	var traces []*tracepb.Span
+	for i := 0; i < numberOfTraces; i++ {
+		span := &tracepb.Span{
+			TraceId:   []byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
+			SpanId:    []byte{0, 0, 0, 0, 0, 0, 0, 1},
+			Name:      &tracepb.TruncatableString{Value: "root"},
+			Status:    &tracepb.Status{},
+			StartTime: &timestamp.Timestamp{Seconds: int64(i)},
+		}
+
+		traces = append(traces, span)
+	}
+
+	return consumerdata.TraceData{
+		Node: &commonpb.Node{
+			ServiceInfo: &commonpb.ServiceInfo{Name: "test-service"},
+		},
+		Resource: &resourcepb.Resource{
+			Labels: map[string]string{
+				"resource": "R1",
+			},
+		},
+		Spans: traces,
+	}
+}
+
+type CapturingData struct {
+	testing          *testing.T
+	receivedRequest  chan string
+	statusCode       int
+	checkCompression bool
+}
+
+func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if c.checkCompression {
+		if r.Header.Get("Content-Encoding") != "gzip" {
+			c.testing.Fatal("No compression")
+		}
+	}
+	body, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		panic(err)
+	}
+	go func() {
+		c.receivedRequest <- string(body)
+	}()
+	w.WriteHeader(c.statusCode)
+}
+
+func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testing.T) (string, error) {
+	receivedRequest := make(chan string)
+	capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !disableCompression}
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		panic(err)
+	}
+	s := &http.Server{
+		Handler: &capture,
+	}
+	go func() {
+		panic(s.Serve(listener))
+	}()
+
+	factory := Factory{}
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
+	cfg.DisableCompression = disableCompression
+	cfg.Token = "1234-1234"
+
+	exporter, err := factory.CreateMetricsExporter(zap.NewNop(), cfg)
+	assert.NoError(t, err)
+
+	md := createMetricsData(numberOfDataPoints)
+
+	err = exporter.ConsumeMetricsData(context.Background(), md)
+	assert.NoError(t, err)
+	select {
+	case request := <-receivedRequest:
+		return request, nil
+	case <-time.After(5 * time.Second):
+		return "", errors.New("Timeout")
+	}
+}
+
+func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) (string, error) {
+	receivedRequest := make(chan string)
+	capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !disableCompression}
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		panic(err)
+	}
+	s := &http.Server{
+		Handler: &capture,
+	}
+	go func() {
+		panic(s.Serve(listener))
+	}()
+
+	factory := Factory{}
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
+	cfg.DisableCompression = disableCompression
+	cfg.Token = "1234-1234"
+
+	exporter, err := factory.CreateTraceExporter(zap.NewNop(), cfg)
+	assert.NoError(t, err)
+
+	td := createTraceData(numberOfTraces)
+
+	err = exporter.ConsumeTraceData(context.Background(), td)
+	assert.NoError(t, err)
+	select {
+	case request := <-receivedRequest:
+		return request, nil
+	case <-time.After(5 * time.Second):
+		return "", errors.New("Timeout")
+	}
+}
+
+func TestReceiveTraces(t *testing.T) {
+	actual, err := runTraceExport(true, 3, t)
+	assert.NoError(t, err)
+	expected := `{"time":0,"host":"unknown","event":{"trace_id":"AQEBAQEBAQEBAQEBAQEBAQ==","span_id":"AAAAAAAAAAE=","name":{"value":"root"},"start_time":{},"status":{}}}`
+	expected += "\n\r\n\r\n"
+	expected += `{"time":1,"host":"unknown","event":{"trace_id":"AQEBAQEBAQEBAQEBAQEBAQ==","span_id":"AAAAAAAAAAE=","name":{"value":"root"},"start_time":{"seconds":1},"status":{}}}`
+	expected += "\n\r\n\r\n"
+	expected += `{"time":2,"host":"unknown","event":{"trace_id":"AQEBAQEBAQEBAQEBAQEBAQ==","span_id":"AAAAAAAAAAE=","name":{"value":"root"},"start_time":{"seconds":2},"status":{}}}`
+	expected += "\n\r\n\r\n"
+	assert.Equal(t, expected, actual)
+}
+
+func TestReceiveMetrics(t *testing.T) {
+	actual, err := runMetricsExport(true, 3, t)
+	assert.NoError(t, err)
+	// The exact JSON layout of the metric payload is exercised in metricdata_to_splunk_test.go;
+	// here we only verify that a non-empty request reached the server.
+	assert.NotEqual(t, "", actual)
+}
+
+func TestReceiveTracesWithCompression(t *testing.T) {
+	request, err := runTraceExport(false, 5000, t)
+	assert.NoError(t, err)
+	assert.NotEqual(t, "", request)
+}
+
+func TestReceiveMetricsWithCompression(t *testing.T) {
+	request, err := runMetricsExport(false, 5000, t)
+	assert.NoError(t, err)
+	assert.NotEqual(t, "", request)
+}
+
+func TestErrorReceived(t *testing.T) {
+	receivedRequest := make(chan string)
+	capture := CapturingData{receivedRequest: receivedRequest, statusCode: 500}
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		panic(err)
+	}
+	s := &http.Server{
+		Handler: &capture,
+	}
+	go func() {
+		panic(s.Serve(listener))
+	}()
+
+	factory := Factory{}
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
+	cfg.DisableCompression = true
+	cfg.Token = "1234-1234"
+
+	exporter, err := factory.CreateTraceExporter(zap.NewNop(), cfg)
+	assert.NoError(t, err)
+
+	td := createTraceData(3)
+
+	err = exporter.ConsumeTraceData(context.Background(), td)
+	select {
+	case <-receivedRequest:
+	case <-time.After(5 * time.Second):
+		t.Fatal("Should have received request")
+	}
+	assert.EqualError(t, err, "HTTP 500 \"Internal Server Error\"")
+}
+
+func TestInvalidTraces(t *testing.T) {
+	_, err := runTraceExport(false, 0, t)
+	assert.Error(t, err)
+}
+
+func TestInvalidMetrics(t *testing.T) {
+	_, err := runMetricsExport(false, 0, t)
+	assert.Error(t, err)
+}
+
+func TestInvalidURL(t *testing.T) {
+	factory := Factory{}
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.Endpoint = "ftp://example.com:134"
+	cfg.Token = "1234-1234"
+	exporter, err := factory.CreateTraceExporter(zap.NewNop(), cfg)
+	assert.NoError(t, err)
+
+	td := createTraceData(2)
+
+	err = exporter.ConsumeTraceData(context.Background(), td)
+	assert.EqualError(t, err, "Post \"ftp://example.com:134/services/collector\": unsupported protocol scheme \"ftp\"")
+}
+
+type badJSON struct {
+	Foo float64 `json:"foo"`
+}
+
+func TestInvalidJson(t *testing.T) {
+	badEvent := badJSON{
+		Foo: math.Inf(1),
+	}
+	syncPool := sync.Pool{New: func() interface{} {
+		return gzip.NewWriter(nil)
+	}}
+	evs := []*splunkEvent{
+		{
+			Event: badEvent,
+		},
+		nil,
+	}
+	reader, _, err := encodeBodyEvents(&syncPool, evs, false)
+	assert.Error(t, err, reader)
+}
diff --git a/exporter/splunkhecexporter/exporter.go b/exporter/splunkhecexporter/exporter.go
index 336d3b5d3efe..ccd3e4746138 100644
--- a/exporter/splunkhecexporter/exporter.go
+++ b/exporter/splunkhecexporter/exporter.go
@@ -27,6 +27,7 @@ import (
 	"time"

 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/consumer/consumerdata"
 	"go.opentelemetry.io/collector/consumer/pdatautil"
 	"go.opentelemetry.io/collector/obsreport"
@@ -40,8 +41,16 @@ const (
 	dialerKeepAlive = 30 * time.Second
 )

+// TraceAndMetricExporter sends traces and metrics.
+type TraceAndMetricExporter interface {
+	component.Component
+	consumer.MetricsConsumerOld
+	consumer.TraceConsumerOld
+}
+
 type splunkExporter struct {
 	pushMetricsData func(ctx context.Context, md consumerdata.MetricsData) (droppedTimeSeries int, err error)
+	pushTraceData   func(ctx context.Context, td consumerdata.TraceData) (numDroppedSpans int, err error)
 	stop            func(ctx context.Context) (err error)
 }

@@ -50,12 +59,11 @@ type exporterOptions struct {
 	token    string
 }

-// New returns a new Splunk exporter.
-func New(
+// createExporter returns a new Splunk exporter.
+func createExporter(
 	config *Config,
 	logger *zap.Logger,
-) (component.MetricsExporterOld, error) {
-
+) (TraceAndMetricExporter, error) {
 	if config == nil {
 		return nil, errors.New("nil config")
 	}
@@ -70,6 +78,7 @@ func New(
 	return splunkExporter{
 		pushMetricsData: client.pushMetricsData,
+		pushTraceData:   client.pushTraceData,
 		stop:            client.stop,
 	}, nil
 }
@@ -125,3 +134,12 @@ func (se splunkExporter) ConsumeMetricsData(ctx context.Context, md consumerdata
 	obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
 	return err
 }
+
+func (se splunkExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
+	ctx = obsreport.StartTraceDataExportOp(ctx, typeStr)
+
+	numDroppedSpans, err := se.pushTraceData(ctx, td)
+
+	obsreport.EndTraceDataExportOp(ctx, len(td.Spans), numDroppedSpans, err)
+	return err
+}
diff --git a/exporter/splunkhecexporter/exporter_test.go b/exporter/splunkhecexporter/exporter_test.go
index bab31d594c4e..9f0248e02b39 100644
--- a/exporter/splunkhecexporter/exporter_test.go
+++ b/exporter/splunkhecexporter/exporter_test.go
@@ -37,7 +37,7 @@ import (
 )

 func TestNew(t *testing.T) {
-	got, err := New(nil, zap.NewNop())
+	got, err := createExporter(nil, zap.NewNop())
 	assert.EqualError(t, err, "nil config")
 	assert.Nil(t, got)

@@ -46,13 +46,9 @@ func TestNew(t *testing.T) {
 		Endpoint: "https://example.com:8088",
 		Timeout:  1 * time.Second,
 	}
-	got, err = New(config, zap.NewNop())
+	got, err = createExporter(config, zap.NewNop())
 	assert.NoError(t, err)
 	require.NotNil(t, got)
-
-	// This is expected to fail.
-	err = got.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{})
-	assert.Error(t, err)
 }

 func TestConsumeMetricsData(t *testing.T) {
diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go
index 929c1c8b1150..ad50da5c9a79 100644
--- a/exporter/splunkhecexporter/factory.go
+++ b/exporter/splunkhecexporter/factory.go
@@ -15,10 +15,10 @@
 package splunkhecexporter

 import (
+	"errors"
 	"time"

 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/config/configerror"
 	"go.opentelemetry.io/collector/config/configmodels"
 	"go.uber.org/zap"
 )
@@ -59,7 +59,18 @@ func (f *Factory) CreateTraceExporter(
 	logger *zap.Logger,
 	config configmodels.Exporter,
 ) (component.TraceExporterOld, error) {
-	return nil, configerror.ErrDataTypeIsNotSupported
+	if config == nil {
+		return nil, errors.New("nil config")
+	}
+	expCfg := config.(*Config)
+
+	exp, err := createExporter(expCfg, logger)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return exp, nil
 }

 // CreateMetricsExporter creates a metrics exporter based on this config.
@@ -67,10 +78,12 @@ func (f *Factory) CreateMetricsExporter(
 	logger *zap.Logger,
 	config configmodels.Exporter,
 ) (component.MetricsExporterOld, error) {
-
+	if config == nil {
+		return nil, errors.New("nil config")
+	}
 	expCfg := config.(*Config)

-	exp, err := New(expCfg, logger)
+	exp, err := createExporter(expCfg, logger)

 	if err != nil {
 		return nil, err
diff --git a/exporter/splunkhecexporter/factory_test.go b/exporter/splunkhecexporter/factory_test.go
index ed15a866dec7..8dfda34929a0 100644
--- a/exporter/splunkhecexporter/factory_test.go
+++ b/exporter/splunkhecexporter/factory_test.go
@@ -21,7 +21,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/config/configcheck"
-	"go.opentelemetry.io/collector/config/configerror"
 	"go.opentelemetry.io/collector/config/configmodels"
 	"go.uber.org/zap"
 )
@@ -44,13 +43,35 @@ func TestCreateMetricsExporter(t *testing.T) {
 	assert.NoError(t, err)
 }

+func TestCreateMetricsExporterNoConfig(t *testing.T) {
+	factory := Factory{}
+	_, err := factory.CreateMetricsExporter(zap.NewNop(), nil)
+	assert.Error(t, err)
+}
+
 func TestCreateTraceExporter(t *testing.T) {
 	factory := Factory{}
 	cfg := factory.CreateDefaultConfig().(*Config)
 	cfg.Endpoint = "https://example.com:8088/services/collector"
 	cfg.Token = "1234-1234"
+
+	assert.Equal(t, configmodels.Type(typeStr), factory.Type())
+	_, err := factory.CreateTraceExporter(zap.NewNop(), cfg)
+	assert.NoError(t, err)
+}
+
+func TestCreateTraceExporterNoConfig(t *testing.T) {
+	factory := Factory{}
+	_, err := factory.CreateTraceExporter(zap.NewNop(), nil)
+	assert.Error(t, err)
+}
+
+func TestCreateTraceExporterInvalidEndpoint(t *testing.T) {
+	factory := Factory{}
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.Endpoint = "urn:something:12345"
 	_, err := factory.CreateTraceExporter(zap.NewNop(), cfg)
-	assert.Equal(t, configerror.ErrDataTypeIsNotSupported, err)
+	assert.Error(t, err)
 }

 func TestCreateInstanceViaFactory(t *testing.T) {
diff --git a/exporter/splunkhecexporter/metricdata_to_splunk.go b/exporter/splunkhecexporter/metricdata_to_splunk.go
index 6fe7c28f9f4b..183d7639b978 100644
--- a/exporter/splunkhecexporter/metricdata_to_splunk.go
+++ b/exporter/splunkhecexporter/metricdata_to_splunk.go
@@ -53,7 +53,7 @@ var (
 )

 type splunkMetric struct {
-	Time       int64   `json:"time"`                 // epoch time
+	Time       float64 `json:"time"`                 // epoch time
 	Host       string  `json:"host"`                 // hostname
 	Source     string  `json:"source,omitempty"`     // optional description of the source of the event; typically the app's name
 	SourceType string  `json:"sourcetype,omitempty"` // optional name of a Splunk parsing configuration; this is usually inferred by Splunk
@@ -120,11 +120,11 @@ func metricDataToSplunk(logger *zap.Logger, data consumerdata.MetricsData, confi
 	return splunkMetrics, numDroppedTimeSeries, nil
 }

-func timestampToEpochMilliseconds(ts *timestamp.Timestamp) int64 {
+func timestampToEpochMilliseconds(ts *timestamp.Timestamp) float64 {
 	if ts == nil {
 		return 0
 	}
-	return ts.GetSeconds()*1e3 + int64(ts.GetNanos()/1e6)
+	return float64(ts.GetSeconds()) + math.Round(float64(ts.GetNanos())/1e6)/1e3
 }

 func mapValues(logger *zap.Logger, metric *metricspb.Metric, value interface{}) (map[string]interface{}, error) {
diff --git a/exporter/splunkhecexporter/metricdata_to_splunk_test.go b/exporter/splunkhecexporter/metricdata_to_splunk_test.go
index 4c73167f8d2d..fb3470887ffd 100644
--- a/exporter/splunkhecexporter/metricdata_to_splunk_test.go
+++ b/exporter/splunkhecexporter/metricdata_to_splunk_test.go
@@ -24,6 +24,7 @@ import (
 	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
 	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
 	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
+	"github.com/golang/protobuf/ptypes/timestamp"
 	"github.com/stretchr/testify/assert"
 	"go.opentelemetry.io/collector/consumer/consumerdata"
 	"go.opentelemetry.io/collector/testutil/metricstestutil"
@@ -39,7 +40,7 @@ func Test_metricDataToSplunk(t *testing.T) {
 	unixSecs := int64(1574092046)
 	unixNSecs := int64(11 * time.Millisecond)
 	tsUnix := time.Unix(unixSecs, unixNSecs)
-	tsMSecs := unixSecs*1e3 + unixNSecs/1e6
+	tsMSecs := timestampToEpochMilliseconds(&timestamp.Timestamp{Seconds: unixSecs, Nanos: int32(unixNSecs)})
 	doubleVal := 1234.5678
 	doublePt := metricstestutil.Double(tsUnix, doubleVal)

@@ -208,7 +209,7 @@ func getFieldValue(metric *splunkMetric) string {

 func commonSplunkMetric(
 	metricName string,
-	ts int64,
+	ts float64,
 	keys []string,
 	values []string,
 	val interface{},
@@ -229,7 +230,7 @@ func commonSplunkMetric(

 func expectedFromDistribution(
 	metricName string,
-	ts int64,
+	ts float64,
 	keys []string,
 	values []string,
 	distributionTimeSeries *metricspb.TimeSeries,
@@ -266,7 +267,7 @@ func expectedFromDistribution(

 func expectedFromSummary(
 	metricName string,
-	ts int64,
+	ts float64,
 	keys []string,
 	values []string,
 	summaryTimeSeries *metricspb.TimeSeries,
@@ -294,3 +295,13 @@ func expectedFromSummary(

 	return dps
 }
+
+func TestTimestampFormat(t *testing.T) {
+	ts := timestamp.Timestamp{Seconds: 32, Nanos: 1000345}
+	assert.Equal(t, 32.001, timestampToEpochMilliseconds(&ts))
+}
+
+func TestTimestampFormatRounding(t *testing.T) {
+	ts := timestamp.Timestamp{Seconds: 32, Nanos: 1999345}
+	assert.Equal(t, 32.002, timestampToEpochMilliseconds(&ts))
+}
diff --git a/exporter/splunkhecexporter/tracedata_to_splunk.go b/exporter/splunkhecexporter/tracedata_to_splunk.go
new file mode 100644
index 000000000000..69d3977d9496
--- /dev/null
+++ b/exporter/splunkhecexporter/tracedata_to_splunk.go
@@ -0,0 +1,61 @@
+// Copyright 2020, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package splunkhecexporter
+
+import (
+	"go.opentelemetry.io/collector/consumer/consumerdata"
+	"go.uber.org/zap"
+)
+
+type splunkEvent struct {
+	Time       float64     `json:"time"`                 // epoch time
+	Host       string      `json:"host"`                 // hostname
+	Source     string      `json:"source,omitempty"`     // optional description of the source of the event; typically the app's name
+	SourceType string      `json:"sourcetype,omitempty"` // optional name of a Splunk parsing configuration; this is usually inferred by Splunk
+	Index      string      `json:"index,omitempty"`      // optional name of the Splunk index to store the event in; not required if the token has a default index set in Splunk
+	Event      interface{} `json:"event"`                // Payload of the event.
+}
+
+func traceDataToSplunk(logger *zap.Logger, data consumerdata.TraceData, config *Config) ([]*splunkEvent, int) {
+	var host string
+	if data.Resource != nil {
+		host = data.Resource.Labels[hostnameLabel]
+	}
+	if host == "" {
+		host = unknownHostName
+	}
+	numDroppedSpans := 0
+	splunkEvents := make([]*splunkEvent, 0)
+	for _, span := range data.Spans {
+		if span.StartTime == nil {
+			logger.Debug(
+				"Span dropped as it had no start timestamp",
+				zap.Any("span", span))
+			numDroppedSpans++
+			continue
+		}
+		se := &splunkEvent{
+			Time:       timestampToEpochMilliseconds(span.StartTime),
+			Host:       host,
+			Source:     config.Source,
+			SourceType: config.SourceType,
+			Index:      config.Index,
+			Event:      span,
+		}
+		splunkEvents = append(splunkEvents, se)
+	}
+
+	return splunkEvents, numDroppedSpans
+}
diff --git a/exporter/splunkhecexporter/tracedata_to_splunk_test.go b/exporter/splunkhecexporter/tracedata_to_splunk_test.go
new file mode 100644
index 000000000000..fe03a0df2afc
--- /dev/null
+++ b/exporter/splunkhecexporter/tracedata_to_splunk_test.go
@@ -0,0 +1,102 @@
+// Copyright 2020, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package splunkhecexporter
+
+import (
+	"testing"
+
+	v1 "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
+	"github.com/golang/protobuf/ptypes/timestamp"
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/consumer/consumerdata"
+	"go.uber.org/zap"
+)
+
+func Test_traceDataToSplunk(t *testing.T) {
+	logger := zap.NewNop()
+	ts := &timestamp.Timestamp{
+		Nanos: 0,
+	}
+
+	tests := []struct {
+		name                string
+		traceDataFn         func() consumerdata.TraceData
+		wantSplunkEvents    []*splunkEvent
+		wantNumDroppedSpans int
+	}{
+		{
+			name: "valid",
+			traceDataFn: func() consumerdata.TraceData {
+				return consumerdata.TraceData{
+					Spans: []*v1.Span{
+						makeSpan("myspan", ts),
+					},
+				}
+			},
+			wantSplunkEvents: []*splunkEvent{
+				commonSplunkEvent("myspan", ts),
+			},
+			wantNumDroppedSpans: 0,
+		},
+		{
+			name: "missing_start_ts",
+			traceDataFn: func() consumerdata.TraceData {
+				return consumerdata.TraceData{
+					Spans: []*v1.Span{
+						makeSpan("myspan", nil),
+					},
+				}
+			},
+			wantSplunkEvents:    []*splunkEvent{},
+			wantNumDroppedSpans: 1,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			gotEvents, gotNumDroppedSpans := traceDataToSplunk(logger, tt.traceDataFn(), &Config{})
+			assert.Equal(t, tt.wantNumDroppedSpans, gotNumDroppedSpans)
+			assert.Equal(t, len(tt.wantSplunkEvents), len(gotEvents))
+			for i, want := range tt.wantSplunkEvents {
+				assert.EqualValues(t, want, gotEvents[i])
+			}
+			assert.Equal(t, tt.wantSplunkEvents, gotEvents)
+		})
+	}
+}
+
+func makeSpan(name string, ts *timestamp.Timestamp) *v1.Span {
+	truncatableName := &v1.TruncatableString{
+		Value: name,
+	}
+	return &v1.Span{
+		Name:      truncatableName,
+		StartTime: ts,
+	}
+}
+
+func commonSplunkEvent(
+	name string,
+	ts *timestamp.Timestamp,
+) *splunkEvent {
+	truncatableName := &v1.TruncatableString{
+		Value: name,
+	}
+	span := v1.Span{Name: truncatableName, StartTime: ts}
+	return &splunkEvent{
+		Time:  timestampToEpochMilliseconds(ts),
+		Host:  "unknown",
+		Event: &span,
+	}
+}
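
Note on the HEC payload framing (an illustrative sketch, not part of the patch): the exporter now converts protobuf timestamps to fractional epoch seconds (whole seconds plus nanoseconds rounded to the nearest millisecond) and writes one JSON object per event, each followed by a blank-line separator, which is the framing asserted in TestReceiveTraces above. The standalone Go program below reproduces that arithmetic and framing using only the standard library; the event type and toEpochSeconds helper are simplified stand-ins for the splunkEvent struct and timestampToEpochMilliseconds function added in this change, and the sample values are invented.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"math"
)

// event mirrors the splunkEvent fields that matter for framing.
type event struct {
	Time  float64     `json:"time"`
	Host  string      `json:"host"`
	Event interface{} `json:"event"`
}

// toEpochSeconds applies the same arithmetic as timestampToEpochMilliseconds:
// whole seconds plus nanoseconds rounded to the nearest millisecond.
func toEpochSeconds(secs int64, nanos int32) float64 {
	return float64(secs) + math.Round(float64(nanos)/1e6)/1e3
}

func main() {
	buf := new(bytes.Buffer)
	enc := json.NewEncoder(buf)
	events := []event{
		{Time: toEpochSeconds(32, 1000345), Host: "unknown", Event: "span-1"}, // 32.001
		{Time: toEpochSeconds(32, 1999345), Host: "unknown", Event: "span-2"}, // 32.002
	}
	for _, e := range events {
		// Encode writes the JSON object plus a trailing "\n"; the exporter then
		// appends "\r\n\r\n", so consecutive events are separated by a blank line.
		if err := enc.Encode(e); err != nil {
			panic(err)
		}
		buf.WriteString("\r\n\r\n")
	}
	fmt.Print(buf.String())
}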