Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(otel): upgrade opentelemetry to v0.75 #2303

Merged
merged 7 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions agent/backend/pktvisor/pktvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/orb-community/orb/agent/backend"
"github.com/orb-community/orb/agent/config"
"github.com/orb-community/orb/agent/policies"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -83,8 +84,8 @@ type pktvisorBackend struct {
otelReceiverType string
otelReceiverHost string
otelReceiverPort int
receiver map[string]component.MetricsReceiver
exporter map[string]component.MetricsExporter
receiver map[string]receiver.Metrics
exporter map[string]exporter.Metrics
routineMap map[string]context.CancelFunc
}

Expand Down Expand Up @@ -166,11 +167,11 @@ func (p *pktvisorBackend) Start(ctx context.Context, cancelFunc context.CancelFu
}

if p.receiver == nil {
p.receiver = make(map[string]component.MetricsReceiver)
p.receiver = make(map[string]receiver.Metrics)
}

if p.exporter == nil {
p.exporter = make(map[string]component.MetricsExporter)
p.exporter = make(map[string]exporter.Metrics)
}

_, err := exec.LookPath(p.binary)
Expand Down
9 changes: 5 additions & 4 deletions agent/backend/pktvisor/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
"github.com/orb-community/orb/agent/otel/otlpmqttexporter"
"github.com/orb-community/orb/agent/otel/pktvisorreceiver"
"github.com/orb-community/orb/fleet"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/zap"
)
Expand All @@ -37,7 +38,7 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er
return metrics, nil
}

func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (component.MetricsExporter, error) {
func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Metrics, error) {

bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags)
if p.mqttClient != nil {
Expand All @@ -63,7 +64,7 @@ func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc

}

