Skip to content

Commit

Permalink
Test connection for destinations (#1327)
Browse files Browse the repository at this point in the history
Co-authored-by: Amir Blum <amirgiraffe@gmail.com>
  • Loading branch information
RonFed and blumamir authored Jul 7, 2024
1 parent 412b216 commit b3988f0
Show file tree
Hide file tree
Showing 16 changed files with 623 additions and 50 deletions.
3 changes: 3 additions & 0 deletions api/odigos/v1alpha1/destination_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/config"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -63,6 +64,8 @@ func init() {
SchemeBuilder.Register(&Destination{}, &DestinationList{})
}

var _ config.ExporterConfigurer = &Destination{}

/* Implement common.ExporterConfigurer */
func (dest Destination) GetID() string {
return dest.Name
Expand Down
3 changes: 3 additions & 0 deletions common/config/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (

type Jaeger struct{}

// compile time checks
var _ Configer = (*Jaeger)(nil)

func (j *Jaeger) DestType() common.DestinationType {
return common.JaegerDestinationType
}
Expand Down
4 changes: 2 additions & 2 deletions common/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Calculate(dests []ExporterConfigurer, processors []ProcessorConfigurer, mem
}

func CalculateWithBase(currentConfig *Config, globalProcessors []string, dests []ExporterConfigurer, processors []ProcessorConfigurer) (string, error, *ResourceStatuses) {
configers, err := loadConfigers()
configers, err := LoadConfigers()
if err != nil {
return "", err, nil
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func getBasicConfig(memoryLimiterConfig GenericMap) (*Config, []string) {
}, []string{memoryLimiterProcessorName, "resource/odigos-version"}
}

func loadConfigers() (map[common.DestinationType]Configer, error) {
func LoadConfigers() (map[common.DestinationType]Configer, error) {
configers := map[common.DestinationType]Configer{}
for _, configer := range availableConfigers {
if _, exists := configers[configer.DestType()]; exists {
Expand Down
1 change: 1 addition & 0 deletions destinations/data/dynatrace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ spec:
type: password
required: true
secret: true
testConnectionSupported: true
1 change: 1 addition & 0 deletions destinations/data/jaeger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ spec:
componentProps:
type: text
required: true
testConnectionSupported: true
1 change: 1 addition & 0 deletions destinations/data/newrelic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ spec:
- https://otlp.nr-data.net
- https://otlp.eu01.nr-data.net
required: true
testConnectionSupported: true
1 change: 1 addition & 0 deletions destinations/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Spec struct {
}
}
Fields []Field `yaml:"fields"`
TestConnectionSupported bool `yaml:"testConnectionSupported"`
}

type Field struct {
Expand Down
51 changes: 50 additions & 1 deletion frontend/endpoints/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"github.com/gin-gonic/gin"
"github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/config"
"github.com/odigos-io/odigos/destinations"
testconnection "github.com/odigos-io/odigos/frontend/endpoints/test_connection"
"github.com/odigos-io/odigos/frontend/kube"
k8s "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -56,6 +58,21 @@ type Destination struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

var _ config.ExporterConfigurer = &Destination{}

func (dest Destination) GetID() string {
return dest.Name
}
func (dest Destination) GetType() common.DestinationType {
return dest.Type
}
func (dest Destination) GetConfig() map[string]string {
return dest.Fields
}
func (dest Destination) GetSignals() []common.ObservabilitySignal {
return exportedSignalsObjectToSlice(dest.ExportedSignals)
}

func GetDestinationTypes(c *gin.Context) {
var resp GetDestinationTypesResponse
itemsByCategory := make(map[string][]DestinationTypesCategoryItem)
Expand Down Expand Up @@ -153,7 +170,6 @@ func GetDestinationById(c *gin.Context, odigosns string) {
}

func CreateNewDestination(c *gin.Context, odigosns string) {

request := Destination{}
if err := c.ShouldBindJSON(&request); err != nil {
returnError(c, err)
Expand Down Expand Up @@ -214,6 +230,39 @@ func CreateNewDestination(c *gin.Context, odigosns string) {
c.JSON(201, resp)
}

func TestConnectionForDestination(c *gin.Context, odigosns string) {
request := Destination{}
if err := c.ShouldBindJSON(&request); err != nil {
returnError(c, err)
return
}

destType := request.Type

destConfig, err := getDestinationTypeConfig(destType)
if err != nil {
returnError(c, err)
return
}

if !destConfig.Spec.TestConnectionSupported {
returnError(c, fmt.Errorf("destination type %s does not support test connection", request.Type))
return
}

res := testconnection.TestConnection(c, request)
if !res.Succeeded {
c.JSON(res.StatusCode, gin.H{
"type": res.DestinationType,
"message": res.Message,
"reason": res.Reason,
})
return
}

c.Status(200)
}

func UpdateExistingDestination(c *gin.Context, odigosns string) {
destId := c.Param("id")
request := Destination{}
Expand Down
37 changes: 37 additions & 0 deletions frontend/endpoints/test_connection/otlp_test_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package testconnection

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"

"go.opentelemetry.io/collector/exporter/otlpexporter"
)

var _ ExporterConnectionTester = &otlpExporterConnectionTester{}

type otlpExporterConnectionTester struct {
f exporter.Factory
}

func NewOTLPTester() *otlpExporterConnectionTester {
return &otlpExporterConnectionTester{
f: otlpexporter.NewFactory(),
}
}

func (t *otlpExporterConnectionTester) Factory() exporter.Factory {
return t.f
}

func (t *otlpExporterConnectionTester) ModifyConfigForConnectionTest(cfg component.Config) component.Config {
otlpConf, ok := cfg.(*otlpexporter.Config)
if !ok {
return nil
}

// currently using the default timeout config of the collector - 5 seconds
// Avoid batching and retries
otlpConf.QueueConfig.Enabled = false
otlpConf.RetryConfig.Enabled = false
return otlpConf
}
36 changes: 36 additions & 0 deletions frontend/endpoints/test_connection/otlphttp_test_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package testconnection

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
)

var _ ExporterConnectionTester = &otlphttpExporterConnectionTester{}

type otlphttpExporterConnectionTester struct {
f exporter.Factory
}

func NewOTLPHTTPTester() *otlphttpExporterConnectionTester {
return &otlphttpExporterConnectionTester{
f: otlphttpexporter.NewFactory(),
}
}

func (t *otlphttpExporterConnectionTester) Factory() exporter.Factory {
return t.f
}

func (t *otlphttpExporterConnectionTester) ModifyConfigForConnectionTest(cfg component.Config) component.Config {
otlpConf, ok := cfg.(*otlphttpexporter.Config)
if !ok {
return nil
}

// currently using the default timeout config of the collector - 5 seconds
// Avoid batching and retries
otlpConf.QueueConfig.Enabled = false
otlpConf.RetryConfig.Enabled = false
return otlpConf
}
157 changes: 157 additions & 0 deletions frontend/endpoints/test_connection/test_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package testconnection

import (
"context"
"fmt"
"net/http"
"strings"

"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/config"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var (
configres map[common.DestinationType]config.Configer
connectionTesters = []ExporterConnectionTester{
NewOTLPTester(), // "otlp/" prefix
NewOTLPHTTPTester(), // "otlphttp/" prefix
}
)

func init() {
var err error
configres, err = config.LoadConfigers()
if err != nil {
panic(1)
}
}

type TestConnectionErrorReason string

const (
UnKnownDestination TestConnectionErrorReason = "unknown destination"
InvalidConfig TestConnectionErrorReason = "invalid config"
UnsupportedExporterType TestConnectionErrorReason = "unsupported exporter type"
FailedToConnect TestConnectionErrorReason = "failed to connect"
)

type TestConnectionResult struct {
Succeeded bool
Message string
Reason TestConnectionErrorReason
StatusCode int
DestinationType common.DestinationType
}

type ExporterConnectionTester interface {
// Factory returns the exporter factory for the exporter type.
// This is used to create the exporter instance for testing the connection.
Factory() exporter.Factory
// ModifyConfigForConnectionTest modifies the exporter configuration for testing the connection.
// Since the default configuration may have batching, retries, etc. which may not be suitable for testing the connection.
ModifyConfigForConnectionTest(component.Config) component.Config
}

func getConnectionTester(exporterID string) ExporterConnectionTester {
for _, tester := range connectionTesters {
prefix := fmt.Sprintf("%s/", tester.Factory().Type().String())
if strings.HasPrefix(exporterID, prefix) {
return tester
}
}
return nil
}

func TestConnection(ctx context.Context, dest config.ExporterConfigurer) TestConnectionResult {
destType := dest.GetType()
configer, ok := configres[destType]
if !ok {
return TestConnectionResult{Succeeded: false, Reason: UnKnownDestination, DestinationType: destType, StatusCode: http.StatusNotImplemented}
}

currentConfig := config.Config{
Exporters: make(config.GenericMap),
Service: config.Service{
Pipelines: make(map[string]config.Pipeline),
},
}
err := configer.ModifyConfig(dest, &currentConfig)
if err != nil {
return TestConnectionResult{Succeeded: false, Message: err.Error(), Reason: InvalidConfig, DestinationType: destType, StatusCode: http.StatusInternalServerError}
}

exporters := currentConfig.Exporters
if len(exporters) == 0 {
return TestConnectionResult{Message: "no exporters found in config", Reason: InvalidConfig, DestinationType: destType, StatusCode: http.StatusInternalServerError, Succeeded: false}
}

var exporterRawConfig config.GenericMap
var connectionTester ExporterConnectionTester
foundTester := false
for componentID, cfg := range exporters {
gm, ok := cfg.(config.GenericMap)
if !ok {
continue
}
ct := getConnectionTester(componentID)
if ct != nil {
connectionTester = ct
foundTester = true
exporterRawConfig = gm
break
}
}

if !foundTester {
return TestConnectionResult{Succeeded: false, Message: "no supported exporter found in config", Reason: UnsupportedExporterType, DestinationType: destType, StatusCode: http.StatusNotFound}
}

// before testing the connection, replace placeholders (if exists) in the config with actual values
replacePlaceholders(exporterRawConfig, dest.GetConfig())
defaultConfig := connectionTester.Factory().CreateDefaultConfig()
connectionTester.ModifyConfigForConnectionTest(defaultConfig)

// convert the user provided fields to a collector config
exportersConf := confmap.NewFromStringMap(exporterRawConfig)
if exportersConf == nil {
return TestConnectionResult{Succeeded: false, Message: "failed to create exporter config", Reason: InvalidConfig, DestinationType: destType, StatusCode: http.StatusInternalServerError}
}

// unmarshal the user provided configuration into the default one, merging them
err = exportersConf.Unmarshal(&defaultConfig)
if err != nil {
return TestConnectionResult{Succeeded: false, Message: err.Error(), Reason: InvalidConfig, DestinationType: destType, StatusCode: http.StatusInternalServerError}
}

if validator, ok := defaultConfig.(component.ConfigValidator); ok {
// if the component has a Validate method, call it to validate the configuration
err = validator.Validate()
if err != nil {
return TestConnectionResult{Succeeded: false, Message: err.Error(), Reason: InvalidConfig, DestinationType: destType, StatusCode: http.StatusInternalServerError}
}
}

exporter, err := connectionTester.Factory().CreateTracesExporter(ctx, exportertest.NewNopCreateSettings(), defaultConfig)
if err != nil {
return TestConnectionResult{Succeeded: false, Message: err.Error(), Reason: InvalidConfig, DestinationType: destType, StatusCode: http.StatusInternalServerError}
}

err = exporter.Start(ctx, nil)
if err != nil {
return TestConnectionResult{Succeeded: false, Message: err.Error(), Reason: FailedToConnect, DestinationType: destType, StatusCode: http.StatusInternalServerError}
}

defer exporter.Shutdown(ctx)
err = exporter.ConsumeTraces(ctx, ptrace.NewTraces())
if err != nil {
return TestConnectionResult{Succeeded: false, Message: err.Error(), Reason: FailedToConnect, DestinationType: destType, StatusCode: http.StatusInternalServerError}
}

return TestConnectionResult{Succeeded: true, DestinationType: destType, StatusCode: http.StatusOK}
}
Loading

0 comments on commit b3988f0

Please sign in to comment.