Skip to content

Commit

Permalink
Generalize translation method (accumulate + emit) to JVM metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jlvoiseux committed Jan 25, 2022
1 parent 0c18600 commit 39a0bdc
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 135 deletions.
278 changes: 147 additions & 131 deletions processor/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,87 +90,168 @@ func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics,
}
}

// This builder groups system metrics with the same name in a map entry so that they can be aggregated.
// OTel specification: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/system-metrics.md
// e.g. total memory = system.memory.usage (state = free) + system.memory.usage (state = used)
type apmMetricBuilder struct {
metricList []pdata.Metric
metricName string
// apmMetricsBuilder holds intermediate state gathered from OpenTelemetry
// metrics by accumulate, from which emit derives the equivalent Elastic APM
// metrics.
type apmMetricsBuilder struct {
// Host CPU metrics
// cpuCount is the highest "cpu" attribute value seen plus one.
cpuCount int // from system.cpu.utilization's cpu attribute
// nonIdleCPUUtilizationSum is the running sum of system.cpu.utilization
// samples whose "state" is not "idle" (each sample contributes at most 1);
// its timestamp is that of the most recently added data point.
nonIdleCPUUtilizationSum apmMetricValue

// Host memory metrics
// Most recent system.memory.usage samples with state "free" and "used".
freeMemoryBytes apmMetricValue
usedMemoryBytes apmMetricValue

// JVM metrics
// jvmGCTime and jvmGCCount are keyed by the data point's "gc" attribute.
jvmGCTime map[string]apmMetricValue
jvmGCCount map[string]apmMetricValue
// jvmMemory holds runtime.jvm.memory.area samples keyed by their
// "area" and "type" attributes.
jvmMemory map[jvmMemoryKey]apmMetricValue
}

func (b *apmMetricBuilder) build(ms metricsets) {
switch b.metricName {
// Compute and upsert system.memory.total
case "system.memory.usage":
var freeDp pdata.NumberDataPoint
var freeSample model.MetricsetSample
var usedDp pdata.NumberDataPoint
var usedSample model.MetricsetSample
for _, metric := range b.metricList {
dps := metric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
if sample, ok := numberSample(dp, model.MetricTypeCounter); ok {
dp.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
if k == "state" {
switch v.StringVal() {
case "used":
usedDp = dp
usedSample = sample
case "free":
freeDp = dp
freeSample = sample
}
// NewApmMetricsBuilder returns an empty apmMetricsBuilder whose JVM metric
// maps are already allocated, so they can be written to immediately (writing
// to a nil map would panic). All other fields are left at their zero values.
func NewApmMetricsBuilder() *apmMetricsBuilder {
	return &apmMetricsBuilder{
		jvmGCTime:  make(map[string]apmMetricValue),
		jvmGCCount: make(map[string]apmMetricValue),
		jvmMemory:  make(map[jvmMemoryKey]apmMetricValue),
	}
}

// apmMetricValue pairs a numeric metric value (a single sample or a running
// sum) with the timestamp of the data point it was taken from.
type apmMetricValue struct {
timestamp time.Time
value float64
}

// jvmMemoryKey identifies one JVM memory metric series; it is the key type of
// apmMetricsBuilder.jvmMemory.
type jvmMemoryKey struct {
// area is the value of the data point's "area" attribute.
area string
// type_ is the value of the data point's "type" attribute; the trailing
// underscore avoids the Go keyword "type".
type_ string
pool string // will be "" for non-pool specific memory metrics
}

// accumulate processes m, translating to and accumulating equivalent Elastic APM metrics in b.
func (b *apmMetricsBuilder) accumulate(m pdata.Metric) {

switch m.DataType() {
case pdata.MetricDataTypeSum:
dpsCounter := m.Sum().DataPoints()
for i := 0; i < dpsCounter.Len(); i++ {
dp := dpsCounter.At(i)
if sample, ok := numberSample(dp, model.MetricTypeCounter); ok {
switch m.Name() {
case "system.memory.usage":
if memoryState, exists := dp.Attributes().Get("state"); exists {
switch memoryState.StringVal() {
case "used":
b.usedMemoryBytes = apmMetricValue{dp.Timestamp().AsTime(), sample.Value}
case "free":
b.freeMemoryBytes = apmMetricValue{dp.Timestamp().AsTime(), sample.Value}
}
return true
})
}
case "runtime.jvm.gc.collection", "runtime.jvm.gc.time":
if gcName, exists := dp.Attributes().Get("gc"); exists {
b.jvmGCTime[gcName.StringVal()] = apmMetricValue{dp.Timestamp().AsTime(), sample.Value}
}
case "runtime.jvm.gc.count":
if gcName, exists := dp.Attributes().Get("gc"); exists {
b.jvmGCCount[gcName.StringVal()] = apmMetricValue{dp.Timestamp().AsTime(), sample.Value}
}
}
}
}
if freeDp != (pdata.NumberDataPoint{}) && usedDp != (pdata.NumberDataPoint{}) {
ms.upsertOne(
freeDp.Timestamp().AsTime(),
"system.memory.total",
pdata.NewAttributeMap(),
model.MetricsetSample{Type: model.MetricTypeCounter, Value: freeSample.Value + usedSample.Value},
)
}
// Compute system.cpu.total.norm.pct
// Sum all non-idle utilization metrics and average over the number of cores
case "system.cpu.utilization":
activeProp := float64(0)
numberCpus := 1
var bufferDp pdata.NumberDataPoint
for _, metric := range b.metricList {
dps := metric.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
bufferDp = dp
if sample, ok := numberSample(dp, model.MetricTypeCounter); ok {
dp.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
if k == "state" && v.StringVal() != "idle" {
case pdata.MetricDataTypeGauge:
// Gauge metrics accumulation
dpsGauge := m.Gauge().DataPoints()
for i := 0; i < dpsGauge.Len(); i++ {
dp := dpsGauge.At(i)
if sample, ok := numberSample(dp, model.MetricTypeGauge); ok {
switch m.Name() {
case "system.cpu.utilization":
if cpuState, exists := dp.Attributes().Get("state"); exists {
if cpuState.StringVal() != "idle" {
if sample.Value > 1 {
activeProp += 1
b.nonIdleCPUUtilizationSum.value += 1
} else {
activeProp += sample.Value
b.nonIdleCPUUtilizationSum.value += sample.Value
}
b.nonIdleCPUUtilizationSum.timestamp = dp.Timestamp().AsTime()
}
if k == "cpu" {
cpuId, _ := strconv.Atoi(v.StringVal())
if cpuId+1 > numberCpus {
numberCpus = cpuId + 1
}
}
if cpuIdStr, exists := dp.Attributes().Get("cpu"); exists {
cpuId, _ := strconv.Atoi(cpuIdStr.StringVal())
if cpuId+1 > b.cpuCount {
b.cpuCount = cpuId + 1
}
}
case "runtime.jvm.memory.area":
var key jvmMemoryKey
dp.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
switch k {
case "area":
key.area = v.AsString()
case "type":
key.type_ = v.AsString()
}
return true
})
if key.area != "" && key.type_ != "" {
b.jvmMemory[key] = apmMetricValue{dp.Timestamp().AsTime(), sample.Value}
}
}
}
}
}
}

// emit upserts Elastic APM metrics into ms from information accumulated in b.
func (b *apmMetricsBuilder) emit(ms metricsets) {
// system.memory.actual.free
// Direct translation of system.memory.usage (state = free)
if b.freeMemoryBytes.value > 0 {
ms.upsertOne(
b.freeMemoryBytes.timestamp, "system.memory.actual.free", pdata.NewAttributeMap(),
model.MetricsetSample{Type: model.MetricTypeCounter, Value: b.freeMemoryBytes.value},
)
}
// system.memory.total
// system.memory.usage (state = free) + system.memory.usage (state = used)
totalMemoryBytes := b.freeMemoryBytes.value + b.usedMemoryBytes.value
if totalMemoryBytes > 0 {
ms.upsertOne(
b.freeMemoryBytes.timestamp, "system.memory.total", pdata.NewAttributeMap(),
model.MetricsetSample{Type: model.MetricTypeCounter, Value: totalMemoryBytes},
)
}
// system.cpu.total.norm.pct
// Averaging of non-idle CPU utilization over all CPU cores
if b.nonIdleCPUUtilizationSum.value > 0 && b.cpuCount > 0 {
ms.upsertOne(
b.nonIdleCPUUtilizationSum.timestamp, "system.cpu.total.norm.pct", pdata.NewAttributeMap(),
model.MetricsetSample{Type: model.MetricTypeGauge, Value: b.nonIdleCPUUtilizationSum.value / float64(b.cpuCount)},
)
}
// jvm.gc.time
// Direct translation of runtime.jvm.gc.time or runtime.jvm.gc.collection
for k, v := range b.jvmGCTime {
elasticapmAttributes := pdata.NewAttributeMap()
elasticapmAttributes.Insert("name", pdata.NewAttributeValueString(k))
ms.upsertOne(
v.timestamp, "jvm.gc.time", elasticapmAttributes,
model.MetricsetSample{Type: model.MetricTypeCounter, Value: v.value},
)
}
// jvm.gc.count
// Direct translation of runtime.jvm.gc.count
for k, v := range b.jvmGCCount {
elasticapmAttributes := pdata.NewAttributeMap()
elasticapmAttributes.Insert("name", pdata.NewAttributeValueString(k))
ms.upsertOne(
v.timestamp, "jvm.gc.count", elasticapmAttributes,
model.MetricsetSample{Type: model.MetricTypeCounter, Value: v.value},
)
}
// jvm.memory.{area}.{type}
// Direct translation of runtime.jvm.memory.area (area = xxx, type = xxx)
for k, v := range b.jvmMemory {
ms.upsertOne(
bufferDp.Timestamp().AsTime(),
"system.cpu.total.norm.pct",
pdata.NewAttributeMap(),
model.MetricsetSample{Type: model.MetricTypeGauge, Value: activeProp / float64(numberCpus)},
v.timestamp, fmt.Sprintf("jvm.memory.%s.%s", k.area, k.type_), pdata.NewAttributeMap(),
model.MetricsetSample{Type: model.MetricTypeGauge, Value: v.value},
)
}
}
Expand All @@ -184,23 +265,14 @@ func (c *Consumer) convertInstrumentationLibraryMetrics(
ms := make(metricsets)
otelMetrics := in.Metrics()
var unsupported int64
apmMetricBuilderTracker := make(map[string]*apmMetricBuilder)
var builder = NewApmMetricsBuilder()
for i := 0; i < otelMetrics.Len(); i++ {
currentBuilder, exists := apmMetricBuilderTracker[otelMetrics.At(i).Name()]
if exists {
currentBuilder.metricList = append(currentBuilder.metricList, otelMetrics.At(i))
} else {
currentBuilder := apmMetricBuilder{metricList: make([]pdata.Metric, 0), metricName: otelMetrics.At(i).Name()}
currentBuilder.metricList = append(currentBuilder.metricList, otelMetrics.At(i))
apmMetricBuilderTracker[otelMetrics.At(i).Name()] = &currentBuilder
}
builder.accumulate(otelMetrics.At(i))
if !c.addMetric(otelMetrics.At(i), ms) {
unsupported++
}
}
for key := range apmMetricBuilderTracker {
apmMetricBuilderTracker[key].build(ms)
}
builder.emit(ms)
for key, ms := range ms {
event := baseEvent
event.Processor = model.MetricsetProcessor
Expand Down Expand Up @@ -376,62 +448,6 @@ func (ms metricsets) upsert(timestamp time.Time, name string, attributes pdata.A
// We always record metrics as they are given. We also copy some
// well-known OpenTelemetry metrics to their Elastic APM equivalents.
ms.upsertOne(timestamp, name, attributes, sample)

switch name {
case "runtime.jvm.memory.area":
// runtime.jvm.memory.area -> jvm.memory.{area}.{type}
// Copy label "gc" to "name".
var areaValue, typeValue string
attributes.Range(func(k string, v pdata.AttributeValue) bool {
switch k {
case "area":
areaValue = v.AsString()
case "type":
typeValue = v.AsString()
}
return true
})
if areaValue != "" && typeValue != "" {
elasticapmName := fmt.Sprintf("jvm.memory.%s.%s", areaValue, typeValue)
ms.upsertOne(timestamp, elasticapmName, pdata.NewAttributeMap(), sample)
}
case "runtime.jvm.gc.collection":
// This is the old name for runtime.jvm.gc.time.
name = "runtime.jvm.gc.time"
fallthrough
case "runtime.jvm.gc.time", "runtime.jvm.gc.count":
// Chop off the "runtime." prefix, i.e. runtime.jvm.gc.time -> jvm.gc.time.
// OpenTelemetry and Elastic APM metrics are both defined in milliseconds.
elasticapmName := name[len("runtime."):]

// Copy label "gc" to "name".
elasticapmAttributes := pdata.NewAttributeMap()
attributes.Range(func(k string, v pdata.AttributeValue) bool {
if k == "gc" {
elasticapmAttributes.Insert("name", v)
return false
}
return true
})
ms.upsertOne(timestamp, elasticapmName, elasticapmAttributes, sample)
case "system.memory.usage":
// Translation of Otel memory metrics
// system.memory.usage (state=free) -> system.memory.actual.free
// system.memory.usage (state=used) -> system.memory.actual.used.bytes
var elasticapmName string
attributes.Range(func(k string, v pdata.AttributeValue) bool {
if k == "state" {
switch v.StringVal() {
case "used":
elasticapmName = "system.memory.actual.used.bytes"
case "free":
elasticapmName = "system.memory.actual.free"
}
}
return true
})
ms.upsertOne(timestamp, elasticapmName, pdata.NewAttributeMap(), sample)
}
}

func (ms metricsets) upsertOne(timestamp time.Time, name string, attributes pdata.AttributeMap, sample model.MetricsetSample) {
Expand Down
4 changes: 0 additions & 4 deletions processor/otel/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,6 @@ func TestConsumeMetricsHostMemory(t *testing.T) {
Type: "counter",
Value: 4773351424,
},
"system.memory.actual.used.bytes": {
Type: "counter",
Value: 3563778048,
},
"system.memory.total": {
Type: "counter",
Value: 8337129472,
Expand Down

0 comments on commit 39a0bdc

Please sign in to comment.