Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/add filters for the debug level logs #300

Merged
merged 12 commits into from
Aug 9, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

## v0.3.0 - 2022-06-29
### New features
- Add an option name `debug_selector` to filter debug_log from different components ([#300](https://github.com/CloudDectective-Harmonycloud/kindling/pull/300))
- Add a URL clustering method to reduce the cardinality of the entity metrics. Configuration options are provided to choose which method to use. ([#268](https://github.com/CloudDectective-Harmonycloud/kindling/pull/268))
- Display connection failure metrics in the Grafana-plugin ([#255](https://github.com/CloudDectective-Harmonycloud/kindling/pull/255))
- Add the metrics that describe how many times the TCP connections have been made ([#234](https://github.com/CloudDectective-Harmonycloud/kindling/pull/234) [#235](https://github.com/CloudDectective-Harmonycloud/kindling/pull/235) [#236](https://github.com/CloudDectective-Harmonycloud/kindling/pull/236) [#237](https://github.com/CloudDectective-Harmonycloud/kindling/pull/237))
6 changes: 6 additions & 0 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
@@ -159,6 +159,12 @@ observability:
logger:
console_level: info # debug,info,warn,error,none
file_level: info
# debug_selector is used to filter debug message from different components
# 1. This filter will not take effect when there is no element in the debug_selector list
# 2. If the list is not empty, only the components contained in this list will print debug message
# 3. The name of each component is defined above, such as `receiver.cgoreceiver`, `exporter.otelexporter`, only the second part of the config name needed
# e.g debug_selector: ["cgoreceiver","otelexporter"]
debug_selector: []
file_rotation:
filename: agent.log
maxsize: 512 #MB
22 changes: 11 additions & 11 deletions collector/internal/application/application.go
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ func New() (*Application, error) {
}

func (a *Application) Run() error {
err := a.analyzerManager.StartAll(a.telemetry.Telemetry.Logger)
err := a.analyzerManager.StartAll(a.telemetry.GetGlobalTelemetryTools().Logger)
if err != nil {
return fmt.Errorf("failed to start application: %v", err)
}
@@ -65,7 +65,7 @@ func (a *Application) Run() error {
}

func (a *Application) Shutdown() error {
return multierr.Combine(a.receiver.Shutdown(), a.analyzerManager.ShutdownAll(a.telemetry.Telemetry.Logger))
return multierr.Combine(a.receiver.Shutdown(), a.analyzerManager.ShutdownAll(a.telemetry.GetGlobalTelemetryTools().Logger))
}

func initFlags() error {
@@ -103,27 +103,27 @@ func (a *Application) buildPipeline() error {
// TODO: Build pipeline via configuration to implement dependency injection
// Initialize exporters
otelExporterFactory := a.componentsFactory.Exporters[otelexporter.Otel]
otelExporter := otelExporterFactory.NewFunc(otelExporterFactory.Config, a.telemetry.Telemetry)
otelExporter := otelExporterFactory.NewFunc(otelExporterFactory.Config, a.telemetry.GetTelemetryTools(otelexporter.Otel))
// Initialize all processors
// 1. DataGroup Aggregator
aggregateProcessorFactory := a.componentsFactory.Processors[aggregateprocessor.Type]
aggregateProcessor := aggregateProcessorFactory.NewFunc(aggregateProcessorFactory.Config, a.telemetry.Telemetry, otelExporter)
aggregateProcessor := aggregateProcessorFactory.NewFunc(aggregateProcessorFactory.Config, a.telemetry.GetTelemetryTools(aggregateprocessor.Type), otelExporter)
// 2. Kubernetes metadata processor
k8sProcessorFactory := a.componentsFactory.Processors[k8sprocessor.K8sMetadata]
k8sMetadataProcessor := k8sProcessorFactory.NewFunc(k8sProcessorFactory.Config, a.telemetry.Telemetry, aggregateProcessor)
k8sMetadataProcessor := k8sProcessorFactory.NewFunc(k8sProcessorFactory.Config, a.telemetry.GetTelemetryTools(k8sprocessor.K8sMetadata), aggregateProcessor)
// Initialize all analyzers
// 1. Common network request analyzer
networkAnalyzerFactory := a.componentsFactory.Analyzers[network.Network.String()]
// Now NetworkAnalyzer must be initialized before any other analyzers, because it will
// use its configuration to initialize the conntracker module which is also used by others.
networkAnalyzer := networkAnalyzerFactory.NewFunc(networkAnalyzerFactory.Config, a.telemetry.Telemetry, []consumer.Consumer{k8sMetadataProcessor})
networkAnalyzer := networkAnalyzerFactory.NewFunc(networkAnalyzerFactory.Config, a.telemetry.GetTelemetryTools(network.Network.String()), []consumer.Consumer{k8sMetadataProcessor})
// 2. Layer 4 TCP events analyzer
aggregateProcessorForTcp := aggregateProcessorFactory.NewFunc(aggregateProcessorFactory.Config, a.telemetry.Telemetry, otelExporter)
k8sMetadataProcessor2 := k8sProcessorFactory.NewFunc(k8sProcessorFactory.Config, a.telemetry.Telemetry, aggregateProcessorForTcp)
aggregateProcessorForTcp := aggregateProcessorFactory.NewFunc(aggregateProcessorFactory.Config, a.telemetry.GetTelemetryTools(aggregateprocessor.Type), otelExporter)
k8sMetadataProcessor2 := k8sProcessorFactory.NewFunc(k8sProcessorFactory.Config, a.telemetry.GetTelemetryTools(aggregateprocessor.Type), aggregateProcessorForTcp)
tcpAnalyzerFactory := a.componentsFactory.Analyzers[tcpmetricanalyzer.TcpMetric.String()]
tcpAnalyzer := tcpAnalyzerFactory.NewFunc(tcpAnalyzerFactory.Config, a.telemetry.Telemetry, []consumer.Consumer{k8sMetadataProcessor2})
tcpAnalyzer := tcpAnalyzerFactory.NewFunc(tcpAnalyzerFactory.Config, a.telemetry.GetTelemetryTools(tcpconnectanalyzer.Type.String()), []consumer.Consumer{k8sMetadataProcessor2})
tcpConnectAnalyzerFactory := a.componentsFactory.Analyzers[tcpconnectanalyzer.Type.String()]
tcpConnectAnalyzer := tcpConnectAnalyzerFactory.NewFunc(tcpConnectAnalyzerFactory.Config, a.telemetry.Telemetry, []consumer.Consumer{k8sMetadataProcessor})
tcpConnectAnalyzer := tcpConnectAnalyzerFactory.NewFunc(tcpConnectAnalyzerFactory.Config, a.telemetry.GetTelemetryTools(tcpconnectanalyzer.Type.String()), []consumer.Consumer{k8sMetadataProcessor})
// Initialize receiver packaged with multiple analyzers
analyzerManager, err := analyzer.NewManager(networkAnalyzer, tcpAnalyzer, tcpConnectAnalyzer)
if err != nil {
@@ -132,7 +132,7 @@ func (a *Application) buildPipeline() error {
a.analyzerManager = analyzerManager

cgoReceiverFactory := a.componentsFactory.Receivers[cgoreceiver.Cgo]
cgoReceiver := cgoReceiverFactory.NewFunc(cgoReceiverFactory.Config, a.telemetry.Telemetry, analyzerManager)
cgoReceiver := cgoReceiverFactory.NewFunc(cgoReceiverFactory.Config, a.telemetry.GetTelemetryTools(cgoreceiver.Cgo), analyzerManager)
a.receiver = cgoReceiver
return nil
}
10 changes: 5 additions & 5 deletions collector/pkg/component/analyzer/manager.go
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@ package analyzer
import (
"errors"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
)

const ConsumeAllEvents = "consumeAllEvents"
@@ -54,9 +54,9 @@ func NewManager(analyzers ...Analyzer) (*Manager, error) {
}, nil
}

func (m *Manager) StartAll(logger *zap.Logger) error {
func (m *Manager) StartAll(logger *component.TelemetryLogger) error {
for _, analyzer := range m.allAnalyzers {
logger.Sugar().Infof("Starting analyzer [%s]", analyzer.Type())
logger.Infof("Starting analyzer [%s]", analyzer.Type())
err := analyzer.Start()
if err != nil {
return err
@@ -65,10 +65,10 @@ func (m *Manager) StartAll(logger *zap.Logger) error {
return nil
}

func (m *Manager) ShutdownAll(logger *zap.Logger) error {
func (m *Manager) ShutdownAll(logger *component.TelemetryLogger) error {
var retErr error = nil
for _, analyzer := range m.allAnalyzers {
logger.Sugar().Infof("Shutdown analyzer [%s]", analyzer.Type())
logger.Infof("Shutdown analyzer [%s]", analyzer.Type())
err := analyzer.Shutdown()
if err != nil {
retErr = multierror.Append(retErr, err)
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"os"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/model"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -17,12 +18,12 @@ type ConnectMonitor struct {
connMap map[ConnKey]*ConnectionStats
statesResource StatesResource
hostProcPath string
logger *zap.Logger
logger *component.TelemetryLogger
}

const HostProc = "HOST_PROC_PATH"

func NewConnectMonitor(logger *zap.Logger) *ConnectMonitor {
func NewConnectMonitor(logger *component.TelemetryLogger) *ConnectMonitor {
path, ok := os.LookupEnv(HostProc)
if !ok {
path = "/proc"
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package otelexporter

import (
"context"
"reflect"

"github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter"
"github.com/Kindling-project/kindling/collector/pkg/model"
@@ -89,8 +88,8 @@ func (e *OtelExporter) exportMetric(result *adapter.AdaptedResult) {
} else if metric.DataType() == model.HistogramMetricType {
e.telemetry.Logger.Warn("Failed to exporter Metric: can not use otlp-exporter to export histogram Data", zap.String("MetricName", metric.Name))
} else {
if ce := e.telemetry.Logger.Check(zapcore.DebugLevel, "Undefined metricKind for this Metric"); ce != nil {
ce.Write(zap.String("MetricName", metric.Name), zap.String("MetricType", reflect.TypeOf(metric).String()))
if ce := e.telemetry.Logger.Check(zapcore.DebugLevel, "Undefined metricKind for this Metric in metric_aggregation_map"); ce != nil {
ce.Write(zap.String("MetricName", metric.Name))
}
}
}
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"fmt"
"os"

"github.com/Kindling-project/kindling/collector/pkg/component"
"go.uber.org/zap"

"go.opentelemetry.io/otel/attribute"
@@ -46,7 +47,7 @@ var commonLabels = []attribute.KeyValue{
attribute.String("instance", GetHostname()),
}

func GetCommonLabels(withUserInfo bool, logger *zap.Logger) []attribute.KeyValue {
func GetCommonLabels(withUserInfo bool, logger *component.TelemetryLogger) []attribute.KeyValue {
var clusterId, userId string
var err error

Original file line number Diff line number Diff line change
@@ -8,12 +8,12 @@ import (

"github.com/Kindling-project/kindling/collector/pkg/aggregator"
"github.com/Kindling-project/kindling/collector/pkg/aggregator/defaultaggregator"
"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"

"go.opentelemetry.io/otel/metric"
)
@@ -25,20 +25,20 @@ type instrumentFactory struct {
instruments sync.Map
meter metric.Meter
customLabels []attribute.KeyValue
logger *zap.Logger
telemetry *component.TelemetryTools

aggregator *defaultaggregator.DefaultAggregator

traceAsMetricSelector *aggregator.LabelSelectors
TcpRttMillsSelector *aggregator.LabelSelectors
}

func newInstrumentFactory(meter metric.Meter, logger *zap.Logger, customLabels []attribute.KeyValue) *instrumentFactory {
func newInstrumentFactory(meter metric.Meter, telemetry *component.TelemetryTools, customLabels []attribute.KeyValue) *instrumentFactory {
return &instrumentFactory{
instruments: sync.Map{},
meter: meter,
customLabels: customLabels,
logger: logger,
telemetry: telemetry,
aggregator: defaultaggregator.NewDefaultAggregator(&defaultaggregator.AggregatedConfig{
KindMap: map[string][]defaultaggregator.KindConfig{
constnames.TcpRttMetricName: {
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ import (
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"github.com/Kindling-project/kindling/collector/pkg/observability/logger"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
@@ -50,7 +49,7 @@ func Test_instrumentFactory_recordLastValue(t *testing.T) {
},
}

loggerInstance := logger.CreateConsoleLogger()
loggerInstance := component.NewDefaultTelemetryTools()
exporter, _ := newExporters(context.Background(), cfg, loggerInstance)

cont := controller.New(
@@ -64,7 +63,7 @@ func Test_instrumentFactory_recordLastValue(t *testing.T) {

cont.Start(context.Background())

ins := newInstrumentFactory(cont.Meter("test"), component.NewDefaultTelemetryTools().Logger, nil)
ins := newInstrumentFactory(cont.Meter("test"), component.NewDefaultTelemetryTools(), nil)

for i := 0; i < 10000; i++ {
time.Sleep(1 * time.Second)
@@ -150,7 +149,7 @@ func makeTraceAsMetricGroup(requestLatency int64, timestamp uint64, dstIp string
}

func Test_instrumentFactory_recordTraceAsMetric(t *testing.T) {
ins := newInstrumentFactory(metric.Meter{}, component.NewDefaultTelemetryTools().Logger, nil)
ins := newInstrumentFactory(metric.Meter{}, component.NewDefaultTelemetryTools(), nil)
metricName := constnames.TraceAsMetric
var randTime int64
var timestamp uint64
Original file line number Diff line number Diff line change
@@ -141,7 +141,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
traceProvider: nil,
defaultTracer: nil,
customLabels: customLabels,
instrumentFactory: newInstrumentFactory(exp.MeterProvider().Meter(MeterName), telemetry.Logger, customLabels),
instrumentFactory: newInstrumentFactory(exp.MeterProvider().Meter(MeterName), telemetry, customLabels),
metricAggregationMap: cfg.MetricAggregationMap,
telemetry: telemetry,
adapters: []adapter.Adapter{
@@ -155,7 +155,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
},
}
go func() {
err := StartServer(exp, telemetry.Logger, cfg.PromCfg.Port)
err := StartServer(exp, telemetry, cfg.PromCfg.Port)
if err != nil {
telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err))
}
@@ -172,7 +172,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
return nil
}

exporters, err := newExporters(context.Background(), cfg, telemetry.Logger)
exporters, err := newExporters(context.Background(), cfg, telemetry)
if err != nil {
telemetry.Logger.Panic("Error happened when creating otel exporter:", zap.Error(err))
return nil
@@ -208,7 +208,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
traceProvider: tracerProvider,
defaultTracer: tracer,
customLabels: customLabels,
instrumentFactory: newInstrumentFactory(cont.Meter(MeterName), telemetry.Logger, customLabels),
instrumentFactory: newInstrumentFactory(cont.Meter(MeterName), telemetry, customLabels),
metricAggregationMap: cfg.MetricAggregationMap,
telemetry: telemetry,
adapters: []adapter.Adapter{
@@ -237,9 +237,9 @@ func (e *OtelExporter) findInstrumentKind(metricName string) (MetricAggregationK
}

// Crete new opentelemetry-go exporter.
func newExporters(context context.Context, cfg *Config, logger *zap.Logger) (*OtelOutputExporters, error) {
func newExporters(context context.Context, cfg *Config, telemetry *component.TelemetryTools) (*OtelOutputExporters, error) {
var retExporters *OtelOutputExporters
logger.Sugar().Infof("Initializing OpenTelemetry exporter whose type is %s", cfg.ExportKind)
telemetry.Logger.Infof("Initializing OpenTelemetry exporter whose type is %s", cfg.ExportKind)
switch cfg.ExportKind {
case StdoutKindExporter:
metricExp, err := stdoutmetric.New(
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ import (
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"
"github.com/Kindling-project/kindling/collector/pkg/observability/logger"
"github.com/spf13/viper"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
@@ -171,8 +170,8 @@ func BenchmarkOtelExporter_Consume(b *testing.B) {
},
}

logger := logger.CreateConsoleLogger()
exporter, _ := newExporters(context.Background(), cfg, logger)
telemetry := component.NewDefaultTelemetryTools()
exporter, _ := newExporters(context.Background(), cfg, telemetry)

cont := controller.New(
otelprocessor.NewFactory(simple.NewWithHistogramDistribution(
@@ -189,7 +188,7 @@ func BenchmarkOtelExporter_Consume(b *testing.B) {
traceProvider: nil,
defaultTracer: nil,
customLabels: nil,
instrumentFactory: newInstrumentFactory(cont.Meter(MeterName), logger, nil),
instrumentFactory: newInstrumentFactory(cont.Meter(MeterName), telemetry, nil),
metricAggregationMap: cfg.MetricAggregationMap,
telemetry: component.NewDefaultTelemetryTools(),
adapters: []adapter.Adapter{
@@ -204,7 +203,7 @@ func BenchmarkOtelExporter_Consume(b *testing.B) {
}

if err := cont.Start(context.Background()); err != nil {
logger.Panic("failed to start controller:", zap.Error(err))
telemetry.Logger.Panic("failed to start controller:", zap.Error(err))
}
newSelfMetrics(otelexporter.telemetry.MeterProvider)

Original file line number Diff line number Diff line change
@@ -3,26 +3,26 @@ package otelexporter
import (
"net/http"

"github.com/Kindling-project/kindling/collector/pkg/component"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.uber.org/zap"
)

func StartServer(exporter *prometheus.Exporter, logger *zap.Logger, port string) error {
func StartServer(exporter *prometheus.Exporter, telemetry *component.TelemetryTools, port string) error {
http.HandleFunc("/metrics", exporter.ServeHTTP)

srv := http.Server{
Addr: port,
Handler: http.DefaultServeMux,
}

logger.Sugar().Infof("Prometheus Server listening at port: [%s]", port)
telemetry.Logger.Infof("Prometheus Server listening at port: [%s]", port)
err := srv.ListenAndServe()

if err != nil && err != http.ErrServerClosed {
return err
}

logger.Sugar().Infof("Prometheus gracefully shutdown the http server...\n")
telemetry.Logger.Infof("Prometheus gracefully shutdown the http server...\n")

return nil
}
Original file line number Diff line number Diff line change
@@ -4,11 +4,11 @@ import (
"time"

"github.com/Kindling-project/kindling/collector/pkg/aggregator/defaultaggregator"
"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

type collector struct {
@@ -69,7 +69,7 @@ func (c *collector) recordMetricGroups(group *model.DataGroup) {
c.aggregator.AggregatorWithAllLabelsAndMetric(group, time.Now())
}

func newCollector(config *Config, logger *zap.Logger) *collector {
func newCollector(config *Config, _ *component.TelemetryLogger) *collector {
// TODO Do this in config later !!!!
requestTimeHistogramTopologyMetric := constnames.ToKindlingNetMetricName(constvalues.RequestTimeHistogram, false)
requestTimeHistogramEntityMetric := constnames.ToKindlingNetMetricName(constvalues.RequestTimeHistogram, true)
Loading