Skip to content

Commit

Permalink
Merge branch 'main' into balance-metrics-by-resources
Browse files Browse the repository at this point in the history
  • Loading branch information
SHaaD94 authored Mar 30, 2024
2 parents 4390dfe + 6be6423 commit 7802c17
Show file tree
Hide file tree
Showing 29 changed files with 883 additions and 42 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-avrologencodingextension.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: avrologencodingextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new encoding extension to support mapping of AVRO messages to logs.

# One or more tracking issues related to the change
issues: [21067]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
13 changes: 13 additions & 0 deletions .chloggen/sinkingpoint_failover-max-retries.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: failoverconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support ignoring `max_retries` setting in failover connector

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [9868]
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ body:
- extension/basicauth
- extension/bearertokenauth
- extension/encoding
- extension/encoding/avrologencoding
- extension/encoding/jaegerencoding
- extension/encoding/jsonlogencoding
- extension/encoding/otlpencoding
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ body:
- extension/basicauth
- extension/bearertokenauth
- extension/encoding
- extension/encoding/avrologencoding
- extension/encoding/jaegerencoding
- extension/encoding/jsonlogencoding
- extension/encoding/otlpencoding
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ body:
- extension/basicauth
- extension/bearertokenauth
- extension/encoding
- extension/encoding/avrologencoding
- extension/encoding/jaegerencoding
- extension/encoding/jsonlogencoding
- extension/encoding/otlpencoding
Expand Down
2 changes: 1 addition & 1 deletion connector/failoverconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The following settings are available:
- `priority_levels (required)`: list of pipeline level priorities in a 1 - n configuration, multiple pipelines can sit at a single priority level.
- `retry_interval (optional)`: the frequency at which the pipeline levels will attempt to reestablish connection with all higher priority levels. Default value is 10 minutes. (See Example below for further explanation)
- `retry_gap (optional)`: the amount of time between trying two separate priority levels in a single retry_interval timeframe. Default value is 30 seconds. (See Example below for further explanation)
- `max_retries (optional)`: the maximum retries per level. Default value is 10.
- `max_retries (optional)`: the maximum retries per level. Default value is 10. Set to 0 to allow unlimited retries.

The connector intakes a list of `priority_levels` each of which can contain multiple pipelines.
If any pipeline at a stable level fails, the level is considered unhealthy and the connector will move down one priority level and route all data to the new level (assuming it is stable).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (p *PipelineSelector) checkContinueRetry(index int) bool {
}

func (p *PipelineSelector) exceededMaxRetries(idx int) bool {
return idx < len(p.pipelineRetries) && (p.loadRetryCount(idx) >= p.constants.MaxRetries)
return p.constants.MaxRetries > 0 && idx < len(p.pipelineRetries) && (p.loadRetryCount(idx) >= p.constants.MaxRetries)
}

// SetToStableIndex returns the CurrentIndex to the known Stable Index
Expand Down
10 changes: 10 additions & 0 deletions exporter/awss3exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,13 @@ This exporter follows default credential resolution for the

Follow the [guidelines](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html) for the
credential configuration.

### OpenTelemetry Collector Helm Chart for Kubernetes
For example, when using OpenTelemetry Collector Helm Chart you could use `extraEnvs` in the values.yaml.
```yaml
extraEnvs:
- name: AWS_ACCESS_KEY_ID
value: "< YOUR AWS ACCESS KEY >"
- name: AWS_SECRET_ACCESS_KEY
value: "< YOUR AWS SECRET ACCESS KEY >"
```
7 changes: 7 additions & 0 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,13 @@ func (f *factory) createTracesExporter(
c component.Config,
) (exporter.Traces, error) {
cfg := checkAndCastConfig(c, set.TelemetrySettings.Logger)
if noAPMStatsFeatureGate.IsEnabled() {
set.Logger.Warn(
"Trace metrics are now disabled in the Datadog Exporter by default. To continue receiving Trace Metrics, configure the Datadog Connector or disable the feature gate.",
zap.String("documentation", "https://docs.datadoghq.com/opentelemetry/guide/migration/"),
zap.String("feature gate ID", noAPMStatsFeatureGate.ID()),
)
}

var (
pusher consumer.ConsumeTracesFunc
Expand Down
1 change: 1 addition & 0 deletions extension/encoding/avrologencodingextension/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
14 changes: 14 additions & 0 deletions extension/encoding/avrologencodingextension/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# AVRO Log encoding extension

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development] |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Favrologencoding%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Favrologencoding) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Favrologencoding%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Favrologencoding) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@thmshmm](https://www.github.com/thmshmm) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->

The `avrolog` encoding extension is used to unmarshal AVRO and insert it into the body of a log record. Marshalling is not supported.
38 changes: 38 additions & 0 deletions extension/encoding/avrologencodingextension/avro.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"

import (
"fmt"

"github.com/linkedin/goavro/v2"
)

type avroDeserializer interface {
Deserialize([]byte) (map[string]any, error)
}

type avroStaticSchemaDeserializer struct {
codec *goavro.Codec
}

func newAVROStaticSchemaDeserializer(schema string) (avroDeserializer, error) {
codec, err := goavro.NewCodec(schema)
if err != nil {
return nil, fmt.Errorf("failed to create avro codec: %w", err)
}

return &avroStaticSchemaDeserializer{
codec: codec,
}, nil
}

func (d *avroStaticSchemaDeserializer) Deserialize(data []byte) (map[string]any, error) {
native, _, err := d.codec.NativeFromBinary(data)
if err != nil {
return nil, fmt.Errorf("failed to deserialize avro record: %w", err)
}

return native.(map[string]any), nil
}
42 changes: 42 additions & 0 deletions extension/encoding/avrologencodingextension/avro_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewAvroLogsUnmarshaler(t *testing.T) {
schema, data := createAVROTestData(t)

deserializer, err := newAVROStaticSchemaDeserializer(schema)
if err != nil {
t.Errorf("Did not expect an error, got %q", err.Error())
}

logMap, err := deserializer.Deserialize(data)
if err != nil {
t.Fatalf("Did not expect an error, got %q", err.Error())
}

assert.Equal(t, int64(1697187201488000000), logMap["timestamp"].(time.Time).UnixNano())
assert.Equal(t, "host1", logMap["hostname"])
assert.Equal(t, int64(12), logMap["nestedRecord"].(map[string]any)["field1"])

props := logMap["properties"].([]any)
propsStr := make([]string, len(props))
for i, prop := range props {
propsStr[i] = prop.(string)
}

assert.Equal(t, []string{"prop1", "prop2"}, propsStr)
}

func TestNewAvroLogsUnmarshalerInvalidSchema(t *testing.T) {
_, err := newAVROStaticSchemaDeserializer("invalid schema")
assert.Error(t, err)
}
20 changes: 20 additions & 0 deletions extension/encoding/avrologencodingextension/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"

import "errors"

var errNoSchema = errors.New("no schema provided")

type Config struct {
Schema string `mapstructure:"schema"`
}

func (c *Config) Validate() error {
if c.Schema == "" {
return errNoSchema
}

return nil
}
20 changes: 20 additions & 0 deletions extension/encoding/avrologencodingextension/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfigValidate(t *testing.T) {
cfg := &Config{}
err := cfg.Validate()
assert.ErrorIs(t, err, errNoSchema)

cfg.Schema = "schema1"
err = cfg.Validate()
assert.NoError(t, err)
}
5 changes: 5 additions & 0 deletions extension/encoding/avrologencodingextension/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml
package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"
89 changes: 89 additions & 0 deletions extension/encoding/avrologencodingextension/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/avrologencodingextension"

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"
)

