Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor collectd from opensus to pdata #1

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ jobs:
else
echo "One or more matrix jobs failed."
false
fi
fi
240 changes: 122 additions & 118 deletions receiver/collectdreceiver/collectd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,33 @@ package collectdreceiver // import "github.com/open-telemetry/opentelemetry-coll
import (
"encoding/json"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"strings"
"time"
)

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"google.golang.org/protobuf/types/known/timestamppb"
type TargetMetricType string

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
const (
GaugeMetricType = TargetMetricType("gauge")
CumulativeMetricType = TargetMetricType("cumulative")
)

const (
collectDMetricDerive = "derive"
collectDMetricGauge = "gauge"
collectDMetricCounter = "counter"
collectDMetricAbsolute = "absolute"
collectDMetricDerive = "derive"
collectDMetricCounter = "counter"
)

type collectDRecord struct {
Dsnames []*string `json:"dsnames"`
Dstypes []*string `json:"dstypes"`
Host *string `json:"host"`
Interval *float64 `json:"interval"`
Interval *json.Number `json:"interval"`
Plugin *string `json:"plugin"`
PluginInstance *string `json:"plugin_instance"`
Time *float64 `json:"time"`
Time *json.Number `json:"time"`
TypeS *string `json:"type"`
TypeInstance *string `json:"type_instance"`
Values []*json.Number `json:"values"`
Expand All @@ -38,158 +41,174 @@ type collectDRecord struct {
Severity *string `json:"severity"`
}

func (r *collectDRecord) isEvent() bool {
return r.Time != nil && r.Severity != nil && r.Message != nil
type createMetricInfo struct {
Name string
DsType *string
Val *json.Number
}

func (r *collectDRecord) protoTime() *timestamppb.Timestamp {
if r.Time == nil {
return nil
func (collectd *collectDRecord) isEvent() bool {
return collectd.Time != nil && collectd.Severity != nil && collectd.Message != nil
}

func (collectd *collectDRecord) protoTime() pcommon.Timestamp {
if collectd.Time == nil {
return pcommon.NewTimestampFromTime(time.Time{})
}
ts := time.Unix(0, int64(float64(time.Second)**r.Time))
return timestamppb.New(ts)
collectedTime, _ := parseTime(*collectd.Time)
timeStamp := time.Unix(0, 0).Add(collectedTime)
return pcommon.NewTimestampFromTime(timeStamp)
}

func (r *collectDRecord) startTimestamp(mdType metricspb.MetricDescriptor_Type) *timestamppb.Timestamp {
if mdType == metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION || mdType == metricspb.MetricDescriptor_CUMULATIVE_DOUBLE || mdType == metricspb.MetricDescriptor_CUMULATIVE_INT64 {
return timestamppb.New(time.Unix(0, int64((*r.Time-*r.Interval)*float64(time.Second))))
func (collectd *collectDRecord) startTimestamp(metricType TargetMetricType) pcommon.Timestamp {

collectedTime, _ := parseTime(*collectd.Time)
collectdInterval, _ := parseTime(*collectd.Interval)
timeDiff := collectedTime - collectdInterval
if metricType == CumulativeMetricType {
return pcommon.NewTimestampFromTime(time.Unix(0, 0).Add(timeDiff))
}
return nil
return pcommon.NewTimestampFromTime(time.Time{})
}

func (r *collectDRecord) appendToMetrics(metrics []*metricspb.Metric, defaultLabels map[string]string) ([]*metricspb.Metric, error) {
// Ignore if record is an event instead of data point
if r.isEvent() {
recordEventsReceived()
return metrics, nil
func parseTime(timeValue json.Number) (time.Duration, error) {
timeStamp := timeValue.String()
duration, err := time.ParseDuration(timeStamp + "s")
return duration, err
}

func (collectd *collectDRecord) appendToMetrics(scopeMetrics pmetric.ScopeMetrics, defaultLabels map[string]string) error {

if collectd.isEvent() {
recordEventsReceived()
return nil
}

recordMetricsReceived()

labels := make(map[string]string, len(defaultLabels))

for k, v := range defaultLabels {
labels[k] = v
}

for i := range r.Dsnames {
if i < len(r.Dstypes) && i < len(r.Values) && r.Values[i] != nil {
dsType, dsName, val := r.Dstypes[i], r.Dsnames[i], r.Values[i]
metricName, usedDsName := r.getReasonableMetricName(i, labels)
for i := range collectd.Dsnames {

addIfNotNullOrEmpty(labels, "plugin", r.Plugin)
parseAndAddLabels(labels, r.PluginInstance, r.Host)
if i < len(collectd.Dstypes) && i < len(collectd.Values) && collectd.Values[i] != nil {
dsType, dsName, val := collectd.Dstypes[i], collectd.Dsnames[i], collectd.Values[i]
metricName, usedDsName := collectd.getReasonableMetricName(i, labels)
createMetric := createMetricInfo{
Name: metricName,
DsType: dsType,
Val: val,
}

addIfNotNullOrEmpty(labels, "plugin", collectd.Plugin)
parseAndAddLabels(labels, collectd.PluginInstance, collectd.Host)
if !usedDsName {
addIfNotNullOrEmpty(labels, "dsname", dsName)
}

metric, err := r.newMetric(metricName, dsType, val, labels)
metric, err := collectd.newMetric(createMetric, labels)
if err != nil {
return metrics, fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
return fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
}
metrics = append(metrics, metric)

newMetric := scopeMetrics.Metrics().AppendEmpty()
metric.MoveTo(newMetric)
}
}
return metrics, nil
return nil
}

func (r *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (*metricspb.Metric, error) {
metric := &metricspb.Metric{}
point, isDouble, err := r.newPoint(val)
// Create new metric, get labels, then setting attribute and metric info
// Returns:
// A new Metric
func (collectd *collectDRecord) newMetric(createMetric createMetricInfo, labels map[string]string) (pmetric.Metric, error) {
attributes := setAttributes(labels)
metric, err := collectd.setMetric(createMetric, attributes)
if err != nil {
return metric, fmt.Errorf("error processing metric %s: %w", name, err)
}

lKeys, lValues := labelKeysAndValues(labels)
metricType := r.metricType(dsType, isDouble)
metric.MetricDescriptor = &metricspb.MetricDescriptor{
Name: name,
Type: metricType,
LabelKeys: lKeys,
return pmetric.Metric{}, fmt.Errorf("error processing metric %s: %w", createMetric.Name, err)
}
metric.Timeseries = []*metricspb.TimeSeries{
{
StartTimestamp: r.startTimestamp(metricType),
LabelValues: lValues,
Points: []*metricspb.Point{point},
},
}

return metric, nil
}

func (r *collectDRecord) metricType(dsType *string, isDouble bool) metricspb.MetricDescriptor_Type {
val := ""
if dsType != nil {
val = *dsType
// Set new metric info with name, datapoint, time, attributes
// Returns:
// new Metric
func (collectd *collectDRecord) setMetric(createMetric createMetricInfo, attributes pcommon.Map) (pmetric.Metric, error) {
typ := ""
metric := pmetric.NewMetric()

if createMetric.DsType != nil {
typ = *createMetric.DsType
}

switch val {
case collectDMetricCounter, collectDMetricDerive:
return metricCumulative(isDouble)
metric.SetName(createMetric.Name)
dataPoint := collectd.setDataPoint(typ, metric)
// todo: ask from pst to utc is ok???
dataPoint.SetTimestamp(collectd.protoTime())
attributes.CopyTo(dataPoint.Attributes())

// Prometheus collectd exporter just ignores it. We use gauge for it as it seems the
// closes type. https://github.com/prometheus/collectd_exporter/blob/master/main.go#L109-L129
case collectDMetricGauge, collectDMetricAbsolute:
return metricGauge(isDouble)
if val, err := createMetric.Val.Int64(); err == nil {
dataPoint.SetIntValue(val)
} else if val, err := createMetric.Val.Float64(); err == nil {
dataPoint.SetDoubleValue(val)
} else {
return pmetric.Metric{}, fmt.Errorf("value could not be decoded: %w", err)
}
return metricGauge(isDouble)
}

func (r *collectDRecord) newPoint(val *json.Number) (*metricspb.Point, bool, error) {
p := &metricspb.Point{
Timestamp: r.protoTime(),
}
return metric, nil
}

isDouble := true
if v, err := val.Int64(); err == nil {
isDouble = false
p.Value = &metricspb.Point_Int64Value{Int64Value: v}
} else {
v, err := val.Float64()
if err != nil {
return nil, isDouble, fmt.Errorf("value could not be decoded: %w", err)
}
p.Value = &metricspb.Point_DoubleValue{DoubleValue: v}
func (collectd *collectDRecord) setDataPoint(typ string, metric pmetric.Metric) pmetric.NumberDataPoint {
var dataPoint pmetric.NumberDataPoint
switch typ {
case collectDMetricCounter, collectDMetricDerive:
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
dataPoint = sum.DataPoints().AppendEmpty()
default:
dataPoint = metric.SetEmptyGauge().DataPoints().AppendEmpty()
}
return p, isDouble, nil
return dataPoint
}

// getReasonableMetricName creates metrics names by joining them (if non empty) type.typeinstance
// if there are more than one dsname append .dsname for the particular uint. if there's only one it
// becomes a dimension.
func (r *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
func (collectd *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
usedDsName := false
capacity := 0
if r.TypeS != nil {
capacity += len(*r.TypeS)
if collectd.TypeS != nil {
capacity += len(*collectd.TypeS)
}
if r.TypeInstance != nil {
capacity += len(*r.TypeInstance)
if collectd.TypeInstance != nil {
capacity += len(*collectd.TypeInstance)
}
parts := make([]byte, 0, capacity)

if !isNilOrEmpty(r.TypeS) {
parts = append(parts, *r.TypeS...)
if !isNilOrEmpty(collectd.TypeS) {
parts = append(parts, *collectd.TypeS...)
}
parts = r.pointTypeInstance(attrs, parts)
if r.Dsnames != nil && !isNilOrEmpty(r.Dsnames[index]) && len(r.Dsnames) > 1 {
parts = collectd.pointTypeInstance(attrs, parts)
if collectd.Dsnames != nil && !isNilOrEmpty(collectd.Dsnames[index]) && len(collectd.Dsnames) > 1 {
if len(parts) > 0 {
parts = append(parts, '.')
}
parts = append(parts, *r.Dsnames[index]...)
parts = append(parts, *collectd.Dsnames[index]...)
usedDsName = true
}
return string(parts), usedDsName
}

// pointTypeInstance extracts information from the TypeInstance field and appends to the metric name when possible.
func (r *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
if isNilOrEmpty(r.TypeInstance) {
func (collectd *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
if isNilOrEmpty(collectd.TypeInstance) {
return parts
}

instanceName, extractedAttrs := LabelsFromName(r.TypeInstance)
instanceName, extractedAttrs := LabelsFromName(collectd.TypeInstance)
if instanceName != "" {
if len(parts) > 0 {
parts = append(parts, '.')
Expand All @@ -215,10 +234,12 @@ func (r *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte
// to functions like strings.Slice.
func LabelsFromName(val *string) (metricName string, labels map[string]string) {
metricName = *val

index := strings.Index(*val, "[")
if index > -1 {
left := (*val)[:index]
rest := (*val)[index+1:]

index = strings.Index(rest, "]")
if index > -1 {
working := make(map[string]string)
Expand Down Expand Up @@ -276,28 +297,11 @@ func parseNameForLabels(labels map[string]string, key string, val *string) {
addIfNotNullOrEmpty(labels, key, &instanceName)
}

func labelKeysAndValues(labels map[string]string) ([]*metricspb.LabelKey, []*metricspb.LabelValue) {
keys := make([]*metricspb.LabelKey, len(labels))
values := make([]*metricspb.LabelValue, len(labels))
i := 0
for k, v := range labels {
keys[i] = &metricspb.LabelKey{Key: k}
values[i] = &metricspb.LabelValue{Value: v, HasValue: true}
i++
}
return keys, values
}

func metricCumulative(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_CUMULATIVE_DOUBLE
}
return metricspb.MetricDescriptor_CUMULATIVE_INT64
}
func setAttributes(labels map[string]string) pcommon.Map {

func metricGauge(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_GAUGE_DOUBLE
attributes := pcommon.NewMap()
for k, v := range labels {
attributes.PutStr(k, v)
}
return metricspb.MetricDescriptor_GAUGE_INT64
return attributes
}
Loading