add cleanup method to otelexporter #577

Merged: 8 commits, merged Nov 3, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@

## Unreleased
### Enhancements
- Add periodic memory cleanup for the OtelExporter. Users can configure the restart period in hours or days; the default is a restart every 12 hours. ([#577](https://github.com/KindlingProject/kindling/pull/577))

## v0.8.1 - 2023-09-01
### Enhancements
- Improve the Grafana plugin's performance by reducing the amount of data that needs to be queried. The plugin now queries through Grafana's API proxy. ([#555](https://github.com/KindlingProject/kindling/pull/555))
- Expand the histogram buckets of the otelexporter (add a 1500ms bucket). ([#563](https://github.com/KindlingProject/kindling/pull/563))
- Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. ([#562](https://github.com/KindlingProject/kindling/pull/562))
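For context on the scheduling described in the changelog entry, here is a minimal standalone Go sketch of the two modes (an hour-based period, and an optional restart at midnight every N days). The variable names are illustrative; the actual logic lives in otelexporter.go further down this diff.

package main

import (
	"fmt"
	"time"
)

func main() {
	restartPeriodHours := 12 // memcleanup.restart_period; 0 disables
	restartEveryNDays := 0   // memcleanup.restart_every_n_days; 0 disables

	now := time.Now()

	if restartPeriodHours != 0 {
		// Hour-based mode: the next restart is simply N hours from now.
		next := now.Add(time.Duration(restartPeriodHours) * time.Hour)
		fmt.Println("next hour-based restart:", next)
	}

	if restartEveryNDays != 0 {
		// Day-based mode: N days from now, truncated to midnight (mirrors the PR's schedule).
		next := now.Add(time.Duration(restartEveryNDays) * 24 * time.Hour)
		next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location())
		fmt.Println("next day-based restart:", next)
	}
}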
7 changes: 7 additions & 0 deletions collector/docker/kindling-collector-config.yml
@@ -228,6 +228,13 @@ exporters:
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
memcleanup:
# unit: Hour
# set 0 to disable
restart_period: 12
# unit: Day
# set 0 to disable; when enabled, the restart happens at midnight
restart_every_n_days: 0

observability:
logger:
@@ -12,6 +12,7 @@ type Config struct {
CustomLabels map[string]string `mapstructure:"custom_labels"`
MetricAggregationMap map[string]MetricAggregationKind `mapstructure:"metric_aggregation_map"`
AdapterConfig *AdapterConfig `mapstructure:"adapter_config"`
MemCleanUpConfig *MemCleanUpConfig `mapstructure:"memcleanup"`
}

type PrometheusConfig struct {
@@ -34,3 +35,8 @@ type AdapterConfig struct {
NeedPodDetail bool `mapstructure:"need_pod_detail"`
StoreExternalSrcIP bool `mapstructure:"store_external_src_ip"`
}

type MemCleanUpConfig struct {
RestartPeriod int `mapstructure:"restart_period,omitempty"`
RestartEveryNDays int `mapstructure:"restart_every_n_days,omitempty"`
}
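
Given the mapstructure tags above, the new memcleanup block decodes roughly as follows. This sketch assumes the github.com/mitchellh/mapstructure package, which the tags suggest; the collector's actual config loading may go through its own wrapper.

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

type MemCleanUpConfig struct {
	RestartPeriod     int `mapstructure:"restart_period,omitempty"`
	RestartEveryNDays int `mapstructure:"restart_every_n_days,omitempty"`
}

func main() {
	// Values as they appear under the exporter's memcleanup block in
	// kindling-collector-config.yml.
	raw := map[string]interface{}{
		"restart_period":       12,
		"restart_every_n_days": 0,
	}

	var cfg MemCleanUpConfig
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {RestartPeriod:12 RestartEveryNDays:0}
}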
@@ -14,6 +14,9 @@ import (
)

func (e *OtelExporter) Consume(dataGroup *model.DataGroup) error {
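// Take the exporter lock so a concurrent NewMeter (periodic memory cleanup)
// cannot swap the meter and instrument factory while a data group is being consumed.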
e.mu.Lock()
defer e.mu.Unlock()

if dataGroup == nil {
// no need consume
return nil
119 changes: 119 additions & 0 deletions collector/pkg/component/consumer/exporter/otelexporter/otelexporter.go
@@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -44,6 +47,8 @@
)

var serviceName string
// expCounter counts how many times the meter has been rebuilt by the memory cleanup.
var expCounter int
// lastMetricsData caches the previous scrape of the local /metrics endpoint.
var lastMetricsData string

type OtelOutputExporters struct {
metricExporter exportmetric.Exporter
@@ -59,6 +64,10 @@ type OtelExporter struct {
customLabels []attribute.KeyValue
instrumentFactory *instrumentFactory
telemetry *component.TelemetryTools
exp *prometheus.Exporter
rs *resource.Resource
mu sync.Mutex
restart bool

adapters []adapter.Adapter
}
@@ -135,6 +144,9 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
instrumentFactory: newInstrumentFactory(exp.MeterProvider().Meter(MeterName), telemetry, customLabels),
metricAggregationMap: cfg.MetricAggregationMap,
telemetry: telemetry,
exp: exp,
rs: rs,
restart: false,
adapters: []adapter.Adapter{
adapter.NewNetAdapter(customLabels, &adapter.NetAdapterConfig{
StoreTraceAsMetric: cfg.AdapterConfig.NeedTraceAsMetric,
@@ -153,6 +165,58 @@
telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err))
}
}()

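// Periodic memory cleanup: the time-based triggers below only mark the exporter
// for restart; a polling goroutine then rebuilds the meter via NewMeter once the
// local /metrics payload has changed since the last poll.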
if cfg.MemCleanUpConfig != nil {
if cfg.MemCleanUpConfig.RestartPeriod != 0 {
ticker := time.NewTicker(time.Duration(cfg.MemCleanUpConfig.RestartPeriod) * time.Hour)
go func() {
for range ticker.C {
otelexporter.restart = true
}
}()
}

go func() {
// Poll the exporter's own /metrics endpoint; once its payload changes and a
// restart has been requested, rebuild the meter to release accumulated series.
metricsTicker := time.NewTicker(250 * time.Millisecond)
for range metricsTicker.C {
currentMetricsData, err := fetchMetricsFromEndpoint()
if err != nil {
telemetry.Logger.Warn("error fetching metrics for memory cleanup: ", zap.Error(err))
continue
}

if currentMetricsData != lastMetricsData {
lastMetricsData = currentMetricsData

if otelexporter.restart {
if err := otelexporter.NewMeter(otelexporter.telemetry); err != nil {
telemetry.Logger.Warn("error recreating otelexporter meter: ", zap.Error(err))
}
otelexporter.restart = false
expCounter++
}
}
}
}()

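// Optionally also request a restart at midnight, every RestartEveryNDays days.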
if cfg.MemCleanUpConfig.RestartEveryNDays != 0 {
go func() {
for {
now := time.Now()
next := now.Add(time.Duration(cfg.MemCleanUpConfig.RestartEveryNDays) * time.Hour * 24)
next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location())
duration := next.Sub(now)

time.Sleep(duration)

otelexporter.restart = true
}
}()
}
}

} else {
var collectPeriod time.Duration

@@ -301,3 +365,58 @@ var exponentialInt64NanosecondsBoundaries = func(bounds []float64) (asint []floa
}
return
}(exponentialInt64Boundaries)

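// NewMeter builds a fresh Prometheus exporter and metric controller, stops the old
// controller, swaps in a new instrument factory, and starts the Prometheus HTTP
// server backed by the new exporter. The periodic memory cleanup uses it to drop
// metric state accumulated by the previous controller.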
func (e *OtelExporter) NewMeter(telemetry *component.TelemetryTools) error {
e.mu.Lock()
defer e.mu.Unlock()
config := prometheus.Config{}

newController := controller.New(
otelprocessor.NewFactory(
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries(exponentialInt64NanosecondsBoundaries),
),
aggregation.CumulativeTemporalitySelector(),
otelprocessor.WithMemory(e.cfg.PromCfg.WithMemory),
),
controller.WithResource(e.rs),
)

exp, err := prometheus.New(config, newController)
if err != nil {
telemetry.Logger.Panic("failed to initialize prometheus exporter: ", zap.Error(err))
return err
}

if err := e.metricController.Stop(context.Background()); err != nil {
return fmt.Errorf("failed to stop old controller: %w", err)
}

e.exp = exp
e.metricController = newController
e.instrumentFactory = newInstrumentFactory(e.exp.MeterProvider().Meter(MeterName), e.telemetry, e.customLabels)

go func() {
if err := StartServer(e.exp, e.telemetry, e.cfg.PromCfg.Port); err != nil {
telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err))
}
}()

return nil
}

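// fetchMetricsFromEndpoint scrapes the exporter's own Prometheus endpoint
// (the address is hard-coded to 127.0.0.1:9500 here) and returns the raw payload.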
func fetchMetricsFromEndpoint() (string, error) {
resp, err := http.Get("http://127.0.0.1:9500/metrics")
if err != nil {
return "", err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

return string(body), nil
}
