Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[awsemfexporter] Restructure Metric Translator Logic #1353

Merged
merged 6 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 109 additions & 158 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,50 @@ type CWMetricStats struct {
Sum float64
}

// Wrapper interface for:
// - pdata.IntDataPointSlice
// - pdata.DoubleDataPointSlice
// - pdata.IntHistogramDataPointSlice
// - pdata.DoubleHistogramDataPointSlice
type DataPoints interface {
Len() int
At(int) DataPoint
}

// Wrapper interface for:
// - pdata.IntDataPoint
// - pdata.DoubleDataPoint
// - pdata.IntHistogramDataPoint
// - pdata.DoubleHistogramDataPoint
type DataPoint interface {
IsNil() bool
LabelsMap() pdata.StringMap
}

// Define wrapper interfaces such that At(i) returns a `DataPoint`
type IntDataPointSlice struct {
pdata.IntDataPointSlice
}
type DoubleDataPointSlice struct {
pdata.DoubleDataPointSlice
}
type DoubleHistogramDataPointSlice struct {
pdata.DoubleHistogramDataPointSlice
}

func (dps IntDataPointSlice) At(i int) DataPoint {
return dps.IntDataPointSlice.At(i)
}
func (dps DoubleDataPointSlice) At(i int) DataPoint {
return dps.DoubleDataPointSlice.At(i)
}
func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
return dps.DoubleHistogramDataPointSlice.At(i)
}

// TranslateOtToCWMetric converts OT metrics to CloudWatch Metric format
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption string, namespace string) ([]*CWMetrics, int) {
var cwMetricLists []*CWMetrics
var cwMetricList []*CWMetrics
totalDroppedMetrics := 0
var instrumentationLibName string

Expand Down Expand Up @@ -117,11 +158,11 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption stri
totalDroppedMetrics++
continue
}
cwMetricList := getMeasurements(&metric, namespace, instrumentationLibName, dimensionRollupOption)
cwMetricLists = append(cwMetricLists, cwMetricList...)
cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, dimensionRollupOption)
cwMetricList = append(cwMetricList, cwMetrics...)
}
}
return cwMetricLists, totalDroppedMetrics
return cwMetricList, totalDroppedMetrics
}

func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
Expand Down Expand Up @@ -150,217 +191,127 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
return ples
}

