diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa7f70b8a..fb41baebd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@
 ## Unreleased
 ### Enhancements
+- Add periodic memory cleanup for OtelExporter. Users can configure the restart period in hours. Disabled by default. ([#577](https://github.com/KindlingProject/kindling/pull/577))
+
+## v0.8.1 - 2023-09-01
+### Enhancements
 - Improve the Grafana plugin's performance by reducing the amount of data requiring queries. Now the plugin queries through Grafana's api proxy. ([#555](https://github.com/KindlingProject/kindling/pull/555))
 - Expand the histogram bucket of otelexpoerter (Add 1500ms). ([#563](https://github.com/KindlingProject/kindling/pull/563))
 - Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. ([#562](https://github.com/KindlingProject/kindling/pull/562))
diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml
index 00add4165..709f5c486 100644
--- a/collector/docker/kindling-collector-config.yml
+++ b/collector/docker/kindling-collector-config.yml
@@ -228,6 +228,13 @@ exporters:
       endpoint: 10.10.10.10:8080
     stdout:
       collect_period: 15s
+    memcleanup:
+      # If set to true, the prometheus server will restart every `restart_period`
+      # hours to free up memory.
+      enable: false
+      # Specifies the frequency (in hours) to restart the server.
+      # A value of 0 disables the restart.
+      restart_period: 12
 observability:
   logger:
diff --git a/collector/pkg/component/consumer/exporter/otelexporter/config.go b/collector/pkg/component/consumer/exporter/otelexporter/config.go
index fecd802dd..dbec2956f 100644
--- a/collector/pkg/component/consumer/exporter/otelexporter/config.go
+++ b/collector/pkg/component/consumer/exporter/otelexporter/config.go
@@ -12,6 +12,7 @@ type Config struct {
 	CustomLabels         map[string]string                `mapstructure:"custom_labels"`
 	MetricAggregationMap map[string]MetricAggregationKind `mapstructure:"metric_aggregation_map"`
 	AdapterConfig        *AdapterConfig                   `mapstructure:"adapter_config"`
+	MemCleanUpConfig     *MemCleanUpConfig                `mapstructure:"memcleanup"`
 }
 
 type PrometheusConfig struct {
@@ -34,3 +35,9 @@ type AdapterConfig struct {
 	NeedPodDetail      bool `mapstructure:"need_pod_detail"`
 	StoreExternalSrcIP bool `mapstructure:"store_external_src_ip"`
 }
+
+type MemCleanUpConfig struct {
+	Enabled           bool `mapstructure:"enable,omitempty"`
+	RestartPeriod     int  `mapstructure:"restart_period,omitempty"`
+	RestartEveryNDays int  `mapstructure:"restart_every_n_days,omitempty"`
+}
\ No newline at end of file
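For reference, the new `memcleanup` block decodes into `MemCleanUpConfig` via the `mapstructure` tags shown above, so YAML `enable` maps to the Go field `Enabled`, and `restart_every_n_days` is accepted even though the sample configs only document `enable` and `restart_period`. A minimal decoding sketch — the `raw` map and `main` wrapper are illustrative only, not part of the collector:

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Mirrors MemCleanUpConfig from config.go above.
type MemCleanUpConfig struct {
	Enabled           bool `mapstructure:"enable,omitempty"`
	RestartPeriod     int  `mapstructure:"restart_period,omitempty"`
	RestartEveryNDays int  `mapstructure:"restart_every_n_days,omitempty"`
}

func main() {
	// Values as the YAML parser would hand them over for the memcleanup block.
	raw := map[string]interface{}{
		"enable":         true,
		"restart_period": 12,
	}
	var cfg MemCleanUpConfig
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {Enabled:true RestartPeriod:12 RestartEveryNDays:0}
}
```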
"go.opentelemetry.io/otel/attribute" @@ -44,6 +47,8 @@ const ( ) var serviceName string +var expCounter int = 0 +var lastMetricsData string type OtelOutputExporters struct { metricExporter exportmetric.Exporter @@ -59,6 +64,10 @@ type OtelExporter struct { customLabels []attribute.KeyValue instrumentFactory *instrumentFactory telemetry *component.TelemetryTools + exp *prometheus.Exporter + rs *resource.Resource + mu sync.Mutex + restart bool adapters []adapter.Adapter } @@ -135,6 +144,9 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export instrumentFactory: newInstrumentFactory(exp.MeterProvider().Meter(MeterName), telemetry, customLabels), metricAggregationMap: cfg.MetricAggregationMap, telemetry: telemetry, + exp: exp, + rs: rs, + restart: false, adapters: []adapter.Adapter{ adapter.NewNetAdapter(customLabels, &adapter.NetAdapterConfig{ StoreTraceAsMetric: cfg.AdapterConfig.NeedTraceAsMetric, @@ -153,6 +165,59 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err)) } }() + + if cfg.MemCleanUpConfig != nil && cfg.MemCleanUpConfig.Enabled { + if cfg.MemCleanUpConfig.RestartPeriod != 0 { + ticker := time.NewTicker(time.Duration(cfg.MemCleanUpConfig.RestartPeriod) * time.Hour) + go func() { + for { + select { + case <-ticker.C: + otelexporter.restart = true + } + } + }() + } + + go func() { + metricsTicker := time.NewTicker(250 * time.Millisecond) + for { + <-metricsTicker.C + currentMetricsData, err := fetchMetricsFromEndpoint() + if err != nil { + fmt.Println("Error fetching metrics:", err) + continue + } + + if currentMetricsData != lastMetricsData { + lastMetricsData = currentMetricsData + + if(otelexporter.restart == true){ + otelexporter.NewMeter(otelexporter.telemetry) + otelexporter.restart = false + expCounter++ + } + } + } + }() + + cfg.MemCleanUpConfig.RestartEveryNDays = 0 + if cfg.MemCleanUpConfig.RestartEveryNDays != 0 { + go func() { + for { + now := time.Now() + next := now.Add(time.Duration(cfg.MemCleanUpConfig.RestartEveryNDays) * time.Hour * 24) + next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location()) + duration := next.Sub(now) + + time.Sleep(duration) + + otelexporter.restart = true + } + }() + } + } + } else { var collectPeriod time.Duration @@ -301,3 +366,58 @@ var exponentialInt64NanosecondsBoundaries = func(bounds []float64) (asint []floa } return }(exponentialInt64Boundaries) + +func (e *OtelExporter) NewMeter(telemetry *component.TelemetryTools) error { + e.mu.Lock() + defer e.mu.Unlock() + config := prometheus.Config{} + + newController := controller.New( + otelprocessor.NewFactory( + selector.NewWithHistogramDistribution( + histogram.WithExplicitBoundaries(exponentialInt64NanosecondsBoundaries), + ), + aggregation.CumulativeTemporalitySelector(), + otelprocessor.WithMemory(e.cfg.PromCfg.WithMemory), + ), + controller.WithResource(e.rs), + ) + + exp, err := prometheus.New(config, newController) + if err != nil { + telemetry.Logger.Panic("failed to initialize prometheus exporter %v", zap.Error(err)) + return nil + } + + if err := e.metricController.Stop(context.Background()); err != nil { + return fmt.Errorf("failed to stop old controller: %w", err) + } + + e.exp = exp + e.metricController = newController + e.instrumentFactory = newInstrumentFactory(e.exp.MeterProvider().Meter(MeterName), e.telemetry, e.customLabels) + + go func() { + if err := StartServer(e.exp, e.telemetry, 
@@ -301,3 +366,58 @@ var exponentialInt64NanosecondsBoundaries = func(bounds []float64) (asint []float64) {
 	}
 	return
 }(exponentialInt64Boundaries)
+
+func (e *OtelExporter) NewMeter(telemetry *component.TelemetryTools) error {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	config := prometheus.Config{}
+
+	newController := controller.New(
+		otelprocessor.NewFactory(
+			selector.NewWithHistogramDistribution(
+				histogram.WithExplicitBoundaries(exponentialInt64NanosecondsBoundaries),
+			),
+			aggregation.CumulativeTemporalitySelector(),
+			otelprocessor.WithMemory(e.cfg.PromCfg.WithMemory),
+		),
+		controller.WithResource(e.rs),
+	)
+
+	exp, err := prometheus.New(config, newController)
+	if err != nil {
+		telemetry.Logger.Panic("failed to initialize prometheus exporter %v", zap.Error(err))
+		return nil
+	}
+
+	if err := e.metricController.Stop(context.Background()); err != nil {
+		return fmt.Errorf("failed to stop old controller: %w", err)
+	}
+
+	e.exp = exp
+	e.metricController = newController
+	e.instrumentFactory = newInstrumentFactory(e.exp.MeterProvider().Meter(MeterName), e.telemetry, e.customLabels)
+
+	go func() {
+		if err := StartServer(e.exp, e.telemetry, e.cfg.PromCfg.Port); err != nil {
+			telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err))
+		}
+	}()
+
+	return nil
+}
+
+func fetchMetricsFromEndpoint() (string, error) {
+	resp, err := http.Get("http://127.0.0.1:9500/metrics")
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+
+	return string(body), nil
+}
+
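`fetchMetricsFromEndpoint` hardcodes port 9500, which happens to match the `:9500` used in the tests below but not necessarily `PromCfg.Port` in a deployed config. A sketch of deriving the URL from the configured port instead, assuming the `":9500"`-style values used elsewhere in this diff:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// fetchMetrics builds the scrape URL from the configured port rather than
// a hardcoded 9500. Illustrative only.
func fetchMetrics(port string) (string, error) {
	resp, err := http.Get(fmt.Sprintf("http://127.0.0.1%s/metrics", port))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	if metrics, err := fetchMetrics(":9500"); err == nil {
		fmt.Printf("fetched %d bytes of metrics\n", len(metrics))
	}
}
```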
diff --git a/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go b/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go
index 878800a41..97a548851 100644
--- a/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go
+++ b/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go
@@ -2,7 +2,12 @@ package otelexporter
 
 import (
 	"context"
+	"fmt"
 	"log"
+	"net/http"
+	_ "net/http/pprof"
+	"os"
+	"runtime"
 	"strconv"
 	"testing"
 	"time"
@@ -138,6 +143,17 @@ func makePreAggNetMetricGroup(i int) *model.DataGroup {
 	return metricsGroup
 }
 
+func appendToFile(filename, text string) error {
+	f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	_, err = f.WriteString(text)
+	return err
+}
+
 func BenchmarkOtelExporter_Consume(b *testing.B) {
 	dimension := 100000
@@ -262,3 +278,350 @@
 	log.Printf("Test Finished!")
 }
+
+func BenchmarkMemTest(b *testing.B) {
+	testDuration := time.Minute
+	endTime := time.Now().Add(30 * testDuration)
+	metricCounter := 4
+	recordCounter := 0
+	memTicker := time.NewTicker(1 * time.Second)
+
+	go func() {
+		log.Println(http.ListenAndServe("localhost:6060", nil))
+	}()
+
+	go func() {
+		for range memTicker.C {
+			var m runtime.MemStats
+			runtime.ReadMemStats(&m)
+			stats := fmt.Sprintf("Alloc = %v MiB, Metric Count: %d, Record Count: %d\n", m.Alloc/1024/1024, metricCounter, recordCounter)
+			log.Print(stats)
+			err := appendToFile("metrics_data.txt", stats)
+			if err != nil {
+				log.Println("Error writing to file:", err)
+			}
+		}
+	}()
+
+	cfg := &Config{
+		ExportKind: PrometheusKindExporter,
+		PromCfg: &PrometheusConfig{
+			Port:       ":9500",
+			WithMemory: false,
+		},
+		OtlpGrpcCfg:  &OtlpGrpcConfig{CollectPeriod: 15 * time.Second, Endpoint: "10.10.10.10:8080"},
+		StdoutCfg:    &StdoutConfig{CollectPeriod: 15 * time.Second},
+		CustomLabels: nil,
+		MemCleanUpConfig: &MemCleanUpConfig{
+			RestartPeriod:     12,
+			RestartEveryNDays: 12,
+		},
+		MetricAggregationMap: map[string]MetricAggregationKind{
+			"kindling_entity_request_total":                           MACounterKind,
+			"kindling_entity_request_duration_nanoseconds_total":      MACounterKind,
+			"kindling_entity_request_average_duration_nanoseconds":    MAHistogramKind,
+			"kindling_entity_request_send_bytes_total":                MACounterKind,
+			"kindling_entity_request_receive_bytes_total":             MACounterKind,
+			"kindling_topology_request_total":                         MACounterKind,
+			"kindling_topology_request_duration_nanoseconds_total":    MACounterKind,
+			"kindling_topology_request_average_duration_nanoseconds":  MAHistogramKind,
+			"kindling_topology_request_request_bytes_total":           MACounterKind,
+			"kindling_topology_request_response_bytes_total":          MACounterKind,
+			"kindling_trace_request_duration_nanoseconds":             MAGaugeKind,
+			"kindling_tcp_srtt_microseconds":                          MAGaugeKind,
+			"kindling_tcp_retransmit_total":                           MACounterKind,
+			"kindling_tcp_packet_loss_total":                          MACounterKind,
+		},
+		AdapterConfig: &AdapterConfig{
+			NeedTraceAsResourceSpan: false,
+			NeedTraceAsMetric:       true,
+			NeedPodDetail:           true,
+			StoreExternalSrcIP:      false,
+		},
+	}
+
+	telemetry := component.NewDefaultTelemetryTools()
+	otelexporter := NewExporter(cfg, telemetry)
+
+	i := 0
+
+	metricsGroup := &model.DataGroup{
+		Name: constnames.AggregatedNetRequestMetricGroup,
+		Metrics: []*model.Metric{
+			model.NewIntMetric(constvalues.ResponseIo, 1234567891),
+			model.NewIntMetric(constvalues.RequestTotalTime, int64(i)),
+			model.NewIntMetric(constvalues.RequestIo, 4500),
+			model.NewIntMetric(constvalues.RequestCount, 4500),
+		},
+		Labels:    model.NewAttributeMap(),
+		Timestamp: 19900909090,
+	}
+
+	metricsGroup.Labels.AddStringValue(constlabels.SrcNode, "test-SrcNode"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.SrcNamespace, "test-SrcNamespace"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.SrcPod, "test-SrcPod"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.SrcWorkloadName, "test-SrcWorkloadName"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.SrcWorkloadKind, "test-SrcWorkloadKind"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.SrcService, "test-SrcService"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.SrcIp, "test-SrcIp"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.DstNode, "test-DstNode"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.DstNamespace, "test-DstNamespace"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.DstPod, "test-DstPod"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.DstWorkloadName, "test-DstWorkloadName"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.DstWorkloadKind, "test-DstWorkloadKind"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.DstService, "test-DstService"+strconv.Itoa(i))
+
+	metricsGroup.Labels.AddStringValue(constlabels.SrcContainer, "test-SrcContainer"+strconv.Itoa(i))
+	metricsGroup.Labels.AddStringValue(constlabels.SrcContainerId, "test-SrcContainerId"+strconv.Itoa(i))
+
+	metricsGroup.Labels.AddStringValue(constlabels.Protocol, "http")
+	metricsGroup.Labels.AddStringValue(constlabels.StatusCode, "200")
+
+	// Topology data preferentially use DNat Ip and DNat Port
+	metricsGroup.Labels.AddStringValue(constlabels.DstIp, "test-DnatIp")
+
+	for time.Now().Before(endTime) {
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcNode, "test-SrcNode"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcNamespace, "test-SrcNamespace"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcPod, "test-SrcPod"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcWorkloadName, "test-SrcWorkloadName"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcWorkloadKind, "test-SrcWorkloadKind"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcService, "test-SrcService"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcIp, "test-SrcIp"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.DstNode, "test-DstNode"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.DstNamespace, "test-DstNamespace"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.DstPod, "test-DstPod"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.DstWorkloadName, "test-DstWorkloadName"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.DstWorkloadKind, "test-DstWorkloadKind"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.DstService, "test-DstService"+strconv.Itoa(i))
+
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcContainer, "test-SrcContainer"+strconv.Itoa(i))
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcContainerId, "test-SrcContainerId"+strconv.Itoa(i))
+
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.Protocol, "http")
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.StatusCode, "200")
+
+		newMetricsGroup := &model.DataGroup{
+			Name: constnames.AggregatedNetRequestMetricGroup,
+			Metrics: []*model.Metric{
+				model.NewIntMetric(constvalues.ResponseIo, 1234567891),
+				model.NewIntMetric(constvalues.RequestTotalTime, int64(i)),
+				model.NewIntMetric(constvalues.RequestIo, 4500),
+				model.NewIntMetric(constvalues.RequestCount, 4500),
+			},
+			Labels:    metricsGroup.Labels,
+			Timestamp: 19900909090,
+		}
+
+		_ = otelexporter.Consume(newMetricsGroup)
+		recordCounter++
+		i++
+		time.Sleep(time.Second) // event amount control
+	}
+
+	log.Printf("Test Finished!")
+}
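The MemStats-logging goroutine above is repeated verbatim in the two soak tests that follow; it could be lifted into a helper along these lines. `startMemLogger` is hypothetical, not part of this diff, and relies on the imports and `appendToFile` added above:

```go
// startMemLogger periodically logs allocation and counter stats to stdout
// and to metrics_data.txt, returning the ticker so callers can stop it.
func startMemLogger(interval time.Duration, counters func() (metricCount, recordCount int)) *time.Ticker {
	t := time.NewTicker(interval)
	go func() {
		for range t.C {
			var m runtime.MemStats
			runtime.ReadMemStats(&m)
			mc, rc := counters()
			stats := fmt.Sprintf("Alloc = %v MiB, Metric Count: %d, Record Count: %d\n",
				m.Alloc/1024/1024, mc, rc)
			log.Print(stats)
			if err := appendToFile("metrics_data.txt", stats); err != nil {
				log.Println("Error writing to file:", err)
			}
		}
	}()
	return t
}
```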
"test-DstWorkloadKind"+strconv.Itoa(i)) + metricsGroup.Labels.UpdateAddStringValue(constlabels.DstService, "test-DstService"+strconv.Itoa(i)) + + metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcContainer, "test-SrcContainer"+strconv.Itoa(i)) + metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcContainerId, "test-SrcContainerId"+strconv.Itoa(i)) + + metricsGroup.Labels.UpdateAddStringValue(constlabels.Protocol, "http") + metricsGroup.Labels.UpdateAddStringValue(constlabels.StatusCode, "200") + + newMetricsGroup := &model.DataGroup{ + Name: constnames.AggregatedNetRequestMetricGroup, + Metrics: []*model.Metric{ + model.NewIntMetric(constvalues.ResponseIo, 1234567891), + model.NewIntMetric(constvalues.RequestTotalTime, int64(i)), + model.NewIntMetric(constvalues.RequestIo, 4500), + model.NewIntMetric(constvalues.RequestCount, 4500), + }, + Labels: metricsGroup.Labels, + Timestamp: 19900909090, + } + + _ = otelexporter.Consume(newMetricsGroup) + recordCounter++ + i++ + time.Sleep(time.Second) //event amount control + } + + log.Printf("Test Finished!") +} + +func BenchmarkLabelTest(b *testing.B) { + testDuration := time.Minute + endTime := time.Now().Add(30*testDuration) + metricCounter := 4 + recordCounter := 0 + memTicker := time.NewTicker(1 * time.Second) + + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + + go func() { + for range memTicker.C { + var m runtime.MemStats + runtime.ReadMemStats(&m) + stats := fmt.Sprintf("Alloc = %v MiB, Metric Count: %d, Record Count: %d\n", m.Alloc/1024/1024, metricCounter, recordCounter) + log.Printf(stats) + err := appendToFile("metrics_data.txt", stats) + if err != nil { + log.Println("Error writing to file:", err) + } + } + }() + + cfg := &Config{ + ExportKind: PrometheusKindExporter, + PromCfg: &PrometheusConfig{ + Port: ":9500", + WithMemory: false, + }, + OtlpGrpcCfg: &OtlpGrpcConfig{CollectPeriod: 15 * time.Second, Endpoint: "10.10.10.10:8080"}, + StdoutCfg: &StdoutConfig{CollectPeriod: 15 * time.Second}, + CustomLabels: nil, + MemCleanUpConfig: &MemCleanUpConfig{ + RestartPeriod: 12, + RestartEveryNDays: 12, + }, + + MetricAggregationMap: map[string]MetricAggregationKind{ + "kindling_entity_request_total": MACounterKind, + "kindling_entity_request_duration_nanoseconds_total": MACounterKind, + "kindling_entity_request_average_duration_nanoseconds": MAHistogramKind, + "kindling_entity_request_send_bytes_total": MACounterKind, + "kindling_entity_request_receive_bytes_total": MACounterKind, + "kindling_topology_request_total": MACounterKind, + "kindling_topology_request_duration_nanoseconds_total": MACounterKind, + "kindling_topology_request_average_duration_nanoseconds": MAHistogramKind, + "kindling_topology_request_request_bytes_total": MACounterKind, + "kindling_topology_request_response_bytes_total": MACounterKind, + "kindling_trace_request_duration_nanoseconds": MAGaugeKind, + "kindling_tcp_srtt_microseconds": MAGaugeKind, + "kindling_tcp_retransmit_total": MACounterKind, + "kindling_tcp_packet_loss_total": MACounterKind, + }, + AdapterConfig: &AdapterConfig{ + NeedTraceAsResourceSpan: false, + NeedTraceAsMetric: true, + NeedPodDetail: true, + StoreExternalSrcIP: false, + }, + } + + telemetry := component.NewDefaultTelemetryTools() + otelexporter := NewExporter(cfg, telemetry) + + i := 0 + + metricsGroup := &model.DataGroup{ + Name: constnames.AggregatedNetRequestMetricGroup, + Metrics: []*model.Metric{ + model.NewIntMetric(constvalues.RequestTotalTime, int64(i)), + }, + Labels: model.NewAttributeMap(), + 
+
+func BenchmarkMetricTest(b *testing.B) {
+	testDuration := time.Minute
+	endTime := time.Now().Add(30 * testDuration)
+	metricCounter := 4
+	recordCounter := 0
+	memTicker := time.NewTicker(1 * time.Second)
+
+	go func() {
+		log.Println(http.ListenAndServe("localhost:6060", nil))
+	}()
+
+	go func() {
+		for range memTicker.C {
+			var m runtime.MemStats
+			runtime.ReadMemStats(&m)
+			stats := fmt.Sprintf("Alloc = %v MiB, Metric Count: %d, Record Count: %d\n", m.Alloc/1024/1024, metricCounter, recordCounter)
+			log.Print(stats)
+			err := appendToFile("metrics_data.txt", stats)
+			if err != nil {
+				log.Println("Error writing to file:", err)
+			}
+		}
+	}()
+
+	cfg := &Config{
+		ExportKind: PrometheusKindExporter,
+		PromCfg: &PrometheusConfig{
+			Port:       ":9500",
+			WithMemory: false,
+		},
+		OtlpGrpcCfg:  &OtlpGrpcConfig{CollectPeriod: 15 * time.Second, Endpoint: "10.10.10.10:8080"},
+		StdoutCfg:    &StdoutConfig{CollectPeriod: 15 * time.Second},
+		CustomLabels: nil,
+		MemCleanUpConfig: &MemCleanUpConfig{
+			RestartPeriod:     30,
+			RestartEveryNDays: 12,
+		},
+		MetricAggregationMap: map[string]MetricAggregationKind{
+			"kindling_entity_request_total":                           MACounterKind,
+			"kindling_entity_request_duration_nanoseconds_total":      MACounterKind,
+			"kindling_entity_request_average_duration_nanoseconds":    MAHistogramKind,
+			"kindling_entity_request_send_bytes_total":                MACounterKind,
+			"kindling_entity_request_receive_bytes_total":             MACounterKind,
+			"kindling_topology_request_total":                         MACounterKind,
+			"kindling_topology_request_duration_nanoseconds_total":    MACounterKind,
+			"kindling_topology_request_average_duration_nanoseconds":  MAHistogramKind,
+			"kindling_topology_request_request_bytes_total":           MACounterKind,
+			"kindling_topology_request_response_bytes_total":          MACounterKind,
+			"kindling_trace_request_duration_nanoseconds":             MAGaugeKind,
+			"kindling_tcp_srtt_microseconds":                          MAGaugeKind,
+			"kindling_tcp_retransmit_total":                           MACounterKind,
+			"kindling_tcp_packet_loss_total":                          MACounterKind,
+		},
+		AdapterConfig: &AdapterConfig{
+			NeedTraceAsResourceSpan: false,
+			NeedTraceAsMetric:       true,
+			NeedPodDetail:           true,
+			StoreExternalSrcIP:      false,
+		},
+	}
+
+	telemetry := component.NewDefaultTelemetryTools()
+	otelexporter := NewExporter(cfg, telemetry)
+
+	i := 0
+
+	metricsGroup := &model.DataGroup{
+		Name: constnames.AggregatedNetRequestMetricGroup,
+		Metrics: []*model.Metric{
+			model.NewIntMetric(constvalues.RequestCount, 1),
+		},
+		Labels:    model.NewAttributeMap(),
+		Timestamp: 19900909090,
+	}
+
+	metricsGroup.Labels.AddStringValue(constlabels.SrcNode, "test-SrcNode")
+
+	for time.Now().Before(endTime) {
+		metricsGroup.Labels.UpdateAddStringValue(constlabels.SrcNode, "test-SrcNode")
+
+		newMetricsGroup := &model.DataGroup{
+			Name: constnames.AggregatedNetRequestMetricGroup,
+			Metrics: []*model.Metric{
+				model.NewIntMetric(constvalues.RequestCount, 1),
+			},
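All three `Benchmark*` functions ignore `b.N` and run for a fixed ~30 minutes, so they behave as soak tests rather than Go benchmarks. A guard like the following (hypothetical, not in this diff) would keep them out of routine `go test` runs:

```go
func BenchmarkMemTestGuarded(b *testing.B) {
	// testing.Short() is true when tests run with the -short flag.
	if testing.Short() {
		b.Skip("skipping ~30-minute soak test in -short mode")
	}
	// ... body as in BenchmarkMemTest above ...
}
```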
+			Labels:    metricsGroup.Labels,
+			Timestamp: 19900909090,
+		}
+
+		_ = otelexporter.Consume(newMetricsGroup)
+		recordCounter++
+		i++
+		time.Sleep(time.Second) // event amount control
+	}
+
+	log.Printf("Test Finished!")
+}
\ No newline at end of file
diff --git a/collector/pkg/component/consumer/exporter/otelexporter/prometheus.go b/collector/pkg/component/consumer/exporter/otelexporter/prometheus.go
index 77b353259..1560e14ec 100644
--- a/collector/pkg/component/consumer/exporter/otelexporter/prometheus.go
+++ b/collector/pkg/component/consumer/exporter/otelexporter/prometheus.go
@@ -1,28 +1,44 @@
 package otelexporter
 
 import (
+	"context"
+	"fmt"
 	"net/http"
+	"sync"
 
 	"github.com/Kindling-project/kindling/collector/pkg/component"
 	"go.opentelemetry.io/otel/exporters/prometheus"
 )
 
+var (
+	mu  sync.Mutex
+	srv *http.Server
+)
+
 func StartServer(exporter *prometheus.Exporter, telemetry *component.TelemetryTools, port string) error {
-	http.HandleFunc("/metrics", exporter.ServeHTTP)
+	mu.Lock()
+	defer mu.Unlock()
 
-	srv := http.Server{
-		Addr:    port,
-		Handler: http.DefaultServeMux,
+	if srv != nil {
+		if err := srv.Shutdown(context.Background()); err != nil {
+			return fmt.Errorf("failed to stop server: %w", err)
+		}
 	}
 
-	telemetry.Logger.Infof("Prometheus Server listening at port: [%s]", port)
-	err := srv.ListenAndServe()
+	mux := http.NewServeMux()
+	mux.HandleFunc("/metrics", exporter.ServeHTTP)
 
-	if err != nil && err != http.ErrServerClosed {
-		return err
+	srv = &http.Server{
+		Addr:    port,
+		Handler: mux,
 	}
 
-	telemetry.Logger.Infof("Prometheus gracefully shutdown the http server...\n")
+	telemetry.Logger.Infof("Prometheus Server listening at port: [%s]", port)
+
+	go func() {
+		srv.ListenAndServe()
+		telemetry.Logger.Infof("Prometheus gracefully shutdown the http server...\n")
+	}()
 
 	return nil
 }
diff --git a/collector/pkg/component/consumer/exporter/otelexporter/testdata/kindling-collector-config.yml b/collector/pkg/component/consumer/exporter/otelexporter/testdata/kindling-collector-config.yml
index 3fa258ef1..b35142a2f 100644
--- a/collector/pkg/component/consumer/exporter/otelexporter/testdata/kindling-collector-config.yml
+++ b/collector/pkg/component/consumer/exporter/otelexporter/testdata/kindling-collector-config.yml
@@ -32,3 +32,10 @@ exporters:
       endpoint: 10.10.10.10:8080
     stdout:
       collect_period: 15s
+    memcleanup:
+      # If set to true, the prometheus server will restart every `restart_period`
+      # hours to free up memory.
+      enable: false
+      # Specifies the frequency (in hours) to restart the server.
+      # A value of 0 disables the restart.
+      restart_period: 12
\ No newline at end of file
diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml
index 80c455ad6..39b93e99d 100644
--- a/deploy/agent/kindling-collector-config.yml
+++ b/deploy/agent/kindling-collector-config.yml
@@ -227,6 +227,13 @@ exporters:
       endpoint: 10.10.10.10:8080
     stdout:
      collect_period: 15s
+    memcleanup:
+      # If set to true, the prometheus server will restart every `restart_period`
+      # hours to free up memory.
+      enable: false
+      # Specifies the frequency (in hours) to restart the server.
+      # A value of 0 disables the restart.
+      restart_period: 12
 observability:
   logger:
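In `StartServer` above, the background `srv.ListenAndServe()` discards its error, so a failed bind (for example, port already in use) is silent. A sketch of the same goroutine surfacing non-shutdown errors, assuming the `go.uber.org/zap` import already used elsewhere in this package:

```go
go func() {
	// ErrServerClosed is the expected result of a graceful Shutdown;
	// anything else indicates a real failure worth logging.
	if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		telemetry.Logger.Warn("prometheus server exited: ", zap.Error(err))
	}
	telemetry.Logger.Infof("Prometheus gracefully shutdown the http server...\n")
}()
```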