Skip to content

Commit

Permalink
feat(otel): upgrade opentelemetry to v0.75 (#2303)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoparente authored Apr 12, 2023
1 parent 6f22b3c commit d2f4ed0
Show file tree
Hide file tree
Showing 41 changed files with 962 additions and 1,987 deletions.
11 changes: 6 additions & 5 deletions agent/backend/pktvisor/pktvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/orb-community/orb/agent/backend"
"github.com/orb-community/orb/agent/config"
"github.com/orb-community/orb/agent/policies"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -83,8 +84,8 @@ type pktvisorBackend struct {
otelReceiverType string
otelReceiverHost string
otelReceiverPort int
receiver map[string]component.MetricsReceiver
exporter map[string]component.MetricsExporter
receiver map[string]receiver.Metrics
exporter map[string]exporter.Metrics
routineMap map[string]context.CancelFunc
}

Expand Down Expand Up @@ -166,11 +167,11 @@ func (p *pktvisorBackend) Start(ctx context.Context, cancelFunc context.CancelFu
}

if p.receiver == nil {
p.receiver = make(map[string]component.MetricsReceiver)
p.receiver = make(map[string]receiver.Metrics)
}

if p.exporter == nil {
p.exporter = make(map[string]component.MetricsExporter)
p.exporter = make(map[string]exporter.Metrics)
}

_, err := exec.LookPath(p.binary)
Expand Down
9 changes: 5 additions & 4 deletions agent/backend/pktvisor/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
"github.com/orb-community/orb/agent/otel/otlpmqttexporter"
"github.com/orb-community/orb/agent/otel/pktvisorreceiver"
"github.com/orb-community/orb/fleet"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/zap"
)
Expand All @@ -37,7 +38,7 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er
return metrics, nil
}

func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (component.MetricsExporter, error) {
func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Metrics, error) {

bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags)
if p.mqttClient != nil {
Expand All @@ -63,7 +64,7 @@ func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc

}

