Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor collectd from opensus to pdata #7

Merged
merged 4 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 91 additions & 117 deletions receiver/collectdreceiver/collectd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@ import (
"strings"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"google.golang.org/protobuf/types/known/timestamppb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
)

type TargetMetricType string

const (
collectDMetricDerive = "derive"
collectDMetricGauge = "gauge"
collectDMetricCounter = "counter"
collectDMetricAbsolute = "absolute"
GaugeMetricType = TargetMetricType("gauge")
CumulativeMetricType = TargetMetricType("cumulative")
)

const (
collectDMetricDerive = "derive"
collectDMetricCounter = "counter"
)

type collectDRecord struct {
Expand All @@ -38,31 +43,31 @@ type collectDRecord struct {
Severity *string `json:"severity"`
}

func (r *collectDRecord) isEvent() bool {
return r.Time != nil && r.Severity != nil && r.Message != nil
func (cdr *collectDRecord) isEvent() bool {
return cdr.Time != nil && cdr.Severity != nil && cdr.Message != nil
}

func (r *collectDRecord) protoTime() *timestamppb.Timestamp {
if r.Time == nil {
return nil
func (cdr *collectDRecord) protoTime() pcommon.Timestamp {
// Return 1970-01-01 00:00:00 +0000 UTC.
if cdr.Time == nil {
return pcommon.NewTimestampFromTime(time.Unix(0, 0))
}
ts := time.Unix(0, int64(float64(time.Second)**r.Time))
return timestamppb.New(ts)
ts := time.Unix(0, int64(float64(time.Second)**cdr.Time))
return pcommon.NewTimestampFromTime(ts)
}

func (r *collectDRecord) startTimestamp(mdType metricspb.MetricDescriptor_Type) *timestamppb.Timestamp {
if mdType == metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION || mdType == metricspb.MetricDescriptor_CUMULATIVE_DOUBLE || mdType == metricspb.MetricDescriptor_CUMULATIVE_INT64 {
return timestamppb.New(time.Unix(0, int64((*r.Time-*r.Interval)*float64(time.Second))))
func (cdr *collectDRecord) startTimestamp(metricType TargetMetricType) pcommon.Timestamp {
if metricType == CumulativeMetricType {
return pcommon.NewTimestampFromTime(time.Unix(0, int64((*cdr.Time-*cdr.Interval)*float64(time.Second))))
}
return nil
return pcommon.NewTimestampFromTime(time.Unix(0, 0))
}

func (r *collectDRecord) appendToMetrics(metrics []*metricspb.Metric, defaultLabels map[string]string) ([]*metricspb.Metric, error) {
func (cdr *collectDRecord) appendToMetrics(scopeMetrics pmetric.ScopeMetrics, defaultLabels map[string]string) error {
// Ignore if record is an event instead of data point
if r.isEvent() {
if cdr.isEvent() {
recordEventsReceived()
return metrics, nil

return nil
}

recordMetricsReceived()
Expand All @@ -71,125 +76,120 @@ func (r *collectDRecord) appendToMetrics(metrics []*metricspb.Metric, defaultLab
labels[k] = v
}

for i := range r.Dsnames {
if i < len(r.Dstypes) && i < len(r.Values) && r.Values[i] != nil {
dsType, dsName, val := r.Dstypes[i], r.Dsnames[i], r.Values[i]
metricName, usedDsName := r.getReasonableMetricName(i, labels)
for i := range cdr.Dsnames {
if i < len(cdr.Dstypes) && i < len(cdr.Values) && cdr.Values[i] != nil {
dsType, dsName, val := cdr.Dstypes[i], cdr.Dsnames[i], cdr.Values[i]
metricName, usedDsName := cdr.getReasonableMetricName(i, labels)

addIfNotNullOrEmpty(labels, "plugin", r.Plugin)
parseAndAddLabels(labels, r.PluginInstance, r.Host)
addIfNotNullOrEmpty(labels, "plugin", cdr.Plugin)
parseAndAddLabels(labels, cdr.PluginInstance, cdr.Host)
if !usedDsName {
addIfNotNullOrEmpty(labels, "dsname", dsName)
}

metric, err := r.newMetric(metricName, dsType, val, labels)
metric, err := cdr.newMetric(metricName, dsType, val, labels)
if err != nil {
return metrics, fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
return fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
}
metrics = append(metrics, metric)

newMetric := scopeMetrics.Metrics().AppendEmpty()
metric.MoveTo(newMetric)
}
}
return metrics, nil
return nil
}

func (r *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (*metricspb.Metric, error) {
metric := &metricspb.Metric{}
point, isDouble, err := r.newPoint(val)
// Create new metric, get labels, then setting attribute and metric info
func (cdr *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (pmetric.Metric, error) {
attributes := setAttributes(labels)
metric, err := cdr.setMetric(name, dsType, val, attributes)
if err != nil {
return metric, fmt.Errorf("error processing metric %s: %w", name, err)
return pmetric.Metric{}, fmt.Errorf("error processing metric %s: %w", name, err)
}
return metric, nil
}

lKeys, lValues := labelKeysAndValues(labels)
metricType := r.metricType(dsType, isDouble)
metric.MetricDescriptor = &metricspb.MetricDescriptor{
Name: name,
Type: metricType,
LabelKeys: lKeys,
}
metric.Timeseries = []*metricspb.TimeSeries{
{
StartTimestamp: r.startTimestamp(metricType),
LabelValues: lValues,
Points: []*metricspb.Point{point},
},
func setAttributes(labels map[string]string) pcommon.Map {
attributes := pcommon.NewMap()
for k, v := range labels {
attributes.PutStr(k, v)
}

return metric, nil
return attributes
}

func (r *collectDRecord) metricType(dsType *string, isDouble bool) metricspb.MetricDescriptor_Type {
val := ""
// Set new metric info with name, datapoint, time, attributes
func (cdr *collectDRecord) setMetric(name string, dsType *string, val *json.Number, atr pcommon.Map) (pmetric.Metric, error) {

typ := ""
metric := pmetric.NewMetric()

if dsType != nil {
val = *dsType
typ = *dsType
}

switch val {
case collectDMetricCounter, collectDMetricDerive:
return metricCumulative(isDouble)
metric.SetName(name)
dataPoint := setDataPoint(typ, metric)
dataPoint.SetTimestamp(cdr.protoTime())
atr.CopyTo(dataPoint.Attributes())

// Prometheus collectd exporter just ignores it. We use gauge for it as it seems the
// closes type. https://github.com/prometheus/collectd_exporter/blob/master/main.go#L109-L129
case collectDMetricGauge, collectDMetricAbsolute:
return metricGauge(isDouble)
if pointVal, err := val.Int64(); err == nil {
dataPoint.SetIntValue(pointVal)
} else if pointVal, err := val.Float64(); err == nil {
dataPoint.SetDoubleValue(pointVal)
} else {
return pmetric.Metric{}, fmt.Errorf("value could not be decoded: %w", err)
}
return metricGauge(isDouble)
return metric, nil
}

func (r *collectDRecord) newPoint(val *json.Number) (*metricspb.Point, bool, error) {
p := &metricspb.Point{
Timestamp: r.protoTime(),
}

isDouble := true
if v, err := val.Int64(); err == nil {
isDouble = false
p.Value = &metricspb.Point_Int64Value{Int64Value: v}
} else {
v, err := val.Float64()
if err != nil {
return nil, isDouble, fmt.Errorf("value could not be decoded: %w", err)
}
p.Value = &metricspb.Point_DoubleValue{DoubleValue: v}
// check type to decide metric type and return data point
func setDataPoint(typ string, metric pmetric.Metric) pmetric.NumberDataPoint {
var dataPoint pmetric.NumberDataPoint
switch typ {
case collectDMetricCounter, collectDMetricDerive:
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
dataPoint = sum.DataPoints().AppendEmpty()
default:
dataPoint = metric.SetEmptyGauge().DataPoints().AppendEmpty()
}
return p, isDouble, nil
return dataPoint
}

// getReasonableMetricName creates metrics names by joining them (if non empty) type.typeinstance
// if there are more than one dsname append .dsname for the particular uint. if there's only one it
// becomes a dimension.
func (r *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
func (cdr *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
usedDsName := false
capacity := 0
if r.TypeS != nil {
capacity += len(*r.TypeS)
if cdr.TypeS != nil {
capacity += len(*cdr.TypeS)
}
if r.TypeInstance != nil {
capacity += len(*r.TypeInstance)
if cdr.TypeInstance != nil {
capacity += len(*cdr.TypeInstance)
}
parts := make([]byte, 0, capacity)

if !isNilOrEmpty(r.TypeS) {
parts = append(parts, *r.TypeS...)
if !isNilOrEmpty(cdr.TypeS) {
parts = append(parts, *cdr.TypeS...)
}
parts = r.pointTypeInstance(attrs, parts)
if r.Dsnames != nil && !isNilOrEmpty(r.Dsnames[index]) && len(r.Dsnames) > 1 {
parts = cdr.pointTypeInstance(attrs, parts)
if cdr.Dsnames != nil && !isNilOrEmpty(cdr.Dsnames[index]) && len(cdr.Dsnames) > 1 {
if len(parts) > 0 {
parts = append(parts, '.')
}
parts = append(parts, *r.Dsnames[index]...)
parts = append(parts, *cdr.Dsnames[index]...)
usedDsName = true
}
return string(parts), usedDsName
}

// pointTypeInstance extracts information from the TypeInstance field and appends to the metric name when possible.
func (r *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
if isNilOrEmpty(r.TypeInstance) {
func (cdr *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
if isNilOrEmpty(cdr.TypeInstance) {
return parts
}

instanceName, extractedAttrs := LabelsFromName(r.TypeInstance)
instanceName, extractedAttrs := LabelsFromName(cdr.TypeInstance)
if instanceName != "" {
if len(parts) > 0 {
parts = append(parts, '.')
Expand Down Expand Up @@ -275,29 +275,3 @@ func parseNameForLabels(labels map[string]string, key string, val *string) {
}
addIfNotNullOrEmpty(labels, key, &instanceName)
}

func labelKeysAndValues(labels map[string]string) ([]*metricspb.LabelKey, []*metricspb.LabelValue) {
keys := make([]*metricspb.LabelKey, len(labels))
values := make([]*metricspb.LabelValue, len(labels))
i := 0
for k, v := range labels {
keys[i] = &metricspb.LabelKey{Key: k}
values[i] = &metricspb.LabelValue{Value: v, HasValue: true}
i++
}
return keys, values
}

func metricCumulative(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_CUMULATIVE_DOUBLE
}
return metricspb.MetricDescriptor_CUMULATIVE_INT64
}

func metricGauge(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_GAUGE_DOUBLE
}
return metricspb.MetricDescriptor_GAUGE_INT64
}
Loading