-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[awsemfexporter] Restructure Metric Translator Logic #1353
Changes from all commits
a587a19
f3ad7e9
820f516
927ed26
4360a09
10fb898
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,9 +76,50 @@ type CWMetricStats struct { | |
Sum float64 | ||
} | ||
|
||
// Wrapper interface for: | ||
// - pdata.IntDataPointSlice | ||
// - pdata.DoubleDataPointSlice | ||
// - pdata.IntHistogramDataPointSlice | ||
// - pdata.DoubleHistogramDataPointSlice | ||
type DataPoints interface { | ||
Len() int | ||
At(int) DataPoint | ||
} | ||
|
||
// Wrapper interface for: | ||
// - pdata.IntDataPoint | ||
// - pdata.DoubleDataPoint | ||
// - pdata.IntHistogramDataPoint | ||
// - pdata.DoubleHistogramDataPoint | ||
type DataPoint interface { | ||
IsNil() bool | ||
LabelsMap() pdata.StringMap | ||
} | ||
|
||
// Define wrapper interfaces such that At(i) returns a `DataPoint` | ||
type IntDataPointSlice struct { | ||
pdata.IntDataPointSlice | ||
} | ||
type DoubleDataPointSlice struct { | ||
pdata.DoubleDataPointSlice | ||
} | ||
type DoubleHistogramDataPointSlice struct { | ||
pdata.DoubleHistogramDataPointSlice | ||
} | ||
|
||
func (dps IntDataPointSlice) At(i int) DataPoint { | ||
return dps.IntDataPointSlice.At(i) | ||
} | ||
func (dps DoubleDataPointSlice) At(i int) DataPoint { | ||
return dps.DoubleDataPointSlice.At(i) | ||
} | ||
func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint { | ||
return dps.DoubleHistogramDataPointSlice.At(i) | ||
} | ||
|
||
// TranslateOtToCWMetric converts OT metrics to CloudWatch Metric format | ||
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption string, namespace string) ([]*CWMetrics, int) { | ||
var cwMetricLists []*CWMetrics | ||
var cwMetricList []*CWMetrics | ||
totalDroppedMetrics := 0 | ||
var instrumentationLibName string | ||
|
||
|
@@ -117,11 +158,11 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption stri | |
totalDroppedMetrics++ | ||
continue | ||
} | ||
cwMetricList := getMeasurements(&metric, namespace, instrumentationLibName, dimensionRollupOption) | ||
cwMetricLists = append(cwMetricLists, cwMetricList...) | ||
cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, dimensionRollupOption) | ||
cwMetricList = append(cwMetricList, cwMetrics...) | ||
} | ||
} | ||
return cwMetricLists, totalDroppedMetrics | ||
return cwMetricList, totalDroppedMetrics | ||
} | ||
|
||
func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent { | ||
|
@@ -150,217 +191,127 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent { | |
return ples | ||
} | ||
|
||
func getMeasurements(metric *pdata.Metric, namespace string, instrumentationLibName string, dimensionRollupOption string) []*CWMetrics { | ||
// Translates OTLP Metric to list of CW Metrics | ||
func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName string, dimensionRollupOption string) []*CWMetrics { | ||
var result []*CWMetrics | ||
var dps DataPoints | ||
|
||
// metric measure data from OT | ||
metricMeasure := make(map[string]string) | ||
// metric measure slice could include multiple metric measures | ||
metricSlice := []map[string]string{} | ||
metricMeasure["Name"] = metric.Name() | ||
metricMeasure["Unit"] = metric.Unit() | ||
metricSlice = append(metricSlice, metricMeasure) | ||
// metric measure slice could include multiple metric measures | ||
metricSlice := []map[string]string{metricMeasure} | ||
|
||
// Retrieve data points | ||
switch metric.DataType() { | ||
case pdata.MetricDataTypeIntGauge: | ||
dps := metric.IntGauge().DataPoints() | ||
if dps.Len() == 0 { | ||
return result | ||
} | ||
for m := 0; m < dps.Len(); m++ { | ||
dp := dps.At(m) | ||
if dp.IsNil() { | ||
continue | ||
} | ||
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption) | ||
if cwMetric != nil { | ||
result = append(result, cwMetric) | ||
} | ||
} | ||
dps = IntDataPointSlice{metric.IntGauge().DataPoints()} | ||
case pdata.MetricDataTypeDoubleGauge: | ||
dps := metric.DoubleGauge().DataPoints() | ||
if dps.Len() == 0 { | ||
return result | ||
} | ||
for m := 0; m < dps.Len(); m++ { | ||
dp := dps.At(m) | ||
if dp.IsNil() { | ||
continue | ||
} | ||
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption) | ||
if cwMetric != nil { | ||
result = append(result, cwMetric) | ||
} | ||
} | ||
dps = DoubleDataPointSlice{metric.DoubleGauge().DataPoints()} | ||
case pdata.MetricDataTypeIntSum: | ||
dps := metric.IntSum().DataPoints() | ||
if dps.Len() == 0 { | ||
return result | ||
} | ||
for m := 0; m < dps.Len(); m++ { | ||
dp := dps.At(m) | ||
if dp.IsNil() { | ||
continue | ||
} | ||
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption) | ||
if cwMetric != nil { | ||
result = append(result, cwMetric) | ||
} | ||
} | ||
dps = IntDataPointSlice{metric.IntSum().DataPoints()} | ||
case pdata.MetricDataTypeDoubleSum: | ||
dps := metric.DoubleSum().DataPoints() | ||
if dps.Len() == 0 { | ||
return result | ||
} | ||
for m := 0; m < dps.Len(); m++ { | ||
dp := dps.At(m) | ||
if dp.IsNil() { | ||
continue | ||
} | ||
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption) | ||
if cwMetric != nil { | ||
result = append(result, cwMetric) | ||
} | ||
} | ||
dps = DoubleDataPointSlice{metric.DoubleSum().DataPoints()} | ||
case pdata.MetricDataTypeDoubleHistogram: | ||
dps := metric.DoubleHistogram().DataPoints() | ||
if dps.Len() == 0 { | ||
return result | ||
dps = DoubleHistogramDataPointSlice{metric.DoubleHistogram().DataPoints()} | ||
} | ||
|
||
if dps.Len() == 0 { | ||
return result | ||
} | ||
for m := 0; m < dps.Len(); m++ { | ||
dp := dps.At(m) | ||
if dp.IsNil() { | ||
continue | ||
} | ||
for m := 0; m < dps.Len(); m++ { | ||
dp := dps.At(m) | ||
if dp.IsNil() { | ||
continue | ||
} | ||
cwMetric := buildCWMetricFromHistogram(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption) | ||
if cwMetric != nil { | ||
result = append(result, cwMetric) | ||
} | ||
cwMetric := buildCWMetric(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption) | ||
if cwMetric != nil { | ||
result = append(result, cwMetric) | ||
} | ||
} | ||
return result | ||
} | ||
|
||
func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics { | ||
// fields contains metric and dimensions key/value pairs | ||
fieldsPairs := make(map[string]interface{}) | ||
var dimensionArray [][]string | ||
// Dimensions Slice | ||
var dimensionSlice []string | ||
var dimensionKV pdata.StringMap | ||
switch metric := dp.(type) { | ||
case pdata.IntDataPoint: | ||
dimensionKV = metric.LabelsMap() | ||
case pdata.DoubleDataPoint: | ||
dimensionKV = metric.LabelsMap() | ||
} | ||
|
||
dimensionKV.ForEach(func(k string, v pdata.StringValue) { | ||
fieldsPairs[k] = v.Value() | ||
dimensionSlice = append(dimensionSlice, k) | ||
}) | ||
// add OTel instrumentation lib name as an additional dimension if it is defined | ||
if instrumentationLibName != noInstrumentationLibraryName { | ||
fieldsPairs[OTellibDimensionKey] = instrumentationLibName | ||
dimensionArray = append(dimensionArray, append(dimensionSlice, OTellibDimensionKey)) | ||
} else { | ||
dimensionArray = append(dimensionArray, dimensionSlice) | ||
// Build CWMetric from DataPoint | ||
func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics { | ||
dimensions, fields := createDimensions(dp, instrumentationLibName, dimensionRollupOption) | ||
cwMeasurement := &CwMeasurement{ | ||
Namespace: namespace, | ||
Dimensions: dimensions, | ||
Metrics: metricSlice, | ||
} | ||
|
||
metricList := []CwMeasurement{*cwMeasurement} | ||
timestamp := time.Now().UnixNano() / int64(time.Millisecond) | ||
|
||
// Extract metric | ||
var metricVal interface{} | ||
switch metric := dp.(type) { | ||
case pdata.IntDataPoint: | ||
// Put a fake but identical metric value here in order to add metric name into fieldsPairs | ||
// Put a fake but identical metric value here in order to add metric name into fields | ||
// since calculateRate() needs metric name as one of metric identifiers | ||
fieldsPairs[pmd.Name()] = int64(FakeMetricValue) | ||
fields[pmd.Name()] = int64(FakeMetricValue) | ||
metricVal = metric.Value() | ||
if needsCalculateRate(pmd) { | ||
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp) | ||
metricVal = calculateRate(fields, metric.Value(), timestamp) | ||
} | ||
case pdata.DoubleDataPoint: | ||
fieldsPairs[pmd.Name()] = float64(FakeMetricValue) | ||
fields[pmd.Name()] = float64(FakeMetricValue) | ||
metricVal = metric.Value() | ||
if needsCalculateRate(pmd) { | ||
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp) | ||
metricVal = calculateRate(fields, metric.Value(), timestamp) | ||
} | ||
case pdata.DoubleHistogramDataPoint: | ||
bucketBounds := metric.ExplicitBounds() | ||
metricVal = &CWMetricStats{ | ||
Min: bucketBounds[0], | ||
Max: bucketBounds[len(bucketBounds)-1], | ||
Count: metric.Count(), | ||
Sum: metric.Sum(), | ||
} | ||
} | ||
if metricVal == nil { | ||
return nil | ||
} | ||
fieldsPairs[pmd.Name()] = metricVal | ||
fields[pmd.Name()] = metricVal | ||
|
||
// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup | ||
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice, instrumentationLibName) | ||
if len(rollupDimensionArray) > 0 { | ||
dimensionArray = append(dimensionArray, rollupDimensionArray...) | ||
} | ||
|
||
cwMeasurement := &CwMeasurement{ | ||
Namespace: namespace, | ||
Dimensions: dimensionArray, | ||
Metrics: metricSlice, | ||
} | ||
metricList := make([]CwMeasurement, 1) | ||
metricList[0] = *cwMeasurement | ||
cwMetric := &CWMetrics{ | ||
Measurements: metricList, | ||
Timestamp: timestamp, | ||
Fields: fieldsPairs, | ||
Fields: fields, | ||
} | ||
return cwMetric | ||
} | ||
|
||
func buildCWMetricFromHistogram(metric pdata.DoubleHistogramDataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics { | ||
// Create dimensions from DataPoint labels, where dimensions is a 2D array of dimension names, | ||
// and initialize fields with dimension key/value pairs | ||
func createDimensions(dp DataPoint, instrumentationLibName string, dimensionRollupOption string) (dimensions [][]string, fields map[string]interface{}) { | ||
// fields contains metric and dimensions key/value pairs | ||
fieldsPairs := make(map[string]interface{}) | ||
var dimensionArray [][]string | ||
// Dimensions Slice | ||
var dimensionSlice []string | ||
dimensionKV := metric.LabelsMap() | ||
fields = make(map[string]interface{}) | ||
dimensionKV := dp.LabelsMap() | ||
|
||
dimensionSlice := make([]string, dimensionKV.Len(), dimensionKV.Len()+1) | ||
idx := 0 | ||
dimensionKV.ForEach(func(k string, v pdata.StringValue) { | ||
fieldsPairs[k] = v.Value() | ||
dimensionSlice = append(dimensionSlice, k) | ||
fields[k] = v.Value() | ||
dimensionSlice[idx] = k | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure but FWIU, it's still idiomatic to stick with append even when preallocating the array in Go. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh interesting, I can change it if it makes more sense. I just did some research during my implementation phase and got the sense that the general opinion on SO was to use indexing if you know the length ahead of time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you do this, you avoid checking for capacity all the time, so it is indeed a small improvement. |
||
idx++ | ||
}) | ||
// add OTel instrumentation lib name as an additional dimension if it is defined | ||
// Add OTel instrumentation lib name as an additional dimension if it is defined | ||
if instrumentationLibName != noInstrumentationLibraryName { | ||
fieldsPairs[OTellibDimensionKey] = instrumentationLibName | ||
dimensionArray = append(dimensionArray, append(dimensionSlice, OTellibDimensionKey)) | ||
fields[OTellibDimensionKey] = instrumentationLibName | ||
dimensions = append(dimensions, append(dimensionSlice, OTellibDimensionKey)) | ||
} else { | ||
dimensionArray = append(dimensionArray, dimensionSlice) | ||
} | ||
|
||
timestamp := time.Now().UnixNano() / int64(time.Millisecond) | ||
|
||
bucketBounds := metric.ExplicitBounds() | ||
metricStats := &CWMetricStats{ | ||
Min: bucketBounds[0], | ||
Max: bucketBounds[len(bucketBounds)-1], | ||
Count: metric.Count(), | ||
Sum: metric.Sum(), | ||
dimensions = append(dimensions, dimensionSlice) | ||
} | ||
fieldsPairs[pmd.Name()] = metricStats | ||
|
||
// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup | ||
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice, instrumentationLibName) | ||
if len(rollupDimensionArray) > 0 { | ||
dimensionArray = append(dimensionArray, rollupDimensionArray...) | ||
dimensions = append(dimensions, rollupDimensionArray...) | ||
} | ||
|
||
cwMeasurement := &CwMeasurement{ | ||
Namespace: namespace, | ||
Dimensions: dimensionArray, | ||
Metrics: metricSlice, | ||
} | ||
metricList := make([]CwMeasurement, 1) | ||
metricList[0] = *cwMeasurement | ||
cwMetric := &CWMetrics{ | ||
Measurements: metricList, | ||
Timestamp: timestamp, | ||
Fields: fieldsPairs, | ||
} | ||
return cwMetric | ||
return | ||
} | ||
|
||
// rate is calculated by valDelta / timeDelta | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a great improvement - separate from this PR would be interesting if this is useful to other exporters and could be in collector core somewhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, would be useful to see if we can move this interface to collector core for exporters to use.