var (
_ encoding.LogsUnmarshalerExtension = (*avroLogExtension)(nil)
)

type avroLogExtension struct {
deserializer avroDeserializer
}

func newExtension(config *Config) (*avroLogExtension, error) {
deserializer, err := newAVROStaticSchemaDeserializer(config.Schema)
if err != nil {
return nil, err
}

return &avroLogExtension{deserializer: deserializer}, nil
}

func (e *avroLogExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
p := plog.NewLogs()

avroLog, err := e.deserializer.Deserialize(buf)
if err != nil {
return p, fmt.Errorf("failed to deserialize avro log: %w", err)
}

logRecords := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
logRecords.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))

// removes time.Time values as FromRaw does not support it
replaceLogicalTypes(avroLog)

// Set the unmarshaled avro as the body of the log record
if err := logRecords.Body().SetEmptyMap().FromRaw(avroLog); err != nil {
return p, err
}

return p, nil
}

func replaceLogicalTypes(m map[string]any) {
for k, v := range m {
m[k] = transformValue(v)
}
}

func transformValue(value any) any {
if timeValue, ok := value.(time.Time); ok {
return timeValue.UnixNano()
}

if mapValue, ok := value.(map[string]any); ok {
replaceLogicalTypes(mapValue)
return mapValue
}

if arrayValue, ok := value.([]any); ok {
for i, v := range arrayValue {
arrayValue[i] = transformValue(v)
}
return arrayValue
}

return value
}

func (e *avroLogExtension) Start(_ context.Context, _ component.Host) error {
return nil
}

func (e *avroLogExtension) Shutdown(_ context.Context) error {
return nil
}
53 changes: 53 additions & 0 deletions extension/encoding/avrologencodingextension/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package avrologencodingextension

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
)

func TestExtension_Start_Shutdown(t *testing.T) {
avroExtention := &avroLogExtension{}

err := avroExtention.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

err = avroExtention.Shutdown(context.Background())
require.NoError(t, err)
}

func TestUnmarshal(t *testing.T) {
t.Parallel()

schema, data := createAVROTestData(t)

e, err := newExtension(&Config{Schema: schema})
assert.NoError(t, err)

logs, err := e.UnmarshalLogs(data)
logRecord := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)

assert.NoError(t, err)
assert.Equal(t, "{\"count\":5,\"hostname\":\"host1\",\"level\":\"warn\",\"levelEnum\":\"INFO\",\"mapField\":{},\"message\":\"log message\",\"nestedRecord\":{\"field1\":12,\"field2\":\"val2\"},\"properties\":[\"prop1\",\"prop2\"],\"severity\":1,\"timestamp\":1697187201488000000}", logRecord.Body().AsString())
}

func TestInvalidUnmarshal(t *testing.T) {
t.Parallel()

schema, err := loadAVROSchemaFromFile("testdata/schema1.avro")
if err != nil {
t.Fatalf("Failed to read avro schema file: %q", err.Error())
}

e, err := newExtension(&Config{Schema: string(schema)})
assert.NoError(t, err)

_, err = e.UnmarshalLogs([]byte("NOT A AVRO"))
assert.Error(t, err)
}
Loading

0 comments on commit 7802c17

Please sign in to comment.