diff --git a/.idea/watcherTasks.xml b/.idea/watcherTasks.xml index 5ed8819..27aa2e8 100644 --- a/.idea/watcherTasks.xml +++ b/.idea/watcherTasks.xml @@ -3,7 +3,6 @@ diff --git a/schema/ast/metrics.go b/schema/ast/metrics.go index 573a278..1112a6b 100644 --- a/schema/ast/metrics.go +++ b/schema/ast/metrics.go @@ -23,9 +23,9 @@ type LabelMapForMetrics struct { } type SplitMetric struct { - ApplyToMetric types.MetricName `yaml:"apply_to_metric"` - ByLabel string `yaml:"by_label"` - LabelsToMetrics map[types.LabelValue]types.MetricName `yaml:"labels_to_metrics"` + ApplyToMetric types.MetricName `yaml:"apply_to_metric"` + ByAttribute string `yaml:"by_attribute"` + AttributesToMetrics map[types.LabelValue]types.MetricName `yaml:"attributes_to_metrics"` } type MergeMetric struct { diff --git a/schema/compiled/compiled_schema.go b/schema/compiled/compiled_schema.go index 295c0c3..9a1541b 100644 --- a/schema/compiled/compiled_schema.go +++ b/schema/compiled/compiled_schema.go @@ -6,7 +6,6 @@ import ( otlpmetric "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1" otlpresource "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1" otlptrace "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" - "github.com/tigrannajaryan/telemetry-schema/schema/types" ) @@ -39,24 +38,25 @@ func (acts ResourceActions) Apply(resource *otlpresource.Resource) error { } type MetricActions struct { - ByName map[types.MetricName][]MetricAction - OtherMetrics []MetricAction + //ByName map[types.MetricName][]MetricAction + Actions []MetricAction } -func (acts MetricActions) Apply(metric *otlpmetric.Metric) error { - metricName := metric.MetricDescriptor.Name - actions, exists := acts.ByName[types.MetricName(metricName)] - if !exists { - actions = acts.OtherMetrics - } +func (acts MetricActions) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) { + //metricName := metric.MetricDescriptor.Name + //actions, exists := acts.ByName[types.MetricName(metricName)] + //if !exists { + // actions = acts.OtherMetrics + //} - for _, a := range actions { - err := a.Apply(metric) + for _, a := range acts.Actions { + var err error + metrics, err = a.Apply(metrics) if err != nil { - return err + return metrics, err } } - return nil + return metrics, nil } type ResourceAction interface { @@ -82,7 +82,7 @@ func (acts SpanActions) Apply(span *otlptrace.Span) error { } type MetricAction interface { - Apply(metric *otlpmetric.Metric) error + Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) } //type LogRecordAction interface { @@ -101,11 +101,15 @@ func (afv ActionsForVersions) Swap(i, j int) { afv[i], afv[j] = afv[j], afv[i] } -func (s *Schema) ConvertResourceToLatest(fromVersion types.TelemetryVersion, resource *otlpresource.Resource) error { - startIndex := sort.Search(len(s.Versions), func(i int) bool { - // TODO: use proper semver comparison. - return s.Versions[i].VersionNum > fromVersion - }) +func (s *Schema) ConvertResourceToLatest( + fromVersion types.TelemetryVersion, resource *otlpresource.Resource, +) error { + startIndex := sort.Search( + len(s.Versions), func(i int) bool { + // TODO: use proper semver comparison. + return s.Versions[i].VersionNum > fromVersion + }, + ) if startIndex > len(s.Versions) { // Nothing to do return nil @@ -120,11 +124,15 @@ func (s *Schema) ConvertResourceToLatest(fromVersion types.TelemetryVersion, res return nil } -func (s *Schema) ConvertSpansToLatest(fromVersion types.TelemetryVersion, spans []*otlptrace.Span) error { - startIndex := sort.Search(len(s.Versions), func(i int) bool { - // TODO: use proper semver comparison. - return s.Versions[i].VersionNum > fromVersion - }) +func (s *Schema) ConvertSpansToLatest( + fromVersion types.TelemetryVersion, spans []*otlptrace.Span, +) error { + startIndex := sort.Search( + len(s.Versions), func(i int) bool { + // TODO: use proper semver comparison. + return s.Versions[i].VersionNum > fromVersion + }, + ) if startIndex > len(s.Versions) { // Nothing to do return nil @@ -142,22 +150,25 @@ func (s *Schema) ConvertSpansToLatest(fromVersion types.TelemetryVersion, spans return nil } -func (s *Schema) ConvertMetricsToLatest(fromVersion types.TelemetryVersion, metrics []*otlpmetric.Metric) error { - startIndex := sort.Search(len(s.Versions), func(i int) bool { - // TODO: use proper semver comparison. - return s.Versions[i].VersionNum > fromVersion - }) +func (s *Schema) ConvertMetricsToLatest( + fromVersion types.TelemetryVersion, metrics *[]*otlpmetric.Metric, +) error { + startIndex := sort.Search( + len(s.Versions), func(i int) bool { + // TODO: use proper semver comparison. + return s.Versions[i].VersionNum > fromVersion + }, + ) if startIndex > len(s.Versions) { // Nothing to do return nil } for i := startIndex; i < len(s.Versions); i++ { - for j := 0; j < len(metrics); j++ { - metric := metrics[j] - if err := s.Versions[i].Metrics.Apply(metric); err != nil { - return err - } + var err error + *metrics, err = s.Versions[i].Metrics.Apply(*metrics) + if err != nil { + return err } } diff --git a/schema/compiled/metric_actions.go b/schema/compiled/metric_actions.go index f07e741..52a87b7 100644 --- a/schema/compiled/metric_actions.go +++ b/schema/compiled/metric_actions.go @@ -5,18 +5,19 @@ import ( otlpcommon "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1" otlpmetric "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1" - "github.com/tigrannajaryan/telemetry-schema/schema/types" ) type MetricRenameAction map[types.MetricName]types.MetricName -func (act MetricRenameAction) Apply(metric *otlpmetric.Metric) error { - newName, exists := act[types.MetricName(metric.MetricDescriptor.Name)] - if exists { - metric.MetricDescriptor.Name = string(newName) +func (act MetricRenameAction) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) { + for _, metric := range metrics { + newName, exists := act[types.MetricName(metric.MetricDescriptor.Name)] + if exists { + metric.MetricDescriptor.Name = string(newName) + } } - return nil + return metrics, nil } type MetricLabelRenameAction struct { @@ -26,27 +27,34 @@ type MetricLabelRenameAction struct { LabelMap map[string]string } -func (act MetricLabelRenameAction) Apply(metric *otlpmetric.Metric) error { - if len(act.ApplyOnlyToMetrics) > 0 { - if _, exists := act.ApplyOnlyToMetrics[types.MetricName(metric.MetricDescriptor.Name)]; !exists { - return nil +func (act MetricLabelRenameAction) Apply(metrics []*otlpmetric.Metric) ( + []*otlpmetric.Metric, error, +) { + var retErr error + for _, metric := range metrics { + + if len(act.ApplyOnlyToMetrics) > 0 { + if _, exists := act.ApplyOnlyToMetrics[types.MetricName(metric.MetricDescriptor.Name)]; !exists { + continue + } } - } - dt := metric.MetricDescriptor.Type - switch dt { - case otlpmetric.MetricDescriptor_INT64: - dps := metric.Int64DataPoints - for i := 0; i < len(dps); i++ { - dp := dps[i] - err := renameLabels(dp.Labels, act.LabelMap) - if err != nil { - return err + dt := metric.MetricDescriptor.Type + switch dt { + case otlpmetric.MetricDescriptor_INT64: + dps := metric.Int64DataPoints + for i := 0; i < len(dps); i++ { + dp := dps[i] + err := renameLabels(dp.Labels, act.LabelMap) + if err != nil { + retErr = err + } } } + } - return nil + return metrics, retErr } func renameLabels(labels []*otlpcommon.StringKeyValue, renameRules map[string]string) error { @@ -74,3 +82,63 @@ func renameLabels(labels []*otlpcommon.StringKeyValue, renameRules map[string]st } return err } + +type MetricSplitAction struct { + // ApplyOnlyToMetrics limits which metrics this action should apply to. If empty then + // there is no limitation. + MetricName types.MetricName + AttributeName string + SplitMap map[types.LabelValue]types.MetricName +} + +func (act MetricSplitAction) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) { + for i := 0; i < len(metrics); i++ { + metric := metrics[i] + if act.MetricName != types.MetricName(metric.MetricDescriptor.Name) { + continue + } + + var outputMetrics []*otlpmetric.Metric + dt := metric.MetricDescriptor.Type + switch dt { + case otlpmetric.MetricDescriptor_INT64: + dps := metric.Int64DataPoints + for j := 0; j < len(dps); j++ { + dp := dps[j] + outputMetric := splitMetric(act.AttributeName, act.SplitMap, metric, dp) + outputMetrics = append(outputMetrics, outputMetric) + } + } + + metrics = append(append(metrics[0:i], outputMetrics...), metrics[i+1:]...) + } + + return metrics, nil +} + +func splitMetric( + splitByAttr string, + splitRules map[types.LabelValue]types.MetricName, + input *otlpmetric.Metric, + inputDp *otlpmetric.Int64DataPoint, +) *otlpmetric.Metric { + output := &otlpmetric.Metric{} + descr := *input.MetricDescriptor + output.MetricDescriptor = &descr + + outputDp := *inputDp + outputDp.Labels = nil + + for _, label := range inputDp.Labels { + if label.Key == splitByAttr { + if convertTo, exists := splitRules[types.LabelValue(label.Value)]; exists { + newMetricName := string(convertTo) + output.MetricDescriptor.Name = newMetricName + } + continue + } + outputDp.Labels = append(outputDp.Labels, label) + } + output.Int64DataPoints = []*otlpmetric.Int64DataPoint{&outputDp} + return output +} diff --git a/schema/compiler.go b/schema/compiler.go index e7815ea..76cfb46 100644 --- a/schema/compiler.go +++ b/schema/compiler.go @@ -21,9 +21,15 @@ func Compile(schema *ast.Schema) *compiled.Schema { compiledActionsForVersion[versionNum] = actionsForVer } - actionsForVer.Resource = compileResourceActions(versionDescr.All.Changes, versionDescr.Resources.Changes) - actionsForVer.Metrics = compileMetricActions(versionDescr.All.Changes, versionDescr.Metrics.Changes) - actionsForVer.Spans = compileSpanActions(versionDescr.All.Changes, versionDescr.Spans.Changes) + actionsForVer.Resource = compileResourceActions( + versionDescr.All.Changes, versionDescr.Resources.Changes, + ) + actionsForVer.Metrics = compileMetricActions( + versionDescr.All.Changes, versionDescr.Metrics.Changes, + ) + actionsForVer.Spans = compileSpanActions( + versionDescr.All.Changes, versionDescr.Spans.Changes, + ) } // Convert map by version to a slice. @@ -69,63 +75,42 @@ func compileMetricActions( metricActions []ast.MetricTranslationAction, ) (result compiled.MetricActions) { - var compiledActionSeq []compiled.MetricAction - // First add actions in "all" section. for _, action := range allActions { if action.RenameAttributes != nil { compiledAction := compiled.MetricLabelRenameAction{ LabelMap: *action.RenameAttributes, } - compiledActionSeq = append(compiledActionSeq, compiledAction) // Should apply to all metrics. - result.OtherMetrics = append(result.OtherMetrics, compiledAction) + result.Actions = append(result.Actions, compiledAction) } } // Now compile metric actions and add one by one. - affectedMetrics := map[types.MetricName]bool{} for _, srcAction := range metricActions { var compiledAction compiled.MetricAction if srcAction.RenameMetrics != nil { compiledAction = compiled.MetricRenameAction(srcAction.RenameMetrics) - for metricName := range srcAction.RenameMetrics { - affectedMetrics[metricName] = true - } + result.Actions = append(result.Actions, compiledAction) } else if srcAction.RenameLabels != nil { compiledAction = compiled.MetricLabelRenameAction{ ApplyOnlyToMetrics: metricNamesToMap(srcAction.RenameLabels.ApplyToMetrics), LabelMap: srcAction.RenameLabels.LabelMap, } - if len(srcAction.RenameLabels.ApplyToMetrics) == 0 { - // Should apply to all metrics. - result.OtherMetrics = append(result.OtherMetrics, compiledAction) - } else { - // Applies to specific metrics only. - for _, metricName := range srcAction.RenameLabels.ApplyToMetrics { - affectedMetrics[metricName] = true - } + result.Actions = append(result.Actions, compiledAction) + } else if srcAction.Split != nil { + compiledAction = compiled.MetricSplitAction{ + MetricName: srcAction.Split.ApplyToMetric, + AttributeName: srcAction.Split.ByAttribute, + SplitMap: srcAction.Split.AttributesToMetrics, } - } - if compiledAction != nil { - compiledActionSeq = append(compiledActionSeq, compiledAction) + result.Actions = append(result.Actions, compiledAction) } } - result.ByName = map[types.MetricName][]compiled.MetricAction{} - - for metricName := range affectedMetrics { - result.ByName[metricName] = compiledActionSeq - // TODO: optimize compiledActionSeq by checking if metricName is in the - // ApplyOnlyToMetrics map that limits the application of particular action - // then ApplyOnlyToMetrics can be deleted since it has no effect. That will - // speed up the action execution since we no longer need to lookup the metric - // name in the limit map. - } - return result } diff --git a/schema/compiler_test.go b/schema/compiler_test.go index e630db9..dbd48d4 100644 --- a/schema/compiler_test.go +++ b/schema/compiler_test.go @@ -47,9 +47,18 @@ func TestResourceSchemaConversion(t *testing.T) { resource := &otlpresource.Resource{} resource.Attributes = []*otlpcommon.KeyValue{ - {Key: "unknown-attribute", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_IntValue{123}}}, - {Key: "k8s.cluster.name", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"OnlineShop"}}}, - {Key: "telemetry.auto.version", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"1.2.3"}}}, + { + Key: "unknown-attribute", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_IntValue{123}}, + }, + { + Key: "k8s.cluster.name", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"OnlineShop"}}, + }, + { + Key: "telemetry.auto.version", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"1.2.3"}}, + }, } resource2 := proto.Clone(resource).(*otlpresource.Resource) err := compiled.ConvertResourceToLatest("0.0.0", resource2) @@ -66,14 +75,18 @@ func TestResourceSchemaConversion(t *testing.T) { attrVal, exists = getAttr(resource2.Attributes, "kubernetes.cluster.name") assert.True(t, exists) - assert.EqualValues(t, &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"OnlineShop"}}, attrVal) + assert.EqualValues( + t, &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"OnlineShop"}}, attrVal, + ) _, exists = getAttr(resource2.Attributes, "telemetry.auto.version") assert.False(t, exists) attrVal, exists = getAttr(resource2.Attributes, "telemetry.auto_instr.version") assert.True(t, exists) - assert.EqualValues(t, &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"1.2.3"}}, attrVal) + assert.EqualValues( + t, &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"1.2.3"}}, attrVal, + ) } func getLabel(attrs []*otlpcommon.StringKeyValue, key string) (string, bool) { @@ -125,7 +138,30 @@ func TestMetricsSchemaConversion(t *testing.T) { metric2.Int64DataPoints = []*otlpmetric.Int64DataPoint{dp2} metrics = append(metrics, metric2) - err := compiled.ConvertMetricsToLatest("0.0.0", metrics) + metric3 := &otlpmetric.Metric{ + MetricDescriptor: &otlpmetric.MetricDescriptor{ + Name: "system.paging.operations", + Type: otlpmetric.MetricDescriptor_INT64, + }, + } + + dp3 := &otlpmetric.Int64DataPoint{ + Labels: []*otlpcommon.StringKeyValue{ + {Key: "direction", Value: "in"}, + {Key: "http.status_code", Value: "abc"}, + }, + } + dp4 := &otlpmetric.Int64DataPoint{ + Labels: []*otlpcommon.StringKeyValue{ + {Key: "direction", Value: "out"}, + {Key: "http.status_code", Value: "abc"}, + }, + } + + metric3.Int64DataPoints = []*otlpmetric.Int64DataPoint{dp3, dp4} + metrics = append(metrics, metric3) + + err := compiled.ConvertMetricsToLatest("0.0.0", &metrics) assert.NoError(t, err) assert.EqualValues(t, "cpu.usage.total", metric1.MetricDescriptor.Name) @@ -143,6 +179,16 @@ func TestMetricsSchemaConversion(t *testing.T) { assert.EqualValues(t, "abc", v) v, _ = getLabel(dp2.Labels, "status") assert.EqualValues(t, "234", v) + + assert.EqualValues(t, "system.paging.operations.in", metrics[2].MetricDescriptor.Name) + assert.Len(t, metrics[2].Int64DataPoints[0].Labels, 1) + v, _ = getLabel(metrics[2].Int64DataPoints[0].Labels, "http.response_status_code") + assert.EqualValues(t, "abc", v) + + assert.EqualValues(t, "system.paging.operations.out", metrics[3].MetricDescriptor.Name) + assert.Len(t, metrics[3].Int64DataPoints[0].Labels, 1) + v, _ = getLabel(metrics[3].Int64DataPoints[0].Labels, "http.response_status_code") + assert.EqualValues(t, "abc", v) } func BenchmarkResourceSchemaConversion(b *testing.B) { @@ -152,14 +198,28 @@ func BenchmarkResourceSchemaConversion(b *testing.B) { for i := 0; i < b.N; i++ { resource := &otlpresource.Resource{} resource.Attributes = []*otlpcommon.KeyValue{ - {Key: "k8s.container.name", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_IntValue{123}}}, - {Key: "k8s.cluster.name", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"OnlineShop"}}}, - {Key: "telemetry.auto.version", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"1.2.3"}}}, + { + Key: "k8s.container.name", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_IntValue{123}}, + }, + { + Key: "k8s.cluster.name", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"OnlineShop"}}, + }, + { + Key: "telemetry.auto.version", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{"1.2.3"}}, + }, } for j := len(resource.Attributes); j < 20; j++ { - resource.Attributes = append(resource.Attributes, - &otlpcommon.KeyValue{Key: "attribute" + strconv.Itoa(j), Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_IntValue{int64(j)}}}) + resource.Attributes = append( + resource.Attributes, + &otlpcommon.KeyValue{ + Key: "attribute" + strconv.Itoa(j), + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_IntValue{int64(j)}}, + }, + ) } resources = append(resources, resource) diff --git a/schema/converter/converter.go b/schema/converter/converter.go index bd217d9..9d95432 100644 --- a/schema/converter/converter.go +++ b/schema/converter/converter.go @@ -26,11 +26,13 @@ func convertTraceRequest(request *otlptracecol.ExportTraceServiceRequest, schema } } -func convertMetricRequest(request *otlpmetriccol.ExportMetricsServiceRequest, schema *compiled.Schema) { +func convertMetricRequest( + request *otlpmetriccol.ExportMetricsServiceRequest, schema *compiled.Schema, +) { for _, rss := range request.ResourceMetrics { convertResource(rss.Resource, schema) for _, ils := range rss.InstrumentationLibraryMetrics { - if err := schema.ConvertMetricsToLatest("0.0.0", ils.Metrics); err != nil { + if err := schema.ConvertMetricsToLatest("0.0.0", &ils.Metrics); err != nil { // logger.Debug("Conversion error", zap.Error(err)) } } diff --git a/schema/testdata/schema-example.yaml b/schema/testdata/schema-example.yaml index 3e7dc4f..8e4d022 100644 --- a/schema/testdata/schema-example.yaml +++ b/schema/testdata/schema-example.yaml @@ -134,6 +134,22 @@ versions: # names. The value of the new label is set equal to the value of existing label. container.name: plugin_instance + - split: + # Rules to split a metric into several metrics using an attribute for split. + # Example from the change done by https://github.com/open-telemetry/opentelemetry-specification/pull/2617 + # Name of old metric to split. + apply_to_metric: system.paging.operations + # Name of attribute in the old metric to use for splitting. The attribute will be + # eliminated, the new metric will not have it. + # Note: this can be enhanced in the future to allow an array of attributes + # so that splitting can happen across more than one dimension. + by_attribute: direction + # Names of new metrics to create, one for each possible value of attribute. + attributes_to_metrics: + # If "direction" attribute equals "in" create a new metric called "system.paging.operations.in". + in: system.paging.operations.in + out: system.paging.operations.out + - split: # Rules to split a metric into several metrics using a label for split. #