add cleanup method to otelexporter (#577)
Signed-off-by: YDMsama <ydmsama@gmail.com>
YDMsama authored Nov 3, 2023
1 parent 4330373 commit 19b8017
Showing 9 changed files with 543 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@

## Unreleased
### Enhancements
- Add periodic memory cleanup for OtelExporter. Users can configure the restart period in hours. Disabled by default. ([#577](https://github.com/KindlingProject/kindling/pull/577))

## v0.8.1 - 2023-09-01
### Enhancements
- Improve the Grafana plugin's performance by reducing the amount of data requiring queries. Now the plugin queries through Grafana's api proxy. ([#555](https://github.com/KindlingProject/kindling/pull/555))
- Expand the histogram buckets of otelexporter (add 1500ms). ([#563](https://github.com/KindlingProject/kindling/pull/563))
- Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. ([#562](https://github.com/KindlingProject/kindling/pull/562))
7 changes: 7 additions & 0 deletions collector/docker/kindling-collector-config.yml
@@ -228,6 +228,13 @@ exporters:
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
memcleanup:
# If set to true, the Prometheus server will restart every `restart_period`
# hours to free up memory.
enable: false
# Specifies the frequency (in hours) to restart the server.
# A value of 0 disables the restart.
restart_period: 12

observability:
logger:
@@ -12,6 +12,7 @@ type Config struct {
CustomLabels map[string]string `mapstructure:"custom_labels"`
MetricAggregationMap map[string]MetricAggregationKind `mapstructure:"metric_aggregation_map"`
AdapterConfig *AdapterConfig `mapstructure:"adapter_config"`
MemCleanUpConfig *MemCleanUpConfig `mapstructure:"memcleanup"`
}

type PrometheusConfig struct {
@@ -34,3 +35,9 @@ type AdapterConfig struct {
NeedPodDetail bool `mapstructure:"need_pod_detail"`
StoreExternalSrcIP bool `mapstructure:"store_external_src_ip"`
}

type MemCleanUpConfig struct {
Enabled bool `mapstructure:"enable,omitempty"`
RestartPeriod int `mapstructure:"restart_period,omitempty"`
RestartEveryNDays int `mapstructure:"restart_every_n_days,omitempty"`
}
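As a rough illustration of how the new `memcleanup` keys bind to `MemCleanUpConfig` through their `mapstructure` tags, here is a minimal, standalone sketch. The direct use of `github.com/mitchellh/mapstructure`, the trimmed-down struct (RestartEveryNDays omitted), and the sample values are assumptions for the example; the collector wires this up through its own configuration loading.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Trimmed-down copy of the struct added in this commit.
type MemCleanUpConfig struct {
	Enabled       bool `mapstructure:"enable,omitempty"`
	RestartPeriod int  `mapstructure:"restart_period,omitempty"`
}

func main() {
	// Values as they would appear under the memcleanup block in
	// kindling-collector-config.yml (illustrative values).
	raw := map[string]interface{}{
		"enable":         true,
		"restart_period": 12,
	}

	var cfg MemCleanUpConfig
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {Enabled:true RestartPeriod:12}
}
```

With these values, the exporter would be flagged for a restart every 12 hours.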
@@ -14,6 +14,9 @@ import (
)

func (e *OtelExporter) Consume(dataGroup *model.DataGroup) error {
e.mu.Lock()
defer e.mu.Unlock()

if dataGroup == nil {
// no need consume
return nil
120 changes: 120 additions & 0 deletions collector/pkg/component/consumer/exporter/otelexporter/otelexporter.go
@@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -44,6 +47,8 @@
)

var serviceName string
var expCounter int
var lastMetricsData string

type OtelOutputExporters struct {
metricExporter exportmetric.Exporter
@@ -59,6 +64,10 @@ type OtelExporter struct {
customLabels []attribute.KeyValue
instrumentFactory *instrumentFactory
telemetry *component.TelemetryTools
exp *prometheus.Exporter
rs *resource.Resource
mu sync.Mutex
restart bool

adapters []adapter.Adapter
}
@@ -135,6 +144,9 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
instrumentFactory: newInstrumentFactory(exp.MeterProvider().Meter(MeterName), telemetry, customLabels),
metricAggregationMap: cfg.MetricAggregationMap,
telemetry: telemetry,
exp: exp,
rs: rs,
restart: false,
adapters: []adapter.Adapter{
adapter.NewNetAdapter(customLabels, &adapter.NetAdapterConfig{
StoreTraceAsMetric: cfg.AdapterConfig.NeedTraceAsMetric,
@@ -153,6 +165,59 @@
telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err))
}
}()

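// Memory cleanup (added in this commit): when enabled, a ticker flags the
// exporter for restart every RestartPeriod hours, and a 250ms poller calls
// NewMeter to rebuild the controller and exporter once a restart has been
// flagged and the served /metrics payload has changed since the last poll.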
if cfg.MemCleanUpConfig != nil && cfg.MemCleanUpConfig.Enabled {
if cfg.MemCleanUpConfig.RestartPeriod != 0 {
ticker := time.NewTicker(time.Duration(cfg.MemCleanUpConfig.RestartPeriod) * time.Hour)
go func() {
for range ticker.C {
otelexporter.restart = true
}
}()
}

go func() {
metricsTicker := time.NewTicker(250 * time.Millisecond)
for {
<-metricsTicker.C
currentMetricsData, err := fetchMetricsFromEndpoint()
if err != nil {
fmt.Println("Error fetching metrics:", err)
continue
}

if currentMetricsData != lastMetricsData {
lastMetricsData = currentMetricsData

if otelexporter.restart {
otelexporter.NewMeter(otelexporter.telemetry)
otelexporter.restart = false
expCounter++
}
}
}
}()

// RestartEveryNDays is forced to 0 here, so the day-based restart path
// below never runs; only the hour-based restart_period setting takes effect.
cfg.MemCleanUpConfig.RestartEveryNDays = 0
if cfg.MemCleanUpConfig.RestartEveryNDays != 0 {
go func() {
for {
now := time.Now()
next := now.Add(time.Duration(cfg.MemCleanUpConfig.RestartEveryNDays) * time.Hour * 24)
next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location())
duration := next.Sub(now)

time.Sleep(duration)

otelexporter.restart = true
}
}()
}
}

} else {
var collectPeriod time.Duration

@@ -301,3 +366,58 @@ var exponentialInt64NanosecondsBoundaries = func(bounds []float64) (asint []floa
}
return
}(exponentialInt64Boundaries)

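// NewMeter builds a fresh metric controller and Prometheus exporter with the
// same resource and histogram boundaries, stops the old controller, swaps the
// new ones in, re-creates the instrument factory, and restarts the /metrics
// HTTP server. It is the cleanup step performed when a restart is flagged.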
func (e *OtelExporter) NewMeter(telemetry *component.TelemetryTools) error {
e.mu.Lock()
defer e.mu.Unlock()
config := prometheus.Config{}

newController := controller.New(
otelprocessor.NewFactory(
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries(exponentialInt64NanosecondsBoundaries),
),
aggregation.CumulativeTemporalitySelector(),
otelprocessor.WithMemory(e.cfg.PromCfg.WithMemory),
),
controller.WithResource(e.rs),
)

exp, err := prometheus.New(config, newController)
if err != nil {
telemetry.Logger.Panic("failed to initialize prometheus exporter: ", zap.Error(err))
return err
}

if err := e.metricController.Stop(context.Background()); err != nil {
return fmt.Errorf("failed to stop old controller: %w", err)
}

e.exp = exp
e.metricController = newController
e.instrumentFactory = newInstrumentFactory(e.exp.MeterProvider().Meter(MeterName), e.telemetry, e.customLabels)

go func() {
if err := StartServer(e.exp, e.telemetry, e.cfg.PromCfg.Port); err != nil {
telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err))
}
}()

return nil
}

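// fetchMetricsFromEndpoint scrapes the collector's local Prometheus endpoint
// and returns the raw text payload; callers compare successive payloads to
// detect that fresh metrics have been served before triggering a restart.
// Note that the address is hardcoded to 127.0.0.1:9500 rather than derived
// from the configured PromCfg.Port.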
func fetchMetricsFromEndpoint() (string, error) {
resp, err := http.Get("http://127.0.0.1:9500/metrics")
if err != nil {
return "", err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

return string(body), nil
}

