Skip to content

Commit

Permalink
Added metric and log support
Browse files Browse the repository at this point in the history
  • Loading branch information
akats7 committed Dec 18, 2023
1 parent 223cbb9 commit ff92582
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 17 deletions.
27 changes: 27 additions & 0 deletions .chloggen/failover-PR3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: failoverconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: PR extends failover connector for metric and log pipelines

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20766]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
53 changes: 45 additions & 8 deletions connector/failoverconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,58 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/state"
)

type logsFailover struct {
component.StartFunc
component.ShutdownFunc

config *Config
failover *failoverRouter[consumer.Logs]
logger *zap.Logger
config *Config
failover *failoverRouter[consumer.Logs]
logger *zap.Logger
errTryLock *state.TryLock
stableTryLock *state.TryLock
}

func (f *logsFailover) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// ConsumeLogs will try to export to the current set priority level and handle failover in the case of an error
func (f *logsFailover) ConsumeLogs(_ context.Context, _ plog.Logs) error {
return nil
func (f *logsFailover) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
mc, idx, ok := f.failover.getCurrentConsumer()
if !ok {
return errNoValidPipeline
}
err := mc.ConsumeLogs(ctx, ld)
if err == nil {
f.stableTryLock.TryExecute(f.failover.reportStable, idx)
return nil
}
return f.FailoverLogs(ctx, ld)
}

// FailoverLogs is the function responsible for handling errors returned by the nextConsumer
func (f *logsFailover) FailoverLogs(ctx context.Context, ld plog.Logs) error {
for mc, idx, ok := f.failover.getCurrentConsumer(); ok; mc, idx, ok = f.failover.getCurrentConsumer() {
err := mc.ConsumeLogs(ctx, ld)
if err != nil {
f.errTryLock.TryExecute(f.failover.handlePipelineError, idx)
continue
}
f.stableTryLock.TryExecute(f.failover.reportStable, idx)
return nil
}
f.logger.Error("All provided pipelines return errors, dropping data")
return errNoValidPipeline
}

func (f *logsFailover) Shutdown(_ context.Context) error {
if f.failover != nil {
f.failover.rS.InvokeCancel()
}
return nil
}

Expand All @@ -44,9 +75,15 @@ func newLogsToLogs(set connector.CreateSettings, cfg component.Config, logs cons
}

failover := newFailoverRouter[consumer.Logs](lr.Consumer, config) // temp add type spec to resolve linter issues
err := failover.registerConsumers()
if err != nil {
return nil, err
}
return &logsFailover{
config: config,
failover: failover,
logger: set.TelemetrySettings.Logger,
config: config,
failover: failover,
logger: set.TelemetrySettings.Logger,
errTryLock: state.NewTryLock(),
stableTryLock: state.NewTryLock(),
}, nil
}
192 changes: 192 additions & 0 deletions connector/failoverconnector/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package failoverconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector"
import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
)

var errLogsConsumer = errors.New("Error from ConsumeLogs")

func TestLogsRegisterConsumers(t *testing.T) {
var sinkFirst, sinkSecond, sinkThird consumertest.LogsSink
logsFirst := component.NewIDWithName(component.DataTypeLogs, "logs/first")
logsSecond := component.NewIDWithName(component.DataTypeLogs, "logs/second")
logsThird := component.NewIDWithName(component.DataTypeLogs, "logs/third")

cfg := &Config{
PipelinePriority: [][]component.ID{{logsFirst}, {logsSecond}, {logsThird}},
RetryInterval: 5 * time.Minute,
RetryGap: 10 * time.Second,
MaxRetries: 5,
}

router := connectortest.NewLogsRouter(
connectortest.WithLogsSink(logsFirst, &sinkFirst),
connectortest.WithLogsSink(logsSecond, &sinkSecond),
connectortest.WithLogsSink(logsThird, &sinkThird),
)

conn, err := NewFactory().CreateLogsToLogs(context.Background(),
connectortest.NewNopCreateSettings(), cfg, router.(consumer.Logs))

failoverConnector := conn.(*logsFailover)
defer func() {
assert.NoError(t, failoverConnector.Shutdown(context.Background()))
}()

require.NoError(t, err)
require.NotNil(t, conn)

tc, idx, ok := failoverConnector.failover.getCurrentConsumer()
tc1 := failoverConnector.failover.GetConsumerAtIndex(1)
tc2 := failoverConnector.failover.GetConsumerAtIndex(2)

assert.True(t, ok)
require.Equal(t, idx, 0)
require.Implements(t, (*consumer.Logs)(nil), tc)
require.Implements(t, (*consumer.Logs)(nil), tc1)
require.Implements(t, (*consumer.Logs)(nil), tc2)
}