func (p *pktvisorBackend) createReceiver(ctx context.Context, exporter component.MetricsExporter, logger *zap.Logger) (component.MetricsReceiver, error) {
func (p *pktvisorBackend) createReceiver(ctx context.Context, expo exporter.Metrics, logger *zap.Logger) (receiver.Metrics, error) {
set := pktvisorreceiver.CreateDefaultSettings(logger)
var pktvisorEndpoint string
if p.adminAPIHost == "" || p.adminAPIPort == "" {
Expand All @@ -76,7 +77,7 @@ func (p *pktvisorBackend) createReceiver(ctx context.Context, exporter component
p.logger.Info("starting receiver with pktvisorEndpoint", zap.String("endpoint", pktvisorEndpoint), zap.String("metrics_url", metricsPath))
cfg := pktvisorreceiver.CreateReceiverConfig(pktvisorEndpoint, metricsPath)
// Create the Prometheus receiver and pass in the previously created Prometheus exporter.
receiver, err := pktvisorreceiver.CreateMetricsReceiver(ctx, set, cfg, exporter)
receiver, err := pktvisorreceiver.CreateMetricsReceiver(ctx, set, cfg, expo)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions agent/otel/otlpmqttexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/orb-community/orb/agent/otel"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for OTLP/HTTP exporter.
type Config struct {
config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// Add Client directly to only re-use an existing connection - requires "github.com/eclipse/paho.mqtt.golang"
Client *mqtt.Client
Expand All @@ -31,7 +31,7 @@ type Config struct {
OrbAgentService otel.AgentBridgeService
}

var _ config.Exporter = (*Config)(nil)
var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
Expand Down
48 changes: 6 additions & 42 deletions agent/otel/otlpmqttexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,17 @@
package otlpmqttexporter

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/service/servicetest"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)

func TestLoadConfig(t *testing.T) {
t.Skip("this ")
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

func TestUnmarshalDefaultConfig(t *testing.T) {
factory := NewFactory()
factories.Exporters[typeStr] = factory

// Bad config
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "bad_empty_config.yaml"), factories)

require.Error(t, err, "should expect Error in the bad_empty_config.yaml loading")
require.Nil(t, cfg, "should expect LoadConfigAndValidate should return nil cfg")

// Good config
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

e1 := cfg.Exporters[config.NewComponentIDWithName(typeStr, "2")]
assert.Equal(t, e1,
&Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "2")),
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
})
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(confmap.New(), cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
}
108 changes: 54 additions & 54 deletions agent/otel/otlpmqttexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"fmt"

"github.com/orb-community/orb/agent/otel"
"go.uber.org/zap"

mqtt "github.com/eclipse/paho.mqtt.golang"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

const (
Expand All @@ -29,72 +29,72 @@ const (

// NewFactory creates a factory for OTLP exporter.
// Reducing the scope to just Metrics since it is our use-case
func NewFactory() component.ExporterFactory {
return component.NewExporterFactory(
func NewFactory() exporter.Factory {
return exporter.NewFactory(
typeStr,
CreateDefaultConfig,
component.WithMetricsExporter(CreateMetricsExporter, component.StabilityLevelAlpha))
exporter.WithMetrics(CreateMetricsExporter, component.StabilityLevelStable))
}

func CreateConfig(addr, id, key, channel, pktvisor, metricsTopic string, bridgeService otel.AgentBridgeService) config.Exporter {
func CreateConfig(addr, id, key, channel, pktvisor, metricsTopic string, bridgeService otel.AgentBridgeService) component.Config {
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
MetricsTopic: metricsTopic,
Address: addr,
Id: id,
Key: key,
ChannelID: channel,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
MetricsTopic: metricsTopic,
Address: addr,
Id: id,
Key: key,
ChannelID: channel,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
}
}

func CreateDefaultConfig() config.Exporter {
base := fmt.Sprintf("channels/%s/messages", defaultMQTTId)
metricsTopic := fmt.Sprintf("%s/otlp/%s", base, defaultName)
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Address: defaultMQTTAddr,
Id: defaultMQTTId,
Key: defaultMQTTKey,
ChannelID: base,
TLS: defaultTLS,
MetricsTopic: metricsTopic,
func CreateDefaultSettings(logger *zap.Logger) exporter.CreateSettings {
return exporter.CreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: global.MeterProvider(),
},
BuildInfo: component.NewDefaultBuildInfo(),
}
}

func CreateConfigClient(client *mqtt.Client, metricsTopic, pktvisor string, bridgeService otel.AgentBridgeService) config.Exporter {
func CreateDefaultConfig() component.Config {
base := fmt.Sprintf("channels/%s/messages", defaultMQTTId)
metricsTopic := fmt.Sprintf("%s/otlp/%s", base, defaultName)
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Client: client,
MetricsTopic: metricsTopic,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Address: defaultMQTTAddr,
Id: defaultMQTTId,
Key: defaultMQTTKey,
ChannelID: base,
TLS: defaultTLS,
MetricsTopic: metricsTopic,
}
}

func CreateDefaultSettings(logger *zap.Logger) component.ExporterCreateSettings {
return component.ExporterCreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: global.MeterProvider(),
},
BuildInfo: component.NewDefaultBuildInfo(),
func CreateConfigClient(client *mqtt.Client, metricsTopic, pktvisor string, bridgeService otel.AgentBridgeService) component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Client: client,
MetricsTopic: metricsTopic,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
}
}

func createTracesExporter(
ctx context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.TracesExporter, error) {
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
oce, err := newExporter(cfg, set, ctx)
if err != nil {
return nil, err
Expand All @@ -116,9 +116,9 @@ func createTracesExporter(

func CreateMetricsExporter(
ctx context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.MetricsExporter, error) {
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
oce, err := newExporter(cfg, set, ctx)
if err != nil {
return nil, err
Expand All @@ -143,9 +143,9 @@ func CreateMetricsExporter(

func createLogsExporter(
ctx context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.LogsExporter, error) {
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
oce, err := newExporter(cfg, set, ctx)
if err != nil {
return nil, err
Expand Down
35 changes: 5 additions & 30 deletions agent/otel/otlpmqttexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -34,7 +33,7 @@ func TestCreateMetricsExporter(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

set := componenttest.NewNopExporterCreateSettings()
set := exportertest.NewNopCreateSettings()
ctx := context.Background()
ctx = context.WithValue(ctx, "policy_name", "test")
ctx = context.WithValue(ctx, "policy_id", "test")
Expand Down Expand Up @@ -101,31 +100,7 @@ func TestCreateConfigClient(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
c := &tt.args.client
got := CreateConfigClient(c, tt.args.metricsTopic, " 1.0", nil)
assert.Equal(t, tt.want, got.Validate(), "expected %s but got %s", tt.want, got.Validate())
})
}
}

func TestCreateDefaultSettings(t *testing.T) {
logger, _ := zap.NewDevelopment()
type args struct {
logger *zap.Logger
}
tests := []struct {
name string
args args
}{
{
name: "ok default",
args: args{
logger: logger,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := CreateDefaultSettings(tt.args.logger)
assert.NotNil(t, got.TelemetrySettings.Logger, "expected to not be nil")
assert.Equal(t, tt.want, component.ValidateConfig(got), "expected %s but got %s", tt.want, component.ValidateConfig(got))
})
}
}
Expand All @@ -141,7 +116,7 @@ func TestCreateConfig(t *testing.T) {
tests := []struct {
name string
args args
want config.Exporter
want component.Config
}{
{
name: "local mqtt",
Expand Down
Loading

0 comments on commit d2f4ed0

Please sign in to comment.