[sumologicexporter] Add extrapolation of source headers #1756

Merged
11 changes: 9 additions & 2 deletions exporter/sumologicexporter/exporter.go
@@ -25,7 +25,8 @@ import (
)

type sumologicexporter struct {
config *Config
config *Config
sources sourceFormats
}

func initExporter(cfg *Config) (*sumologicexporter, error) {
@@ -56,8 +57,14 @@ func initExporter(cfg *Config) (*sumologicexporter, error) {
return nil, errors.New("endpoint is not set")
}

sfs, err := newSourceFormats(cfg)
if err != nil {
return nil, err
}

se := &sumologicexporter{
config: cfg,
config: cfg,
sources: sfs,
}

return se, nil
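Note: with this change, initExporter builds the source-header formats up front, so template problems surface at construction time rather than during send. Below is a minimal, hypothetical usage sketch (package sumologicexporter context assumed; only Config, HTTPClientSettings.Endpoint and initExporter are taken from this diff):

```go
// Hypothetical sketch, not part of this PR: constructing the exporter now
// validates source templates via newSourceFormats, so a malformed template
// is reported here instead of on the first request.
func newConfiguredExporter(endpoint string) (*sumologicexporter, error) {
	cfg := &Config{}
	cfg.HTTPClientSettings.Endpoint = endpoint

	// Any error from newSourceFormats propagates out of initExporter.
	return initExporter(cfg)
}
```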
35 changes: 35 additions & 0 deletions exporter/sumologicexporter/fields.go
@@ -0,0 +1,35 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"fmt"
"sort"
"strings"
)

// fields represents metadata
type fields map[string]string

// string returns fields as ordered key=value string with `, ` as separator
func (f fields) string() string {
rv := make([]string, 0, len(f))
for k, v := range f {
rv = append(rv, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(rv)

return strings.Join(rv, ", ")
}
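Note: fields.string() sorts the serialized pairs, so the result is deterministic regardless of map iteration order. A brief, hypothetical sketch of how such a value could be attached to an outgoing request, mirroring the X-Sumo-Fields usage in sender.go further below ("net/http" import assumed):

```go
// Hypothetical helper, for illustration only: fields.string() yields a
// deterministic, comma-separated "key=value" list suitable for a header.
func addFieldsHeader(req *http.Request, flds fields) {
	// fields{"namespace": "prod", "pod": "api-0"} -> "namespace=prod, pod=api-0"
	req.Header.Add("X-Sumo-Fields", flds.string())
}
```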
32 changes: 32 additions & 0 deletions exporter/sumologicexporter/fields_test.go
@@ -0,0 +1,32 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestFieldsAsString(t *testing.T) {
expected := "key1=value1, key2=value2, key3=value3"
flds := fields{
"key1": "value1",
"key3": "value3",
"key2": "value2",
}

assert.Equal(t, expected, flds.string())
}
31 changes: 6 additions & 25 deletions exporter/sumologicexporter/filter.go
@@ -15,10 +15,7 @@
package sumologicexporter

import (
"fmt"
"regexp"
"sort"
"strings"

"go.opentelemetry.io/collector/consumer/pdata"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
@@ -28,9 +25,6 @@ type filter struct {
regexes []*regexp.Regexp
}

// fields represents concatenated metadata
type fields string

func newFilter(flds []string) (filter, error) {
metadataRegexes := make([]*regexp.Regexp, len(flds))

@@ -48,9 +42,9 @@
}, nil
}

// filterIn returns map of strings which matches at least one of the filter regexes
func (f *filter) filterIn(attributes pdata.AttributeMap) map[string]string {
returnValue := make(map[string]string)
// filterIn returns fields which match at least one of the filter regexes
func (f *filter) filterIn(attributes pdata.AttributeMap) fields {
returnValue := make(fields)

attributes.ForEach(func(k string, v pdata.AttributeValue) {
for _, regex := range f.regexes {
@@ -63,9 +57,9 @@ func (f *filter) filterIn(attributes pdata.AttributeMap) map[string]string {
return returnValue
}

// filterOut returns map of strings which doesn't match any of the filter regexes
func (f *filter) filterOut(attributes pdata.AttributeMap) map[string]string {
returnValue := make(map[string]string)
// filterOut returns fields which don't match any of the filter regexes
func (f *filter) filterOut(attributes pdata.AttributeMap) fields {
returnValue := make(fields)

attributes.ForEach(func(k string, v pdata.AttributeValue) {
for _, regex := range f.regexes {
@@ -77,16 +71,3 @@
})
return returnValue
}

// getMetadata builds string which represents metadata in alphabetical order
func (f *filter) getMetadata(attributes pdata.AttributeMap) fields {
attrs := f.filterIn(attributes)
metadata := make([]string, 0, len(attrs))

for k, v := range attrs {
metadata = append(metadata, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(metadata)

return fields(strings.Join(metadata, ", "))
}
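Note: the filter now returns the fields type directly and the getMetadata helper is gone, since fields.string() covers serialization. A simplified, self-contained sketch of the in/out split, using plain maps in place of pdata.AttributeMap and the fields type for illustration:

```go
package main

import (
	"fmt"
	"regexp"
)

// splitByRegexes mimics filterIn/filterOut: attributes whose keys match at
// least one regex go to "in", all remaining attributes go to "out".
func splitByRegexes(attrs map[string]string, regexes []*regexp.Regexp) (in, out map[string]string) {
	in, out = map[string]string{}, map[string]string{}
	for k, v := range attrs {
		matched := false
		for _, re := range regexes {
			if re.MatchString(k) {
				matched = true
				break
			}
		}
		if matched {
			in[k] = v
		} else {
			out[k] = v
		}
	}
	return in, out
}

func main() {
	regexes := []*regexp.Regexp{regexp.MustCompile(`^key[12]$`)}
	in, out := splitByRegexes(map[string]string{
		"key1":            "value1",
		"key2":            "value2",
		"additional_key3": "value3",
	}, regexes)
	fmt.Println(in)  // map[key1:value1 key2:value2]
	fmt.Println(out) // map[additional_key3:value3]
}
```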
6 changes: 3 additions & 3 deletions exporter/sumologicexporter/filter_test.go
@@ -34,8 +34,8 @@ func TestGetMetadata(t *testing.T) {
f, err := newFilter(regexes)
require.NoError(t, err)

metadata := f.getMetadata(attributes)
const expected fields = "key1=value1, key2=value2, key3=value3"
metadata := f.filterIn(attributes)
expected := fields{"key1": "value1", "key2": "value2", "key3": "value3"}
assert.Equal(t, expected, metadata)
}

@@ -52,7 +52,7 @@ func TestFilterOutMetadata(t *testing.T) {
require.NoError(t, err)

data := f.filterOut(attributes)
expected := map[string]string{
expected := fields{
"additional_key2": "value2",
"additional_key3": "value3",
}
43 changes: 26 additions & 17 deletions exporter/sumologicexporter/sender.go
@@ -36,11 +36,12 @@ type appendResponse struct {
}

type sender struct {
buffer []pdata.LogRecord
config *Config
client *http.Client
filter filter
ctx context.Context
buffer []pdata.LogRecord
config *Config
client *http.Client
filter filter
ctx context.Context
sources sourceFormats
}

const (
@@ -55,17 +56,25 @@ func newAppendResponse() appendResponse {
}
}

func newSender(ctx context.Context, cfg *Config, cl *http.Client, f filter) *sender {
func newSender(
ctx context.Context,
cfg *Config,
cl *http.Client,
f filter,
s sourceFormats,
) *sender {
return &sender{
config: cfg,
client: cl,
filter: f,
ctx: ctx,
config: cfg,
client: cl,
filter: f,
ctx: ctx,
sources: s,
}
}

// send sends data to sumologic
func (s *sender) send(pipeline PipelineType, body io.Reader, flds fields) error {

// Add headers
req, err := http.NewRequestWithContext(s.ctx, http.MethodPost, s.config.HTTPClientSettings.Endpoint, body)
if err != nil {
@@ -74,22 +83,22 @@ func (s *sender) send(pipeline PipelineType, body io.Reader, flds fields) error

req.Header.Add("X-Sumo-Client", s.config.Client)

if len(s.config.SourceHost) > 0 {
req.Header.Add("X-Sumo-Host", s.config.SourceHost)
if s.sources.host.isSet() {
req.Header.Add("X-Sumo-Host", s.sources.host.format(flds))
}

if len(s.config.SourceName) > 0 {
req.Header.Add("X-Sumo-Name", s.config.SourceName)
if s.sources.name.isSet() {
req.Header.Add("X-Sumo-Name", s.sources.name.format(flds))
}

if len(s.config.SourceCategory) > 0 {
req.Header.Add("X-Sumo-Category", s.config.SourceCategory)
if s.sources.category.isSet() {
req.Header.Add("X-Sumo-Category", s.sources.category.format(flds))
}

switch pipeline {
case LogsPipeline:
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("X-Sumo-Fields", string(flds))
req.Header.Add("X-Sumo-Fields", flds.string())
case MetricsPipeline:
// ToDo: Implement metrics pipeline
return errors.New("current sender version doesn't support metrics")
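Note: the X-Sumo-Host, X-Sumo-Name and X-Sumo-Category headers are now rendered per request from the configured source templates instead of being copied verbatim from the config. The sourceFormats type itself lives in a file not shown in this diff; the sketch below only illustrates the shape implied by its use here (host/name/category entries exposing isSet() and format(fields)). The %{key} token syntax and the replacement strategy are assumptions for illustration ("strings" import assumed):

```go
// Illustrative only; not this PR's implementation. The shape is inferred
// from how sender.go uses sourceFormats; the %{key} template syntax and the
// replacement strategy below are assumptions.
type sourceFormatSketch struct {
	template string   // e.g. "acme/%{k8s.namespace}"
	keys     []string // attribute keys referenced by the template
}

func (s sourceFormatSketch) isSet() bool { return s.template != "" }

// format substitutes record-level fields into the template, so a header such
// as X-Sumo-Category can vary per batch of exported logs.
func (s sourceFormatSketch) format(f fields) string {
	out := s.template
	for _, k := range s.keys {
		out = strings.ReplaceAll(out, "%{"+k+"}", f[k])
	}
	return out
}
```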