func getMeasurements(metric *pdata.Metric, namespace string, instrumentationLibName string, dimensionRollupOption string) []*CWMetrics {
// Translates OTLP Metric to list of CW Metrics
func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName string, dimensionRollupOption string) []*CWMetrics {
var result []*CWMetrics
var dps DataPoints

// metric measure data from OT
metricMeasure := make(map[string]string)
// metric measure slice could include multiple metric measures
metricSlice := []map[string]string{}
metricMeasure["Name"] = metric.Name()
metricMeasure["Unit"] = metric.Unit()
metricSlice = append(metricSlice, metricMeasure)
// metric measure slice could include multiple metric measures
metricSlice := []map[string]string{metricMeasure}

// Retrieve data points
switch metric.DataType() {
case pdata.MetricDataTypeIntGauge:
dps := metric.IntGauge().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = IntDataPointSlice{metric.IntGauge().DataPoints()}
case pdata.MetricDataTypeDoubleGauge:
dps := metric.DoubleGauge().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = DoubleDataPointSlice{metric.DoubleGauge().DataPoints()}
case pdata.MetricDataTypeIntSum:
dps := metric.IntSum().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = IntDataPointSlice{metric.IntSum().DataPoints()}
case pdata.MetricDataTypeDoubleSum:
dps := metric.DoubleSum().DataPoints()
if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
dps = DoubleDataPointSlice{metric.DoubleSum().DataPoints()}
case pdata.MetricDataTypeDoubleHistogram:
dps := metric.DoubleHistogram().DataPoints()
if dps.Len() == 0 {
return result
dps = DoubleHistogramDataPointSlice{metric.DoubleHistogram().DataPoints()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a great improvement - separate from this PR would be interesting if this is useful to other exporters and could be in collector core somewhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, would be useful to see if we can move this interface to collector core for exporters to use.

}

if dps.Len() == 0 {
return result
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromHistogram(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
cwMetric := buildCWMetric(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
}
return result
}

func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
// Dimensions Slice
var dimensionSlice []string
var dimensionKV pdata.StringMap
switch metric := dp.(type) {
case pdata.IntDataPoint:
dimensionKV = metric.LabelsMap()
case pdata.DoubleDataPoint:
dimensionKV = metric.LabelsMap()
}

dimensionKV.ForEach(func(k string, v pdata.StringValue) {
fieldsPairs[k] = v.Value()
dimensionSlice = append(dimensionSlice, k)
})
// add OTel instrumentation lib name as an additional dimension if it is defined
if instrumentationLibName != noInstrumentationLibraryName {
fieldsPairs[OTellibDimensionKey] = instrumentationLibName
dimensionArray = append(dimensionArray, append(dimensionSlice, OTellibDimensionKey))
} else {
dimensionArray = append(dimensionArray, dimensionSlice)
// Build CWMetric from DataPoint
func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics {
dimensions, fields := createDimensions(dp, instrumentationLibName, dimensionRollupOption)
cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensions,
Metrics: metricSlice,
}

metricList := []CwMeasurement{*cwMeasurement}
timestamp := time.Now().UnixNano() / int64(time.Millisecond)

// Extract metric
var metricVal interface{}
switch metric := dp.(type) {
case pdata.IntDataPoint:
// Put a fake but identical metric value here in order to add metric name into fieldsPairs
// Put a fake but identical metric value here in order to add metric name into fields
// since calculateRate() needs metric name as one of metric identifiers
fieldsPairs[pmd.Name()] = int64(FakeMetricValue)
fields[pmd.Name()] = int64(FakeMetricValue)
metricVal = metric.Value()
if needsCalculateRate(pmd) {
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
metricVal = calculateRate(fields, metric.Value(), timestamp)
}
case pdata.DoubleDataPoint:
fieldsPairs[pmd.Name()] = float64(FakeMetricValue)
fields[pmd.Name()] = float64(FakeMetricValue)
metricVal = metric.Value()
if needsCalculateRate(pmd) {
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
metricVal = calculateRate(fields, metric.Value(), timestamp)
}
case pdata.DoubleHistogramDataPoint:
bucketBounds := metric.ExplicitBounds()
metricVal = &CWMetricStats{
Min: bucketBounds[0],
Max: bucketBounds[len(bucketBounds)-1],
Count: metric.Count(),
Sum: metric.Sum(),
}
}
if metricVal == nil {
return nil
}
fieldsPairs[pmd.Name()] = metricVal
fields[pmd.Name()] = metricVal

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice, instrumentationLibName)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensionArray,
Metrics: metricSlice,
}
metricList := make([]CwMeasurement, 1)
metricList[0] = *cwMeasurement
cwMetric := &CWMetrics{
Measurements: metricList,
Timestamp: timestamp,
Fields: fieldsPairs,
Fields: fields,
}
return cwMetric
}

func buildCWMetricFromHistogram(metric pdata.DoubleHistogramDataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, instrumentationLibName string, dimensionRollupOption string) *CWMetrics {
// Create dimensions from DataPoint labels, where dimensions is a 2D array of dimension names,
// and initialize fields with dimension key/value pairs
func createDimensions(dp DataPoint, instrumentationLibName string, dimensionRollupOption string) (dimensions [][]string, fields map[string]interface{}) {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
// Dimensions Slice
var dimensionSlice []string
dimensionKV := metric.LabelsMap()
fields = make(map[string]interface{})
dimensionKV := dp.LabelsMap()

dimensionSlice := make([]string, dimensionKV.Len(), dimensionKV.Len()+1)
idx := 0
dimensionKV.ForEach(func(k string, v pdata.StringValue) {
fieldsPairs[k] = v.Value()
dimensionSlice = append(dimensionSlice, k)
fields[k] = v.Value()
dimensionSlice[idx] = k
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure but FWIU, it's still idiomatic to stick with append even when preallocating the array in Go.

Copy link
Contributor Author

@kohrapha kohrapha Oct 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh interesting, I can change it if it makes more sense. I just did some research during my implementation phase and got the sense that the general opinion on SO was to use indexing if you know the length ahead of time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do this, you avoid checking for capacity all the time, so it is indeed a small improvement.

idx++
})
// add OTel instrumentation lib name as an additional dimension if it is defined
// Add OTel instrumentation lib name as an additional dimension if it is defined
if instrumentationLibName != noInstrumentationLibraryName {
fieldsPairs[OTellibDimensionKey] = instrumentationLibName
dimensionArray = append(dimensionArray, append(dimensionSlice, OTellibDimensionKey))
fields[OTellibDimensionKey] = instrumentationLibName
dimensions = append(dimensions, append(dimensionSlice, OTellibDimensionKey))
} else {
dimensionArray = append(dimensionArray, dimensionSlice)
}

timestamp := time.Now().UnixNano() / int64(time.Millisecond)

bucketBounds := metric.ExplicitBounds()
metricStats := &CWMetricStats{
Min: bucketBounds[0],
Max: bucketBounds[len(bucketBounds)-1],
Count: metric.Count(),
Sum: metric.Sum(),
dimensions = append(dimensions, dimensionSlice)
}
fieldsPairs[pmd.Name()] = metricStats

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice, instrumentationLibName)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
dimensions = append(dimensions, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Namespace: namespace,
Dimensions: dimensionArray,
Metrics: metricSlice,
}
metricList := make([]CwMeasurement, 1)
metricList[0] = *cwMeasurement
cwMetric := &CWMetrics{
Measurements: metricList,
Timestamp: timestamp,
Fields: fieldsPairs,
}
return cwMetric
return
}

// rate is calculated by valDelta / timeDelta
Expand Down
Loading