feat/add histogram aggregator in defaultAggregator #226

Merged on May 30, 2022 (20 commits).
The diff below shows the changes from 11 of those 20 commits.

Commits (20)
979edfd  feat: support the histogram Gauge (NeJan2020, May 17, 2022)
c56f5ba  fix: update Gauge model in each component (NeJan2020, May 17, 2022)
0192eda  feat: add histogram aggregatorValues (NeJan2020, May 17, 2022)
4b07104  fix: update Gauge model in each component (NeJan2020, May 17, 2022)
7cc4751  fix: update test case (NeJan2020, May 17, 2022)
75ee3ea  fix: support to aggregate the histogram Gauge (NeJan2020, May 18, 2022)
cf0ab76  feat: a simple prometheus-exporter use prometheus-client (NeJan2020, May 17, 2022)
7822c6d  feat: (NeJan2020, May 19, 2022)
9b25ed7  feat: add explicit_boundaries config in histogram aggregation (NeJan2020, May 25, 2022)
edfcb19  test: update test case (NeJan2020, May 26, 2022)
f970214  style: Rename model.GaugeGroup (NeJan2020, May 26, 2022)
faf0a1f  refactor: update swap function of labelKey (NeJan2020, May 26, 2022)
ace35e5  rename: rename metricGroup to dataGroup (NeJan2020, May 26, 2022)
9029ee8  config: remove histogram aggregation from default config (NeJan2020, May 26, 2022)
f5aad62  perf: remove useless atomic exp (NeJan2020, May 26, 2022)
f5a6075  fix: add a metric clear function for each type of metric (NeJan2020, May 26, 2022)
02de63f  refactor: move defaultadapter to exporter.tools ; add histogram metri… (NeJan2020, May 27, 2022)
c0356ce  rename: rename MetricGroupPool to DataGroupPool (NeJan2020, May 27, 2022)
29a126c  fix: avoid `nullptr` when using otlp-exporter to export histogram data (NeJan2020, May 27, 2022)
bf567ba  fix: add option to determine the attribute Type when using adapter (NeJan2020, May 27, 2022)
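
For orientation before the diff: the renaming commits above replace model.Gauge (a name plus an int64 value) with model.Metric, which can carry either an integer value or, new in this PR, a histogram, and they replace model.GaugeGroup with model.DataGroup. The sketch below is reverse-engineered from the call sites visible in this diff (NewIntMetric, GetInt, NewDataGroup, UpdateAddIntMetric); the exact field layout and the histogram shape are assumptions, not the PR's actual model code.

package model

// Metric replaces the old Gauge. Exactly one of the value fields is set.
// This layout is an assumption based on how the diff calls NewIntMetric and GetInt().
type Metric struct {
    Name      string
    Int       *IntValue
    Histogram *HistogramValue // added by this PR for the histogram aggregator (assumed shape)
}

type IntValue struct {
    Value int64
}

// HistogramValue is a guess at what the histogram aggregator accumulates,
// driven by the explicit_boundaries config introduced in commit 9b25ed7.
type HistogramValue struct {
    Sum                int64
    Count              uint64
    ExplicitBoundaries []int64
    BucketCounts       []uint64
}

func NewIntMetric(name string, value int64) *Metric {
    return &Metric{Name: name, Int: &IntValue{Value: value}}
}

// GetInt mirrors the accessor used in the tests: value.GetInt().Value.
func (m *Metric) GetInt() *IntValue { return m.Int }

// DataGroup replaces the old GaugeGroup: a named set of metrics sharing labels and a timestamp.
type DataGroup struct {
    Name      string
    Metrics   []*Metric
    Labels    *AttributeMap
    Timestamp uint64
}

// AttributeMap is only stubbed here; the real type has typed UpdateAdd*Value helpers.
type AttributeMap struct {
    values map[string]interface{}
}

func NewDataGroup(name string, labels *AttributeMap, timestamp uint64, metrics ...*Metric) *DataGroup {
    return &DataGroup{Name: name, Metrics: metrics, Labels: labels, Timestamp: timestamp}
}
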
2 changes: 1 addition & 1 deletion collector/analyzer/loganalyzer/loganalyzer.go
@@ -40,7 +40,7 @@ func (a *LogAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
)
}
for _, nextConsumer := range a.nextConsumers {
nextConsumer.Consume(&model.GaugeGroup{})
nextConsumer.Consume(&model.DataGroup{})
}
return nil
}
40 changes: 20 additions & 20 deletions collector/analyzer/network/gauge_pool.go
@@ -8,34 +8,34 @@ import (
"time"
)

func createGaugeGroup() interface{} {
values := []*model.Gauge{
{Name: constvalues.ConnectTime, Value: 0},
{Name: constvalues.RequestSentTime, Value: 0},
{Name: constvalues.WaitingTtfbTime, Value: 0},
{Name: constvalues.ContentDownloadTime, Value: 0},
{Name: constvalues.RequestTotalTime, Value: 0},
{Name: constvalues.RequestIo, Value: 0},
{Name: constvalues.ResponseIo, Value: 0},
func createMetricGroup() interface{} {
values := []*model.Metric{
model.NewIntMetric(constvalues.ConnectTime, 0),
model.NewIntMetric(constvalues.RequestSentTime, 0),
model.NewIntMetric(constvalues.WaitingTtfbTime, 0),
model.NewIntMetric(constvalues.ContentDownloadTime, 0),
model.NewIntMetric(constvalues.RequestTotalTime, 0),
model.NewIntMetric(constvalues.RequestIo, 0),
model.NewIntMetric(constvalues.ResponseIo, 0),
}
gaugeGroup := model.NewGaugeGroup(constnames.NetRequestGaugeGroupName, model.NewAttributeMap(), uint64(time.Now().UnixNano()), values...)
return gaugeGroup
metricGroup := model.NewDataGroup(constnames.NetRequestMetricGroupName, model.NewAttributeMap(), uint64(time.Now().UnixNano()), values...)
return metricGroup
}

type GaugeGroupPool struct {
type MetricGroupPool struct {
pool *sync.Pool
}

func NewGaugePool() *GaugeGroupPool {
return &GaugeGroupPool{pool: &sync.Pool{New: createGaugeGroup}}
func NewMetricPool() *MetricGroupPool {
return &MetricGroupPool{pool: &sync.Pool{New: createMetricGroup}}
}

func (p *GaugeGroupPool) Get() *model.GaugeGroup {
return p.pool.Get().(*model.GaugeGroup)
func (p *MetricGroupPool) Get() *model.DataGroup {
return p.pool.Get().(*model.DataGroup)
}

func (p *GaugeGroupPool) Free(gaugeGroup *model.GaugeGroup) {
gaugeGroup.Reset()
gaugeGroup.Name = constnames.NetRequestGaugeGroupName
p.pool.Put(gaugeGroup)
func (p *MetricGroupPool) Free(metricGroup *model.DataGroup) {
metricGroup.Reset()
metricGroup.Name = constnames.NetRequestMetricGroupName
p.pool.Put(metricGroup)
}
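
This pool hands out pre-populated *model.DataGroup records so the per-request hot path does not allocate a fresh group every time. A hypothetical caller (not code from this PR, and assuming the imports already present in network_analyzer.go) pairs every Get with a Free once all consumers have seen the record:

// Sketch of the Get/Consume/Free lifecycle around MetricGroupPool.
func emitExample(pool *MetricGroupPool, consumers []consumer.Consumer) error {
    record := pool.Get() // *model.DataGroup with the seven request metrics pre-allocated
    record.UpdateAddIntMetric(constvalues.RequestTotalTime, 42)
    record.Timestamp = uint64(time.Now().UnixNano())

    var firstErr error
    for _, c := range consumers {
        if err := c.Consume(record); err != nil && firstErr == nil {
            firstErr = err // the real analyzer appends into a multierror instead
        }
    }

    pool.Free(record) // Reset() clears the metrics and the Name is restored before pooling
    return firstErr
}

Because the analyzer frees the record immediately after the Consume loop (see distributeTraceMetric below), consumers are expected to copy anything they need before returning rather than holding on to the *model.DataGroup.
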
68 changes: 34 additions & 34 deletions collector/analyzer/network/network_analyzer.go
@@ -40,7 +40,7 @@ type NetworkAnalyzer struct {
protocolMap map[string]*protocol.ProtocolParser
parsers []*protocol.ProtocolParser

gaugeGroupPool *GaugeGroupPool
metricGroupPool *MetricGroupPool
requestMonitor sync.Map
tcpMessagePairSize int64
udpMessagePairSize int64
@@ -50,10 +50,10 @@
func NewNetworkAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer {
config, _ := cfg.(*Config)
na := &NetworkAnalyzer{
cfg: config,
gaugeGroupPool: NewGaugePool(),
nextConsumers: consumers,
telemetry: telemetry,
cfg: config,
metricGroupPool: NewMetricPool(),
nextConsumers: consumers,
telemetry: telemetry,
}
if config.EnableConntrack {
connConfig := &conntracker.Config{
@@ -312,12 +312,12 @@ func (na *NetworkAnalyzer) distributeTraceMetric(oldPairs *messagePairs, newPair
for _, nexConsumer := range na.nextConsumers {
nexConsumer.Consume(record)
}
na.gaugeGroupPool.Free(record)
na.metricGroupPool.Free(record)
}
return nil
}

func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.GaugeGroup {
func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.DataGroup {
// Step 1: Static Config for port and protocol set in config file
port := mps.getPort()
staticProtocol, found := na.staticPortMap[port]
@@ -377,7 +377,7 @@ func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.GaugeGroup
return na.getRecords(mps, protocol.NOSUPPORT, nil)
}

func (na *NetworkAnalyzer) parseProtocol(mps *messagePairs, parser *protocol.ProtocolParser) []*model.GaugeGroup {
func (na *NetworkAnalyzer) parseProtocol(mps *messagePairs, parser *protocol.ProtocolParser) []*model.DataGroup {
if parser.MultiRequests() {
// Not mergable requests
return na.parseMultipleRequests(mps, parser)
@@ -404,7 +404,7 @@ func (na *NetworkAnalyzer) parseProtocol(mps *messagePairs, parser *protocol.Pro

// parseMultipleRequests parses the messagePairs when we know there could be multiple read requests.
// This is used only when the protocol is DNS now.
func (na *NetworkAnalyzer) parseMultipleRequests(mps *messagePairs, parser *protocol.ProtocolParser) []*model.GaugeGroup {
func (na *NetworkAnalyzer) parseMultipleRequests(mps *messagePairs, parser *protocol.ProtocolParser) []*model.DataGroup {
// Match with key when disordering.
size := mps.requests.size()
parsedReqMsgs := make([]*protocol.PayloadMessage, size)
@@ -418,7 +418,7 @@ func (na *NetworkAnalyzer) parseMultipleRequests(mps *messagePairs, parser *prot
parsedReqMsgs[i] = requestMsg
}

records := make([]*model.GaugeGroup, 0)
records := make([]*model.DataGroup, 0)
if mps.responses == nil {
size := mps.requests.size()
for i := 0; i < size; i++ {
@@ -469,11 +469,11 @@ func (na *NetworkAnalyzer) parseMultipleRequests(mps *messagePairs, parser *prot
}
}

func (na *NetworkAnalyzer) getConnectFailRecords(mps *messagePairs) []*model.GaugeGroup {
func (na *NetworkAnalyzer) getConnectFailRecords(mps *messagePairs) []*model.DataGroup {
evt := mps.connects.event
ret := na.gaugeGroupPool.Get()
ret.UpdateAddGauge(constvalues.ConnectTime, int64(mps.connects.getDuration()))
ret.UpdateAddGauge(constvalues.RequestTotalTime, int64(mps.connects.getDuration()))
ret := na.metricGroupPool.Get()
ret.UpdateAddIntMetric(constvalues.ConnectTime, int64(mps.connects.getDuration()))
ret.UpdateAddIntMetric(constvalues.RequestTotalTime, int64(mps.connects.getDuration()))
ret.Labels.UpdateAddIntValue(constlabels.Pid, int64(evt.GetPid()))
ret.Labels.UpdateAddStringValue(constlabels.SrcIp, evt.GetSip())
ret.Labels.UpdateAddStringValue(constlabels.DstIp, evt.GetDip())
@@ -487,18 +487,18 @@ func (na *NetworkAnalyzer) getConnectFailRecords(mps *messagePairs) []*model.Gau
ret.Labels.UpdateAddBoolValue(constlabels.IsSlow, false)
ret.Labels.UpdateAddBoolValue(constlabels.IsServer, evt.GetCtx().GetFdInfo().Role)
ret.Timestamp = evt.GetStartTime()
return []*model.GaugeGroup{ret}
return []*model.DataGroup{ret}
}

func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attributes *model.AttributeMap) []*model.GaugeGroup {
func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attributes *model.AttributeMap) []*model.DataGroup {
evt := mps.requests.event

slow := false
if mps.responses != nil {
slow = na.isSlow(mps.getDuration(), protocol)
}

ret := na.gaugeGroupPool.Get()
ret := na.metricGroupPool.Get()
labels := ret.Labels
labels.UpdateAddIntValue(constlabels.Pid, int64(evt.GetPid()))
labels.UpdateAddStringValue(constlabels.SrcIp, evt.GetSip())
@@ -526,27 +526,27 @@ func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attrib
labels.UpdateAddIntValue(constlabels.DnatPort, int64(mps.natTuple.ReplSrcPort))
}

ret.UpdateAddGauge(constvalues.ConnectTime, int64(mps.getConnectDuration()))
ret.UpdateAddGauge(constvalues.RequestSentTime, mps.getSentTime())
ret.UpdateAddGauge(constvalues.WaitingTtfbTime, mps.getWaitingTime())
ret.UpdateAddGauge(constvalues.ContentDownloadTime, mps.getDownloadTime())
ret.UpdateAddGauge(constvalues.RequestTotalTime, int64(mps.getConnectDuration()+mps.getDuration()))
ret.UpdateAddGauge(constvalues.RequestIo, int64(mps.getRquestSize()))
ret.UpdateAddGauge(constvalues.ResponseIo, int64(mps.getResponseSize()))
ret.UpdateAddIntMetric(constvalues.ConnectTime, int64(mps.getConnectDuration()))
ret.UpdateAddIntMetric(constvalues.RequestSentTime, mps.getSentTime())
ret.UpdateAddIntMetric(constvalues.WaitingTtfbTime, mps.getWaitingTime())
ret.UpdateAddIntMetric(constvalues.ContentDownloadTime, mps.getDownloadTime())
ret.UpdateAddIntMetric(constvalues.RequestTotalTime, int64(mps.getConnectDuration()+mps.getDuration()))
ret.UpdateAddIntMetric(constvalues.RequestIo, int64(mps.getRquestSize()))
ret.UpdateAddIntMetric(constvalues.ResponseIo, int64(mps.getResponseSize()))

ret.Timestamp = evt.GetStartTime()

return []*model.GaugeGroup{ret}
return []*model.DataGroup{ret}
}

// getRecordWithSinglePair generates a record whose metrics are copied from the input messagePair,
// instead of messagePairs. This is used only when there could be multiple real requests in messagePairs.
// For now, only messagePairs with DNS protocol would run into this method.
func (na *NetworkAnalyzer) getRecordWithSinglePair(mps *messagePairs, mp *messagePair, protocol string, attributes *model.AttributeMap) *model.GaugeGroup {
func (na *NetworkAnalyzer) getRecordWithSinglePair(mps *messagePairs, mp *messagePair, protocol string, attributes *model.AttributeMap) *model.DataGroup {
evt := mp.request

slow := na.isSlow(mp.getDuration(), protocol)
ret := na.gaugeGroupPool.Get()
ret := na.metricGroupPool.Get()
labels := ret.Labels
labels.UpdateAddIntValue(constlabels.Pid, int64(evt.GetPid()))
labels.UpdateAddStringValue(constlabels.SrcIp, evt.GetSip())
@@ -574,13 +574,13 @@ func (na *NetworkAnalyzer) getRecordWithSinglePair(mps *messagePairs, mp *messag
labels.UpdateAddIntValue(constlabels.DnatPort, int64(mps.natTuple.ReplSrcPort))
}

ret.UpdateAddGauge(constvalues.ConnectTime, 0)
ret.UpdateAddGauge(constvalues.RequestSentTime, mp.getSentTime())
ret.UpdateAddGauge(constvalues.WaitingTtfbTime, mp.getWaitingTime())
ret.UpdateAddGauge(constvalues.ContentDownloadTime, mp.getDownloadTime())
ret.UpdateAddGauge(constvalues.RequestTotalTime, int64(mp.getDuration()))
ret.UpdateAddGauge(constvalues.RequestIo, int64(mp.getRquestSize()))
ret.UpdateAddGauge(constvalues.ResponseIo, int64(mp.getResponseSize()))
ret.UpdateAddIntMetric(constvalues.ConnectTime, 0)
ret.UpdateAddIntMetric(constvalues.RequestSentTime, mp.getSentTime())
ret.UpdateAddIntMetric(constvalues.WaitingTtfbTime, mp.getWaitingTime())
ret.UpdateAddIntMetric(constvalues.ContentDownloadTime, mp.getDownloadTime())
ret.UpdateAddIntMetric(constvalues.RequestTotalTime, int64(mp.getDuration()))
ret.UpdateAddIntMetric(constvalues.RequestIo, int64(mp.getRquestSize()))
ret.UpdateAddIntMetric(constvalues.ResponseIo, int64(mp.getResponseSize()))

ret.Timestamp = evt.GetStartTime()
return ret
28 changes: 14 additions & 14 deletions collector/analyzer/network/network_analyzer_test.go
@@ -58,8 +58,8 @@ func TestDubboProtocol(t *testing.T) {
type NopProcessor struct {
}

func (n NopProcessor) Consume(gaugeGroup *model.GaugeGroup) error {
// fmt.Printf("Consume %v\n", gaugeGroup)
func (n NopProcessor) Consume(metricGroup *model.DataGroup) error {
// fmt.Printf("Consume %v\n", metricGroup)
return nil
}

@@ -78,10 +78,10 @@ func prepareNetworkAnalyzer() *NetworkAnalyzer {
viper.UnmarshalKey("analyzers.networkanalyzer", config)

na = &NetworkAnalyzer{
cfg: config,
gaugeGroupPool: NewGaugePool(),
nextConsumers: []consumer.Consumer{&NopProcessor{}},
telemetry: component.NewDefaultTelemetryTools(),
cfg: config,
metricGroupPool: NewMetricPool(),
nextConsumers: []consumer.Consumer{&NopProcessor{}},
telemetry: component.NewDefaultTelemetryTools(),
}
na.Start()
}
@@ -205,25 +205,25 @@ func (trace *Trace) PrepareMessagePairs(common *EventCommon) *messagePairs {
return mps
}

func (trace *Trace) Validate(t *testing.T, results []*model.GaugeGroup) {
func (trace *Trace) Validate(t *testing.T, results []*model.DataGroup) {
checkSize(t, "Expect Size", len(trace.Expects), len(results))

for i, result := range results {
expect := trace.Expects[i]
checkUint64Equal(t, "Timestamp", expect.Timestamp, result.Timestamp)

// Validate Gauges Values
checkSize(t, "Values Size", len(expect.Values), len(result.Values))
for _, value := range result.Values {
// Validate Metrics Metrics
checkSize(t, "Metrics Size", len(expect.Values), len(result.Metrics))
for _, value := range result.Metrics {
expectValue, ok := expect.Values[value.Name]
if !ok {
t.Errorf("[Miss %s] want=nil, got=%d", value.Name, value.Value)
t.Errorf("[Miss %s] want=nil, got=%d", value.Name, value.GetInt().Value)
} else {
checkInt64Equal(t, value.Name, expectValue, value.Value)
checkInt64Equal(t, value.Name, expectValue, value.GetInt().Value)
}
}

// Validate Gauges Attributes
// Validate Metrics Attributes
checkSize(t, "Labels Size", len(expect.Labels), result.Labels.Size())
for labelKey, labelValue := range expect.Labels {
if reflect.TypeOf(labelValue).Name() == "int" {
@@ -385,6 +385,6 @@ func Int64ToBytes(value int64) []byte {

type TraceExpect struct {
Timestamp uint64 `mapstructure:"Timestamp"`
Values map[string]int64 `mapstructure:"Values"`
Values map[string]int64 `mapstructure:"Metrics"`
Labels map[string]interface{} `mapstructure:"Labels"`
}
39 changes: 15 additions & 24 deletions collector/analyzer/tcpmetricanalyzer/tcp_analyzer.go
@@ -55,16 +55,16 @@ func (a *TcpMetricAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
if !ok {
return nil
}
var gaugeGroup *model.GaugeGroup
var metricGroup *model.DataGroup
var err error
switch event.Name {
case constnames.TcpCloseEvent:
case constnames.TcpRcvEstablishedEvent:
gaugeGroup, err = a.generateRtt(event)
metricGroup, err = a.generateRtt(event)
case constnames.TcpDropEvent:
gaugeGroup, err = a.generateDrop(event)
metricGroup, err = a.generateDrop(event)
case constnames.TcpRetransmitSkbEvent:
gaugeGroup, err = a.generateRetransmit(event)
metricGroup, err = a.generateRetransmit(event)
}
if err != nil {
if ce := a.telemetry.Logger.Check(zapcore.DebugLevel, "Event Skip, "); ce != nil {
@@ -74,20 +74,20 @@ func (a *TcpMetricAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
}
return nil
}
if gaugeGroup == nil {
if metricGroup == nil {
return nil
}
var retError error
for _, nextConsumer := range a.consumers {
err := nextConsumer.Consume(gaugeGroup)
err := nextConsumer.Consume(metricGroup)
if err != nil {
retError = multierror.Append(retError, err)
}
}
return retError
}

func (a *TcpMetricAnalyzer) generateRtt(event *model.KindlingEvent) (*model.GaugeGroup, error) {
func (a *TcpMetricAnalyzer) generateRtt(event *model.KindlingEvent) (*model.DataGroup, error) {
// Only client-side has rtt metric
labels, err := a.getTupleLabels(event)
if err != nil {
@@ -99,35 +99,26 @@ func (a *TcpMetricAnalyzer) generateRtt(event *model.KindlingEvent) (*model.Gaug
if rtt == 0 {
return nil, nil
}
gauge := &model.Gauge{
Name: constnames.TcpRttMetricName,
Value: int64(rtt),
}
return model.NewGaugeGroup(constnames.TcpGaugeGroupName, labels, event.Timestamp, gauge), nil
metric := model.NewIntMetric(constnames.TcpRttMetricName, int64(rtt))
return model.NewDataGroup(constnames.TcpMetricGroupName, labels, event.Timestamp, metric), nil
}

func (a *TcpMetricAnalyzer) generateRetransmit(event *model.KindlingEvent) (*model.GaugeGroup, error) {
func (a *TcpMetricAnalyzer) generateRetransmit(event *model.KindlingEvent) (*model.DataGroup, error) {
labels, err := a.getTupleLabels(event)
if err != nil {
return nil, err
}
gauge := &model.Gauge{
Name: constnames.TcpRetransmitMetricName,
Value: 1,
}
return model.NewGaugeGroup(constnames.TcpGaugeGroupName, labels, event.Timestamp, gauge), nil
metric := model.NewIntMetric(constnames.TcpRetransmitMetricName, 1)
return model.NewDataGroup(constnames.TcpMetricGroupName, labels, event.Timestamp, metric), nil
}

func (a *TcpMetricAnalyzer) generateDrop(event *model.KindlingEvent) (*model.GaugeGroup, error) {
func (a *TcpMetricAnalyzer) generateDrop(event *model.KindlingEvent) (*model.DataGroup, error) {
labels, err := a.getTupleLabels(event)
if err != nil {
return nil, err
}
gauge := &model.Gauge{
Name: constnames.TcpDropMetricName,
Value: 1,
}
return model.NewGaugeGroup(constnames.TcpGaugeGroupName, labels, event.Timestamp, gauge), nil
metric := model.NewIntMetric(constnames.TcpDropMetricName, 1)
return model.NewDataGroup(constnames.TcpMetricGroupName, labels, event.Timestamp, metric), nil
}

func (a *TcpMetricAnalyzer) getTupleLabels(event *model.KindlingEvent) (*model.AttributeMap, error) {
2 changes: 1 addition & 1 deletion collector/application/application.go
@@ -102,7 +102,7 @@ func (a *Application) buildPipeline() error {
otelExporterFactory := a.componentsFactory.Exporters[otelexporter.Otel]
otelExporter := otelExporterFactory.NewFunc(otelExporterFactory.Config, a.telemetry.Telemetry)
// Initialize all processors
// 1. GaugeGroup Aggregator
// 1. DataGroup Aggregator
aggregateProcessorFactory := a.componentsFactory.Processors[aggregateprocessor.Type]
aggregateProcessor := aggregateProcessorFactory.NewFunc(aggregateProcessorFactory.Config, a.telemetry.Telemetry, otelExporter)
// 2. Kubernetes metadata processor
2 changes: 1 addition & 1 deletion collector/consumer/consumer.go
@@ -3,5 +3,5 @@ package consumer
import "github.com/Kindling-project/kindling/collector/model"

type Consumer interface {
Consume(gaugeGroup *model.GaugeGroup) error
Consume(dataGroup *model.DataGroup) error
}
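
The histogram aggregator that gives this PR its title (commits 0192eda, 75ee3ea and 9b25ed7) sits in files outside this 11-commit view, so as a reading aid here is a minimal sketch of explicit-bucket aggregation. It assumes explicit_boundaries lists the inclusive upper bounds of the buckets and that samples are int64, matching the int metrics above; the names are illustrative, not the PR's API.

// Minimal explicit-bucket histogram accumulator (illustrative names, not the PR's API).
type histogramAggregator struct {
    boundaries   []int64  // e.g. taken from the explicit_boundaries config
    bucketCounts []uint64 // len(boundaries)+1: the last bucket is the overflow bucket
    sum          int64
    count        uint64
}

func newHistogramAggregator(boundaries []int64) *histogramAggregator {
    return &histogramAggregator{
        boundaries:   boundaries,
        bucketCounts: make([]uint64, len(boundaries)+1),
    }
}

// observe adds one int64 sample, e.g. a request_total_time value taken from a DataGroup.
func (h *histogramAggregator) observe(v int64) {
    i := 0
    for i < len(h.boundaries) && v > h.boundaries[i] {
        i++
    }
    h.bucketCounts[i]++
    h.sum += v
    h.count++
}

With boundaries of 100, 500 and 1000, for example, a sample of 400 falls into the second bucket and a sample of 5000 into the final overflow bucket.
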