Skip to content

Commit

Permalink
feat: move default node collector port to avoid conflicts and add set…
Browse files Browse the repository at this point in the history
…ting in config (#1618)

Since node collector runs as a DaemonSet with host network (so that otlp
ports 4317 and 4318 are accessible from the pods exporters), it also
shares the ports namespace with the node.

The ports it binds to might conflict with ports of other tools installed
in the cluster that does similar things, and in this case data
collection will fail to start with error: "listen tcp 0.0.0.0:8888:
bind: address already in use".

The collectors currently expose and scrape their own metrics on port
`8888` which is good candidate for collistions.

This PR:

- makes the default port for own telemetry endpoint for node collector
`55682` to have low chance of collision with something that already runs
on the node (instead of using `8888` as default).
- Adds an option to odigos config to set the port, in case one needs it
hard-coded to a specific value to manage the public ports on the node).

The port to use is written on the collectors group CRD by the scheduler
to abstract away the default and odigos config resolving, and allow the
resolved value to be more easer to consume.

For the cluster collector, the collector runs as deployment with k8s
service, thus we can safely use `8888` without colliding with anything
else
  • Loading branch information
blumamir authored Oct 25, 2024
1 parent df56ec0 commit 34ca2c3
Show file tree
Hide file tree
Showing 21 changed files with 331 additions and 279 deletions.
7 changes: 7 additions & 0 deletions api/config/crd/bases/odigos.io_collectorsgroups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,19 @@ spec:
spec:
description: CollectorsGroupSpec defines the desired state of Collector
properties:
collectorOwnMetricsPort:
description: |-
The port to use for exposing the collector's own metrics as a prometheus endpoint.
This can be used to resolve conflicting ports when a collector is using the host network.
format: int32
type: integer
role:
enum:
- CLUSTER_GATEWAY
- NODE_COLLECTOR
type: string
required:
- collectorOwnMetricsPort
- role
type: object
status:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/odigos/v1alpha1/collectorsgroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
// CollectorsGroupSpec defines the desired state of Collector
type CollectorsGroupSpec struct {
Role CollectorsGroupRole `json:"role"`

// The port to use for exposing the collector's own metrics as a prometheus endpoint.
// This can be used to resolve conflicting ports when a collector is using the host network.
CollectorOwnMetricsPort int32 `json:"collectorOwnMetricsPort"`
}

// CollectorsGroupStatus defines the observed state of Collector
Expand Down
27 changes: 3 additions & 24 deletions autoscaler/controllers/collectorsgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/autoscaler/controllers/datacollection"
"github.com/odigos-io/odigos/autoscaler/controllers/gateway"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -32,28 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

type onlyCreatePredicate struct {
predicate.Funcs
}

func (i *onlyCreatePredicate) Create(e event.CreateEvent) bool {
return true
}

func (i *onlyCreatePredicate) Update(e event.UpdateEvent) bool {
return false
}

func (i *onlyCreatePredicate) Delete(e event.DeleteEvent) bool {
return false
}

func (i *onlyCreatePredicate) Generic(e event.GenericEvent) bool {
return false
}

var _ predicate.Predicate = &onlyCreatePredicate{}