func TestLogsWithValidFailover(t *testing.T) {
var sinkSecond, sinkThird consumertest.LogsSink
logsFirst := component.NewIDWithName(component.DataTypeLogs, "logs/first")
logsSecond := component.NewIDWithName(component.DataTypeLogs, "logs/second")
logsThird := component.NewIDWithName(component.DataTypeLogs, "logs/third")

cfg := &Config{
PipelinePriority: [][]component.ID{{logsFirst}, {logsSecond}, {logsThird}},
RetryInterval: 5 * time.Minute,
RetryGap: 10 * time.Second,
MaxRetries: 5,
}

router := connectortest.NewLogsRouter(
connectortest.WithNopLogs(logsFirst),
connectortest.WithLogsSink(logsSecond, &sinkSecond),
connectortest.WithLogsSink(logsThird, &sinkThird),
)

conn, err := NewFactory().CreateLogsToLogs(context.Background(),
connectortest.NewNopCreateSettings(), cfg, router.(consumer.Logs))

require.NoError(t, err)

failoverConnector := conn.(*logsFailover)
failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errLogsConsumer))
defer func() {
assert.NoError(t, failoverConnector.Shutdown(context.Background()))
}()

tr := sampleLog()

require.NoError(t, conn.ConsumeLogs(context.Background(), tr))
_, idx, ok := failoverConnector.failover.getCurrentConsumer()
assert.True(t, ok)
require.Equal(t, idx, 1)
}

func TestLogsWithFailoverError(t *testing.T) {
var sinkSecond, sinkThird consumertest.LogsSink
logsFirst := component.NewIDWithName(component.DataTypeLogs, "logs/first")
logsSecond := component.NewIDWithName(component.DataTypeLogs, "logs/second")
logsThird := component.NewIDWithName(component.DataTypeLogs, "logs/third")

cfg := &Config{
PipelinePriority: [][]component.ID{{logsFirst}, {logsSecond}, {logsThird}},
RetryInterval: 5 * time.Minute,
RetryGap: 10 * time.Second,
MaxRetries: 5,
}

router := connectortest.NewLogsRouter(
connectortest.WithNopLogs(logsFirst),
connectortest.WithLogsSink(logsSecond, &sinkSecond),
connectortest.WithLogsSink(logsThird, &sinkThird),
)

conn, err := NewFactory().CreateLogsToLogs(context.Background(),
connectortest.NewNopCreateSettings(), cfg, router.(consumer.Logs))

require.NoError(t, err)

failoverConnector := conn.(*logsFailover)
failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errLogsConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errLogsConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(2, consumertest.NewErr(errLogsConsumer))
defer func() {
assert.NoError(t, failoverConnector.Shutdown(context.Background()))
}()

tr := sampleLog()

assert.EqualError(t, conn.ConsumeLogs(context.Background(), tr), "All provided pipelines return errors")
}