func (p *pktvisorBackend) createReceiver(ctx context.Context, exporter component.MetricsExporter, logger *zap.Logger) (component.MetricsReceiver, error) {
func (p *pktvisorBackend) createReceiver(ctx context.Context, expo exporter.Metrics, logger *zap.Logger) (receiver.Metrics, error) {
set := pktvisorreceiver.CreateDefaultSettings(logger)
var pktvisorEndpoint string
if p.adminAPIHost == "" || p.adminAPIPort == "" {
Expand All @@ -76,7 +77,7 @@ func (p *pktvisorBackend) createReceiver(ctx context.Context, exporter component
p.logger.Info("starting receiver with pktvisorEndpoint", zap.String("endpoint", pktvisorEndpoint), zap.String("metrics_url", metricsPath))
cfg := pktvisorreceiver.CreateReceiverConfig(pktvisorEndpoint, metricsPath)
// Create the Prometheus receiver and pass in the previously created Prometheus exporter.
receiver, err := pktvisorreceiver.CreateMetricsReceiver(ctx, set, cfg, exporter)
receiver, err := pktvisorreceiver.CreateMetricsReceiver(ctx, set, cfg, expo)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions agent/otel/otlpmqttexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/orb-community/orb/agent/otel"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for OTLP/HTTP exporter.
type Config struct {
config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// Add Client directly to only re-use an existing connection - requires "github.com/eclipse/paho.mqtt.golang"
Client *mqtt.Client
Expand All @@ -31,7 +31,7 @@ type Config struct {
OrbAgentService otel.AgentBridgeService
}

var _ config.Exporter = (*Config)(nil)
var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
Expand Down
48 changes: 6 additions & 42 deletions agent/otel/otlpmqttexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,17 @@
package otlpmqttexporter

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/service/servicetest"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)

func TestLoadConfig(t *testing.T) {
t.Skip("this ")
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

func TestUnmarshalDefaultConfig(t *testing.T) {
factory := NewFactory()
factories.Exporters[typeStr] = factory

// Bad config
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "bad_empty_config.yaml"), factories)

require.Error(t, err, "should expect Error in the bad_empty_config.yaml loading")
require.Nil(t, cfg, "should expect LoadConfigAndValidate should return nil cfg")

// Good config
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

e1 := cfg.Exporters[config.NewComponentIDWithName(typeStr, "2")]
assert.Equal(t, e1,
&Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "2")),
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
})
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(confmap.New(), cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
}
108 changes: 54 additions & 54 deletions agent/otel/otlpmqttexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"fmt"

"github.com/orb-community/orb/agent/otel"
"go.uber.org/zap"

mqtt "github.com/eclipse/paho.mqtt.golang"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

const (
Expand All @@ -29,72 +29,72 @@ const (

// NewFactory creates a factory for OTLP exporter.
// Reducing the scope to just Metrics since it is our use-case
func NewFactory() component.ExporterFactory {
return component.NewExporterFactory(
func NewFactory() exporter.Factory {
return exporter.NewFactory(
typeStr,
CreateDefaultConfig,
component.WithMetricsExporter(CreateMetricsExporter, component.StabilityLevelAlpha))
exporter.WithMetrics(CreateMetricsExporter, component.StabilityLevelStable))
}

func CreateConfig(addr, id, key, channel, pktvisor, metricsTopic string, bridgeService otel.AgentBridgeService) config.Exporter {
func CreateConfig(addr, id, key, channel, pktvisor, metricsTopic string, bridgeService otel.AgentBridgeService) component.Config {
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
MetricsTopic: metricsTopic,
Address: addr,
Id: id,
Key: key,
ChannelID: channel,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
MetricsTopic: metricsTopic,
Address: addr,
Id: id,
Key: key,
ChannelID: channel,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
}
}

func CreateDefaultConfig() config.Exporter {
base := fmt.Sprintf("channels/%s/messages", defaultMQTTId)
metricsTopic := fmt.Sprintf("%s/otlp/%s", base, defaultName)
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Address: defaultMQTTAddr,
Id: defaultMQTTId,
Key: defaultMQTTKey,
ChannelID: base,
TLS: defaultTLS,
MetricsTopic: metricsTopic,
func CreateDefaultSettings(logger *zap.Logger) exporter.CreateSettings {
return exporter.CreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: global.MeterProvider(),
},
BuildInfo: component.NewDefaultBuildInfo(),
}
}

func CreateConfigClient(client *mqtt.Client, metricsTopic, pktvisor string, bridgeService otel.AgentBridgeService) config.Exporter {
func CreateDefaultConfig() component.Config {
base := fmt.Sprintf("channels/%s/messages", defaultMQTTId)
metricsTopic := fmt.Sprintf("%s/otlp/%s", base, defaultName)
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Client: client,
MetricsTopic: metricsTopic,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Address: defaultMQTTAddr,
Id: defaultMQTTId,
Key: defaultMQTTKey,
ChannelID: base,
TLS: defaultTLS,
MetricsTopic: metricsTopic,
}
}

func CreateDefaultSettings(logger *zap.Logger) component.ExporterCreateSettings {
return component.ExporterCreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: global.MeterProvider(),
},
BuildInfo: component.NewDefaultBuildInfo(),
func CreateConfigClient(client *mqtt.Client, metricsTopic, pktvisor string, bridgeService otel.AgentBridgeService) component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Client: client,
MetricsTopic: metricsTopic,
PktVisorVersion: pktvisor,
OrbAgentService: bridgeService,
}
}

func createTracesExporter(
ctx context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.TracesExporter, error) {
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
oce, err := newExporter(cfg, set, ctx)
if err != nil {
return nil, err
Expand All @@ -116,9 +116,9 @@ func createTracesExporter(

func CreateMetricsExporter(
ctx context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.MetricsExporter, error) {
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
oce, err := newExporter(cfg, set, ctx)
if err != nil {
return nil, err
Expand All @@ -143,9 +143,9 @@ func CreateMetricsExporter(

func createLogsExporter(
ctx context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.LogsExporter, error) {
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
oce, err := newExporter(cfg, set, ctx)
if err != nil {
return nil, err
Expand Down
35 changes: 5 additions & 30 deletions agent/otel/otlpmqttexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -34,7 +33,7 @@ func TestCreateMetricsExporter(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

set := componenttest.NewNopExporterCreateSettings()
set := exportertest.NewNopCreateSettings()
ctx := context.Background()
ctx = context.WithValue(ctx, "policy_name", "test")
ctx = context.WithValue(ctx, "policy_id", "test")
Expand Down Expand Up @@ -101,31 +100,7 @@ func TestCreateConfigClient(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
c := &tt.args.client
got := CreateConfigClient(c, tt.args.metricsTopic, " 1.0", nil)
assert.Equal(t, tt.want, got.Validate(), "expected %s but got %s", tt.want, got.Validate())
})
}
}

func TestCreateDefaultSettings(t *testing.T) {
logger, _ := zap.NewDevelopment()
type args struct {
logger *zap.Logger
}
tests := []struct {
name string
args args
}{
{
name: "ok default",
args: args{
logger: logger,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := CreateDefaultSettings(tt.args.logger)
assert.NotNil(t, got.TelemetrySettings.Logger, "expected to not be nil")
assert.Equal(t, tt.want, component.ValidateConfig(got), "expected %s but got %s", tt.want, component.ValidateConfig(got))
})
}
}
Expand All @@ -141,7 +116,7 @@ func TestCreateConfig(t *testing.T) {
tests := []struct {
name string
args args
want config.Exporter
want component.Config
}{
{
name: "local mqtt",
Expand Down
Loading