// CollectorsGroupReconciler reconciles a CollectorsGroup object
type CollectorsGroupReconciler struct {
client.Client
Expand Down Expand Up @@ -102,6 +79,8 @@ func (r *CollectorsGroupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
func (r *CollectorsGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&odigosv1.CollectorsGroup{}).
WithEventFilter(&onlyCreatePredicate{}).
// we assume everything in the collectorsgroup spec is the configuration for the collectors to generate.
// thus, we need to monitor any change to the spec which is what the generation field is for.
WithEventFilter(&predicate.GenerationChangedPredicate{}).
Complete(r)
}
16 changes: 8 additions & 8 deletions autoscaler/controllers/datacollection/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func createConfigMap(desired *v1.ConfigMap, ctx context.Context, c client.Client

func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
datacollection *odigosv1.CollectorsGroup, scheme *runtime.Scheme, setTracesLoadBalancer bool, disableNameProcessor bool) (*v1.ConfigMap, error) {
cmData, err := calculateConfigMapData(apps, dests, processors, setTracesLoadBalancer, disableNameProcessor)
cmData, err := calculateConfigMapData(datacollection, apps, dests, processors, setTracesLoadBalancer, disableNameProcessor)
if err != nil {
return nil, err
}
Expand All @@ -124,16 +124,16 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
return &desired, nil
}

func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
setTracesLoadBalancer bool, disableNameProcessor bool) (string, error) {

ownMetricsPort := collectorsGroup.Spec.CollectorOwnMetricsPort

empty := struct{}{}

processorsCfg, tracesProcessors, metricsProcessors, logsProcessors, errs := config.GetCrdProcessorsConfigMap(commonconf.ToProcessorConfigurerArray(processors))
if errs != nil {
for name, err := range errs {
log.Log.V(0).Info(err.Error(), "processor", name)
}
for name, err := range errs {
log.Log.V(0).Info(err.Error(), "processor", name)
}

if !disableNameProcessor {
Expand Down Expand Up @@ -214,7 +214,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
"targets": []string{fmt.Sprintf("127.0.0.1:%d", ownMetricsPort)},
},
},
"metric_relabel_configs": []config.GenericMap{
Expand Down Expand Up @@ -247,7 +247,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o
Extensions: []string{"health_check"},
Telemetry: config.Telemetry{
Metrics: config.GenericMap{
"address": "0.0.0.0:8888",
"address": fmt.Sprintf("0.0.0.0:%d", ownMetricsPort),
},
Resource: map[string]*string{
// The collector add "otelcol" as a service name, so we need to remove it
Expand Down
22 changes: 12 additions & 10 deletions autoscaler/controllers/gateway/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ const (
)

var (
errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline")
errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline")
errNoReceiversConfigured = errors.New("no receivers were configured, cannot add self telemetry pipeline")
errNoExportersConfigured = errors.New("no exporters were configured, cannot add self telemetry pipeline")
)

func addSelfTelemetryPipeline(c *config.Config) error {
func addSelfTelemetryPipeline(c *config.Config, ownTelemetryPort int32) error {
if c.Service.Pipelines == nil {
return errNoPipelineConfigured
}
Expand All @@ -47,18 +47,18 @@ func addSelfTelemetryPipeline(c *config.Config) error {
"config": config.GenericMap{
"scrape_configs": []config.GenericMap{
{
"job_name": "otelcol",
"job_name": "otelcol",
"scrape_interval": "10s",
"static_configs": []config.GenericMap{
{
"targets": []string{"127.0.0.1:8888"},
"targets": []string{fmt.Sprintf("127.0.0.1:%d", ownTelemetryPort)},
},
},
"metric_relabel_configs": []config.GenericMap{
{
"source_labels": []string{"__name__"},
"regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)",
"action": "keep",
"regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)",
"action": "keep",
},
},
},
Expand Down Expand Up @@ -91,13 +91,13 @@ func addSelfTelemetryPipeline(c *config.Config) error {
},
}
c.Service.Pipelines["metrics/otelcol"] = config.Pipeline{
Receivers: []string{"prometheus/self-metrics"},
Receivers: []string{"prometheus/self-metrics"},
Processors: []string{"resource/pod-name"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
Exporters: []string{"otlp/odigos-own-telemetry-ui"},
}

c.Service.Telemetry.Metrics = config.GenericMap{
"address": "0.0.0.0:8888",
"address": fmt.Sprintf("0.0.0.0:%d", ownTelemetryPort),
}

for pipelineName, pipeline := range c.Service.Pipelines {
Expand Down Expand Up @@ -126,7 +126,9 @@ func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.Proc
common.ToExporterConfigurerArray(dests),
common.ToProcessorConfigurerArray(processors),
memoryLimiterConfiguration,
addSelfTelemetryPipeline,
func(c *config.Config) error {
return addSelfTelemetryPipeline(c, gateway.Spec.CollectorOwnMetricsPort)
},
)
if err != nil {
logger.Error(err, "Failed to calculate config")
Expand Down
14 changes: 8 additions & 6 deletions autoscaler/controllers/gateway/configmap_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package gateway

import (
"fmt"
"testing"

"github.com/odigos-io/odigos/common/config"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/stretchr/testify/assert"
)

Expand All @@ -12,7 +14,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
cases := []struct {
name string
cfg *config.Config
err error
err error
}{
{
name: "no pipeline",
Expand Down Expand Up @@ -66,7 +68,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
},
Processors: config.GenericMap{
"memory_limiter": config.GenericMap{
"check_interval": "1s",
"check_interval": "1s",
},
"resource/odigos-version": config.GenericMap{
"attributes": []config.GenericMap{
Expand Down Expand Up @@ -98,7 +100,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
c := tc.cfg
Expand All @@ -115,15 +117,15 @@ func TestAddSelfTelemetryPipeline(t *testing.T) {
assert.Equal(t, []string{"prometheus"}, c.Service.Pipelines["metrics/otelcol"].Receivers)
assert.Equal(t, []string{"resource/pod-name"}, c.Service.Pipelines["metrics/otelcol"].Processors)
assert.Equal(t, []string{"otlp/ui"}, c.Service.Pipelines["metrics/otelcol"].Exporters)
assert.Equal(t, "0.0.0.0:8888", c.Service.Telemetry.Metrics["address"])
assert.Equal(t, fmt.Sprintf("0.0.0.0:%d", consts.OdigosNodeCollectorOwnTelemetryPortDefault), c.Service.Telemetry.Metrics["address"])
for pipelineName, pipeline := range c.Service.Pipelines {
if pipelineName == "metrics/otelcol" {
assert.NotContains(t, pipeline.Processors, "odigostrafficmetrics")
} else {
assert.Equal(t, pipeline.Processors[len(pipeline.Processors) - 1], "odigostrafficmetrics")
assert.Equal(t, pipeline.Processors[len(pipeline.Processors)-1], "odigostrafficmetrics")
}

}
})
}
}
}
6 changes: 3 additions & 3 deletions autoscaler/controllers/gateway/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func syncService(gateway *odigosv1.CollectorsGroup, ctx context.Context, c clien
}

result, err := controllerutil.CreateOrPatch(ctx, c, gatewaySvc, func() error {
updateGatewaySvc(gatewaySvc)
updateGatewaySvc(gatewaySvc, gateway)
return nil
})

Expand All @@ -67,7 +67,7 @@ func syncService(gateway *odigosv1.CollectorsGroup, ctx context.Context, c clien
return gatewaySvc, nil
}

func updateGatewaySvc(svc *v1.Service) {
func updateGatewaySvc(svc *v1.Service, collectorsGroup *odigosv1.CollectorsGroup) {
svc.Spec.Ports = []v1.ServicePort{
{
Name: "otlp",
Expand All @@ -83,7 +83,7 @@ func updateGatewaySvc(svc *v1.Service) {
},
{
Name: "metrics",
Port: 8888,
Port: collectorsGroup.Spec.CollectorOwnMetricsPort,
},
}

Expand Down
7 changes: 7 additions & 0 deletions common/odigos_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package common

type ProfileName string

type CollectorNodeConfiguration struct {
// The port to use for exposing the collector's own metrics as a prometheus endpoint.
// This can be used to resolve conflicting ports when a collector is using the host network.
CollectorOwnMetricsPort int32 `json:"collectorOwnMetricsPort,omitempty"`
}

type CollectorGatewayConfiguration struct {
// RequestMemoryMiB is the memory request for the cluster gateway collector deployment.
// it will be embedded in the deployment as a resource request of the form "memory: <value>Mi"
Expand Down Expand Up @@ -38,6 +44,7 @@ type OdigosConfiguration struct {
AutoscalerImage string `json:"autoscalerImage,omitempty"`
DefaultSDKs map[ProgrammingLanguage]OtelSdk `json:"defaultSDKs,omitempty"`
CollectorGateway *CollectorGatewayConfiguration `json:"collectorGateway,omitempty"`
CollectorNode *CollectorNodeConfiguration `json:"collectorNode,omitempty"`
Profiles []ProfileName `json:"profiles,omitempty"`

// this is internal currently, and is not exposed on the CLI / helm
Expand Down
6 changes: 6 additions & 0 deletions helm/odigos/templates/odigos-config-cm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ data:
goMemLimitMiB: {{ . }}
{{- end }}
{{- end }}
{{- if .Values.collectorNode }}
collectorNode:
{{- with .Values.collectorNode.collectorOwnMetricsPort }}
collectorOwnMetricsPort: {{ . }}
{{- end }}
{{- end }}
instrumentorImage: {{ .Values.instrumentor.image.repository }}
telemetryEnabled: {{ .Values.telemetry.enabled }}
openshiftEnabled: {{ .Values.openshift.enabled }}
Expand Down
5 changes: 5 additions & 0 deletions helm/odigos/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ collectorGateway:
# if not specified, it will be set to 80% of the hard limit of the memory limiter.
goMemLimitMiB: 340

collectorNode:
# The port to use for exposing the collector's own metrics as a prometheus endpoint.
# This can be used to resolve conflicting ports when a collector is using the host network.
collectorOwnMetricsPort: 55682

autoscaler:
image:
repository: keyval/odigos-autoscaler
Expand Down
7 changes: 4 additions & 3 deletions k8sutils/pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ const (
)

const (
OdigosNodeCollectorDaemonSetName = "odigos-data-collection"
OdigosNodeCollectorConfigMapName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorCollectorGroupName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorDaemonSetName = "odigos-data-collection"
OdigosNodeCollectorConfigMapName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorCollectorGroupName = OdigosNodeCollectorDaemonSetName
OdigosNodeCollectorOwnTelemetryPortDefault = int32(55682)

OdigosNodeCollectorConfigMapKey = "conf" // this key is different than the cluster collector value. not sure why
)
15 changes: 12 additions & 3 deletions k8sutils/pkg/utils/collectorgroup_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ package utils

import (
"context"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func CreateCollectorGroup(ctx context.Context, c client.Client, collectorGroup *odigosv1.CollectorsGroup) error {
log.FromContext(ctx).Info("Creating collector group", "collectorGroupName", collectorGroup.Name)
return c.Create(ctx, collectorGroup)
func ApplyCollectorGroup(ctx context.Context, c client.Client, collectorGroup *odigosv1.CollectorsGroup) error {
logger := log.FromContext(ctx)
logger.Info("Applying collector group", "collectorGroupName", collectorGroup.Name)

err := c.Patch(ctx, collectorGroup, client.Apply, client.ForceOwnership, client.FieldOwner("scheduler"))
if err != nil {
logger.Error(err, "Failed to apply collector group")
return err
}

return nil
}

func GetCollectorGroup(ctx context.Context, c client.Client, namespace string, collectorGroupName string) (*odigosv1.CollectorsGroup, error) {
Expand Down
Loading

0 comments on commit 34ca2c3

Please sign in to comment.