diff --git a/.chloggen/awss3-sumologic-ic-marshaller.yaml b/.chloggen/awss3-sumologic-ic-marshaller.yaml new file mode 100644 index 000000000000..996d93cc0aea --- /dev/null +++ b/.chloggen/awss3-sumologic-ic-marshaller.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3exporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add Sumo Logic Installed Collector marshaler + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23212] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/awss3exporter/README.md b/exporter/awss3exporter/README.md index d9d7dfc50a1f..f1af0f270c38 100644 --- a/exporter/awss3exporter/README.md +++ b/exporter/awss3exporter/README.md @@ -28,9 +28,17 @@ The following exporter configuration parameters are supported. | `s3_prefix` | prefix for the S3 key (root directory inside bucket). 
| | | `s3_partition` | time granularity of S3 key: hour or minute | "minute" | | `file_prefix` | file prefix defined by user | | -| `marshaler` | marshaler used to produce output data otlp_json | | +| `marshaler` | marshaler used to produce output data | `otlp_json` | | `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | +### Marshaler + +Marshaler determines the format of data sent to AWS S3. Currently, the following marshalers are implemented: + +- `otlp_json` (default): the [OpenTelemetry Protocol format](https://github.com/open-telemetry/opentelemetry-proto), represented as JSON. +- `sumo_ic`: the [Sumo Logic Installed Collector Archive format](https://help.sumologic.com/docs/manage/data-archiving/archive/). + **This format is supported only for logs.** + # Example Configuration Following example configuration defines to store output in 'eu-central' region and bucket named 'databucket'. diff --git a/exporter/awss3exporter/config.go b/exporter/awss3exporter/config.go index 008f63996288..e54d0eb72201 100644 --- a/exporter/awss3exporter/config.go +++ b/exporter/awss3exporter/config.go @@ -24,6 +24,7 @@ type MarshalerType string const ( OtlpJSON MarshalerType = "otlp_json" + SumoIC MarshalerType = "sumo_ic" ) // Config contains the main configuration options for the s3 exporter diff --git a/exporter/awss3exporter/config_test.go b/exporter/awss3exporter/config_test.go index 3d21888ec428..b4d9013ab0e0 100644 --- a/exporter/awss3exporter/config_test.go +++ b/exporter/awss3exporter/config_test.go @@ -124,3 +124,29 @@ func TestConfig_Validate(t *testing.T) { }) } } + +func TestMarshallerName(t *testing.T) { + factories, err := otelcoltest.NopFactories() + assert.Nil(t, err) + + factory := NewFactory() + factories.Exporters[factory.Type()] = factory + cfg, err := otelcoltest.LoadConfigAndValidate( + filepath.Join("testdata", "marshaler.yaml"), factories) + + require.NoError(t, err) + require.NotNil(t, cfg) 
+ + e := cfg.Exporters[component.NewID("awss3")].(*Config) + + assert.Equal(t, e, + &Config{ + S3Uploader: S3UploaderConfig{ + Region: "us-east-1", + S3Bucket: "foo", + S3Partition: "minute", + }, + MarshalerName: "sumo_ic", + }, + ) +} diff --git a/exporter/awss3exporter/factory.go b/exporter/awss3exporter/factory.go index 355c371cef0a..61c64807f483 100644 --- a/exporter/awss3exporter/factory.go +++ b/exporter/awss3exporter/factory.go @@ -5,6 +5,7 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect import ( "context" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" @@ -57,6 +58,10 @@ func createMetricsExporter(ctx context.Context, return nil, err } + if config.(*Config).MarshalerName == SumoIC { + return nil, fmt.Errorf("metrics are not supported by sumo_ic output format") + } + return exporterhelper.NewMetricsExporter(ctx, params, config, s3Exporter.ConsumeMetrics) @@ -71,6 +76,10 @@ func createTracesExporter(ctx context.Context, return nil, err } + if config.(*Config).MarshalerName == SumoIC { + return nil, fmt.Errorf("traces are not supported by sumo_ic output format") + } + return exporterhelper.NewTracesExporter(ctx, params, config, diff --git a/exporter/awss3exporter/factory_test.go b/exporter/awss3exporter/factory_test.go index 56ecbfb706a0..c8fc0ec1faea 100644 --- a/exporter/awss3exporter/factory_test.go +++ b/exporter/awss3exporter/factory_test.go @@ -48,3 +48,21 @@ func TestCreateLogsExporter(t *testing.T) { assert.NoError(t, err) require.NotNil(t, exp) } + +func TestUnsupportedMarshalerOptions(t *testing.T) { + cfg := createDefaultConfig() + cfg.(*Config).MarshalerName = SumoIC + exp, err := createMetricsExporter( + context.Background(), + exportertest.NewNopCreateSettings(), + cfg) + assert.Error(t, err) + require.Nil(t, exp) + + exp2, err := createTracesExporter( + context.Background(), + exportertest.NewNopCreateSettings(), + cfg) + assert.Error(t, err) + require.Nil(t, exp2) 
+} diff --git a/exporter/awss3exporter/marshaler.go b/exporter/awss3exporter/marshaler.go index 1478306b5705..686ca49d6dfe 100644 --- a/exporter/awss3exporter/marshaler.go +++ b/exporter/awss3exporter/marshaler.go @@ -31,6 +31,10 @@ func NewMarshaler(mType MarshalerType, logger *zap.Logger) (marshaler, error) { marshaler.tracesMarshaler = &ptrace.JSONMarshaler{} marshaler.metricsMarshaler = &pmetric.JSONMarshaler{} marshaler.fileFormat = "json" + case SumoIC: + sumomarshaler := newSumoICMarshaler() + marshaler.logsMarshaler = &sumomarshaler + marshaler.fileFormat = "json.gz" default: return nil, ErrUnknownMarshaler } diff --git a/exporter/awss3exporter/marshaler_test.go b/exporter/awss3exporter/marshaler_test.go index 60d814493546..791a31bc8903 100644 --- a/exporter/awss3exporter/marshaler_test.go +++ b/exporter/awss3exporter/marshaler_test.go @@ -18,6 +18,12 @@ func TestMarshaler(t *testing.T) { require.NotNil(t, m) assert.Equal(t, m.format(), "json") } + { + m, err := NewMarshaler("sumo_ic", zap.NewNop()) + assert.NoError(t, err) + require.NotNil(t, m) + assert.Equal(t, m.format(), "json.gz") + } { m, err := NewMarshaler("unknown", zap.NewNop()) assert.Error(t, err) diff --git a/exporter/awss3exporter/sumo_marshaler.go b/exporter/awss3exporter/sumo_marshaler.go new file mode 100644 index 000000000000..1828b136dfb5 --- /dev/null +++ b/exporter/awss3exporter/sumo_marshaler.go @@ -0,0 +1,162 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3exporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter" + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +const ( + logBodyKey = "log" +) + +type sumoMarshaler struct{} + +func (*sumoMarshaler) format() string { 
+ return string(SumoIC) +} + +func newSumoICMarshaler() sumoMarshaler { + return sumoMarshaler{} +} + +func logEntry(buf *bytes.Buffer, format string, a ...interface{}) { + buf.WriteString(fmt.Sprintf(format, a...)) + buf.WriteString("\n") +} + +func attributeValueToString(v pcommon.Value) (string, error) { + switch v.Type() { + case pcommon.ValueTypeStr: + return v.Str(), nil + case pcommon.ValueTypeBool: + return strconv.FormatBool(v.Bool()), nil + case pcommon.ValueTypeBytes: + return valueToJSON(v.Bytes().AsRaw()) + case pcommon.ValueTypeDouble: + return strconv.FormatFloat(v.Double(), 'f', -1, 64), nil + case pcommon.ValueTypeInt: + return strconv.FormatInt(v.Int(), 10), nil + case pcommon.ValueTypeSlice: + return valueToJSON(v.Slice().AsRaw()) + case pcommon.ValueTypeMap: + return valueToJSON(v.Map().AsRaw()) + case pcommon.ValueTypeEmpty: + return "", nil + default: + return "", fmt.Errorf("unknown OpenTelemetry attribute value type: %q", v.Type()) + } +} + +func valueToJSON(m any) (string, error) { + jsonString := new(bytes.Buffer) + enc := json.NewEncoder(jsonString) + err := enc.Encode(m) + + return strings.Trim(jsonString.String(), "\n"), err +} + +const ( + SourceCategoryKey = "_sourceCategory" + SourceHostKey = "_sourceHost" + SourceNameKey = "_sourceName" +) + +func (sumoMarshaler) MarshalLogs(ld plog.Logs) ([]byte, error) { + buf := bytes.Buffer{} + rls := ld.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + rl := rls.At(i) + ra := rl.Resource().Attributes() + sourceCategory, exists := ra.Get(SourceCategoryKey) + if !exists { + return nil, errors.New("_sourceCategory attribute does not exist") + } + sourceHost, exists := ra.Get(SourceHostKey) + if !exists { + return nil, errors.New("_sourceHost attribute does not exist") + } + sourceName, exists := ra.Get(SourceNameKey) + if !exists { + return nil, errors.New("_sourceName attribute does not exist") + } + + sc, err := attributeValueToString(sourceCategory) + if err != nil { + return nil, err + } + 
sh, err := attributeValueToString(sourceHost) + if err != nil { + return nil, err + } + sn, err := attributeValueToString(sourceName) + if err != nil { + return nil, err + } + sc = strconv.Quote(sc) + sh = strconv.Quote(sh) + sn = strconv.Quote(sn) + + // Remove the source attributes so that they won't be included in the "fields" value. + ra.Remove(SourceCategoryKey) + ra.Remove(SourceHostKey) + ra.Remove(SourceNameKey) + + fields, err := valueToJSON(ra.AsRaw()) + if err != nil { + return nil, err + } + + ills := rl.ScopeLogs() + for j := 0; j < ills.Len(); j++ { + ils := ills.At(j) + logs := ils.LogRecords() + for k := 0; k < logs.Len(); k++ { + lr := logs.At(k) + dateVal := lr.ObservedTimestamp() + + message, err := getMessageJSON(lr) + if err != nil { + return nil, err + } + + logEntry(&buf, "{\"date\": \"%s\",\"sourceName\":%s,\"sourceHost\":%s,\"sourceCategory\":%s,\"fields\":%s,\"message\":%s}", + dateVal, sn, sh, sc, fields, message) + } + } + } + return buf.Bytes(), nil +} + +func getMessageJSON(lr plog.LogRecord) (string, error) { + // The "message" field is a JSON object created by combining the actual log body and log-level attributes, + // where the log body is stored under the "log" key. 
+ // More info: + // https://help.sumologic.com/docs/send-data/opentelemetry-collector/data-source-configurations/additional-configurations-reference/#mapping-opentelemetry-concepts-to-sumo-logic + message := new(bytes.Buffer) + enc := json.NewEncoder(message) + + lr.Body().CopyTo(lr.Attributes().PutEmpty(logBodyKey)) + err := enc.Encode(lr.Attributes().AsRaw()) + + return strings.Trim(message.String(), "\n"), err +} + +func (s sumoMarshaler) MarshalTraces(_ ptrace.Traces) ([]byte, error) { + return nil, fmt.Errorf("traces can't be marshaled into %s format", s.format()) +} + +func (s sumoMarshaler) MarshalMetrics(_ pmetric.Metrics) ([]byte, error) { + return nil, fmt.Errorf("metrics can't be marshaled into %s format", s.format()) +} diff --git a/exporter/awss3exporter/sumo_marshaler_test.go b/exporter/awss3exporter/sumo_marshaler_test.go new file mode 100644 index 000000000000..aa6f656e1a73 --- /dev/null +++ b/exporter/awss3exporter/sumo_marshaler_test.go @@ -0,0 +1,192 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3exporter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +func TestMarshalerMissingAttributes(t *testing.T) { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.ScopeLogs().AppendEmpty() + marshaler := &sumoMarshaler{} + require.NotNil(t, marshaler) + _, err := marshaler.MarshalLogs(logs) + assert.Error(t, err) +} + +func TestMarshalerMissingSourceHost(t *testing.T) { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory") + + marshaler := &sumoMarshaler{} + require.NotNil(t, marshaler) + _, err := marshaler.MarshalLogs(logs) + assert.Error(t, err) +} + +func TestMarshalerMissingScopedLogs(t *testing.T) { + logs := plog.NewLogs() + rls := 
logs.ResourceLogs().AppendEmpty() + rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory") + rls.Resource().Attributes().PutStr("_sourceHost", "testHost") + rls.Resource().Attributes().PutStr("_sourceName", "testName") + + marshaler := &sumoMarshaler{} + require.NotNil(t, marshaler) + _, err := marshaler.MarshalLogs(logs) + assert.NoError(t, err) +} + +func TestMarshalerMissingSourceName(t *testing.T) { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory") + rls.Resource().Attributes().PutStr("_sourceHost", "testHost") + + sl := rls.ScopeLogs().AppendEmpty() + const recordNum = 0 + + ts := pcommon.Timestamp(int64(recordNum) * time.Millisecond.Nanoseconds()) + logRecord := sl.LogRecords().AppendEmpty() + logRecord.Body().SetStr("entry1") + logRecord.SetTimestamp(ts) + + marshaler := &sumoMarshaler{} + require.NotNil(t, marshaler) + _, err := marshaler.MarshalLogs(logs) + assert.Error(t, err) +} + +func TestMarshalerOkStructure(t *testing.T) { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory") + rls.Resource().Attributes().PutStr("_sourceHost", "testHost") + rls.Resource().Attributes().PutStr("_sourceName", "testSourceName") + rls.Resource().Attributes().PutStr("42", "the question") + slice := rls.Resource().Attributes().PutEmptySlice("slice") + pcommon.NewValueInt(13).CopyTo(slice.AppendEmpty()) + m := pcommon.NewValueMap() + m.Map().PutBool("b", true) + m.CopyTo(slice.AppendEmpty()) + + sl := rls.ScopeLogs().AppendEmpty() + const recordNum = 0 + + ts := pcommon.Timestamp(int64(recordNum) * time.Millisecond.Nanoseconds()) + logRecord := sl.LogRecords().AppendEmpty() + logRecord.Body().SetStr("entry1") + logRecord.SetTimestamp(ts) + logRecord.Attributes().PutStr("key", "value") + + marshaler := &sumoMarshaler{} + require.NotNil(t, marshaler) + buf, err := 
marshaler.MarshalLogs(logs) + assert.NoError(t, err) + expectedEntry := "{\"date\": \"1970-01-01 00:00:00 +0000 UTC\",\"sourceName\":\"testSourceName\",\"sourceHost\":\"testHost\"" + expectedEntry += ",\"sourceCategory\":\"testcategory\",\"fields\":{\"42\":\"the question\",\"slice\":[13,{\"b\":true}]},\"message\":{\"key\":\"value\",\"log\":\"entry1\"}}\n" + assert.Equal(t, expectedEntry, string(buf)) +} + +func TestMarshalerQuotes(t *testing.T) { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + rls.Resource().Attributes().PutStr("_sourceCategory", `"foo"bar"`) + rls.Resource().Attributes().PutStr("_sourceHost", "testHost") + rls.Resource().Attributes().PutStr("_sourceName", "testSourceName") + + sl := rls.ScopeLogs().AppendEmpty() + const recordNum = 0 + + ts := pcommon.Timestamp(int64(recordNum) * time.Millisecond.Nanoseconds()) + logRecord := sl.LogRecords().AppendEmpty() + logRecord.Body().SetStr("entry1") + logRecord.SetTimestamp(ts) + + marshaler := &sumoMarshaler{} + require.NotNil(t, marshaler) + buf, err := marshaler.MarshalLogs(logs) + assert.NoError(t, err) + expectedEntry := "{\"date\": \"1970-01-01 00:00:00 +0000 UTC\",\"sourceName\":\"testSourceName\",\"sourceHost\":\"testHost\"" + expectedEntry += ",\"sourceCategory\":\"\\\"foo\\\"bar\\\"\",\"fields\":{},\"message\":{\"log\":\"entry1\"}}\n" + assert.Equal(t, expectedEntry, string(buf)) +} + +func TestAttributeValueToString(t *testing.T) { + testCases := []struct { + value pcommon.Value + result string + init func(pcommon.Value) + }{ + { + value: pcommon.NewValueBool(true), + result: "true", + }, + { + value: pcommon.NewValueBytes(), + result: "\"KiFN/wA=\"", + init: func(v pcommon.Value) { + v.Bytes().Append(42, 33, 77, 255, 0) + }, + }, + { + value: pcommon.NewValueDouble(1.69), + result: "1.69", + }, + { + value: pcommon.NewValueInt(42), + result: "42", + }, + { + // Format of a map entry: + // " -> : ()\n" + // Type names: 
https://github.com/open-telemetry/opentelemetry-collector/blob/ed8547a8e5d6ed527e6d54136cb2e137b954f888/pdata/pcommon/value.go#L32 + value: pcommon.NewValueMap(), + result: "{" + + "\"bool\":false," + + "\"map\":{}," + + "\"string\":\"abc\"" + + "}", + init: func(v pcommon.Value) { + m := v.Map() + m.PutBool("bool", false) + m.PutEmptyMap("map") + m.PutStr("string", "abc") + }, + }, + { + value: pcommon.NewValueSlice(), + result: "[110.37,[true],\"YWJj\",\"asdfg\"]", + init: func(v pcommon.Value) { + s := v.Slice() + s.AppendEmpty().SetDouble(110.37) + s.AppendEmpty().SetEmptySlice().AppendEmpty().SetBool(true) + s.AppendEmpty().SetEmptyBytes().Append(97, 98, 99) + s.AppendEmpty().SetStr("asdfg") + }, + }, + { + value: pcommon.NewValueStr("qwerty"), + result: "qwerty", + }, + } + + for _, testCase := range testCases { + if testCase.init != nil { + testCase.init(testCase.value) + } + val, err := attributeValueToString(testCase.value) + assert.NoError(t, err) + assert.Equal(t, testCase.result, val) + } +} diff --git a/exporter/awss3exporter/testdata/marshaler.yaml b/exporter/awss3exporter/testdata/marshaler.yaml new file mode 100644 index 000000000000..fcbcba6be079 --- /dev/null +++ b/exporter/awss3exporter/testdata/marshaler.yaml @@ -0,0 +1,18 @@ +receivers: + nop: + +exporters: + awss3: + s3uploader: + s3_bucket: "foo" + marshaler: sumo_ic + +processors: + nop: + +service: + pipelines: + traces: + receivers: [nop] + processors: [nop] + exporters: [awss3]