func TestLogsWithFailoverRecovery(t *testing.T) {
var sinkSecond, sinkThird consumertest.LogsSink
logsFirst := component.NewIDWithName(component.DataTypeLogs, "logs/first")
logsSecond := component.NewIDWithName(component.DataTypeLogs, "logs/second")
logsThird := component.NewIDWithName(component.DataTypeLogs, "logs/third")

cfg := &Config{
PipelinePriority: [][]component.ID{{logsFirst}, {logsSecond}, {logsThird}},
RetryInterval: 50 * time.Millisecond,
RetryGap: 10 * time.Millisecond,
MaxRetries: 1000,
}

router := connectortest.NewLogsRouter(
connectortest.WithNopLogs(logsFirst),
connectortest.WithLogsSink(logsSecond, &sinkSecond),
connectortest.WithLogsSink(logsThird, &sinkThird),
)

conn, err := NewFactory().CreateLogsToLogs(context.Background(),
connectortest.NewNopCreateSettings(), cfg, router.(consumer.Logs))

require.NoError(t, err)

failoverConnector := conn.(*logsFailover)
failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errLogsConsumer))
defer func() {
assert.NoError(t, failoverConnector.Shutdown(context.Background()))
}()

tr := sampleLog()

require.NoError(t, conn.ConsumeLogs(context.Background(), tr))
_, idx, ok := failoverConnector.failover.getCurrentConsumer()

assert.True(t, ok)
require.Equal(t, idx, 1)

// Simulate recovery of exporter
failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop())

time.Sleep(100 * time.Millisecond)

_, idx, ok = failoverConnector.failover.getCurrentConsumer()
assert.True(t, ok)
require.Equal(t, idx, 0)
}

func sampleLog() plog.Logs {
l := plog.NewLogs()
rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("test", "logs-test")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
return l
}
55 changes: 46 additions & 9 deletions connector/failoverconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,58 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/state"
)

type metricsFailover struct {
component.StartFunc
component.ShutdownFunc

config *Config
failover *failoverRouter[consumer.Metrics]
logger *zap.Logger
config *Config
failover *failoverRouter[consumer.Metrics]
logger *zap.Logger
errTryLock *state.TryLock
stableTryLock *state.TryLock
}

func (f *metricsFailover) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// ConsumeMetrics will try to export to the current set priority level and handle failover in the case of an error
func (f *metricsFailover) ConsumeMetrics(_ context.Context, _ pmetric.Metrics) error {
return nil
func (f *metricsFailover) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
mc, idx, ok := f.failover.getCurrentConsumer()
if !ok {
return errNoValidPipeline
}
err := mc.ConsumeMetrics(ctx, md)
if err == nil {
f.stableTryLock.TryExecute(f.failover.reportStable, idx)
return nil
}
return f.FailoverMetrics(ctx, md)
}

// FailoverMetrics is the function responsible for handling errors returned by the nextConsumer
func (f *metricsFailover) FailoverMetrics(ctx context.Context, md pmetric.Metrics) error {
for mc, idx, ok := f.failover.getCurrentConsumer(); ok; mc, idx, ok = f.failover.getCurrentConsumer() {
err := mc.ConsumeMetrics(ctx, md)
if err != nil {
f.errTryLock.TryExecute(f.failover.handlePipelineError, idx)
continue
}
f.stableTryLock.TryExecute(f.failover.reportStable, idx)
return nil
}
f.logger.Error("All provided pipelines return errors, dropping data")
return errNoValidPipeline
}

func (f *metricsFailover) Shutdown(_ context.Context) error {
if f.failover != nil {
f.failover.rS.InvokeCancel()
}
return nil
}

Expand All @@ -43,10 +74,16 @@ func newMetricsToMetrics(set connector.CreateSettings, cfg component.Config, met
return nil, errors.New("consumer is not of type MetricsRouter")
}

failover := newFailoverRouter[consumer.Metrics](mr.Consumer, config) // temp add type spec to resolve linter issues
failover := newFailoverRouter[consumer.Metrics](mr.Consumer, config)
err := failover.registerConsumers()
if err != nil {
return nil, err
}
return &metricsFailover{
config: config,
failover: failover,
logger: set.TelemetrySettings.Logger,
config: config,
failover: failover,
logger: set.TelemetrySettings.Logger,
errTryLock: state.NewTryLock(),
stableTryLock: state.NewTryLock(),
}, nil
}
Loading

0 comments on commit ff92582

Please sign in to comment.