Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Add a new Test and refactor code function for collectdreceiver #25120

Merged
merged 3 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions receiver/collectdreceiver/collectd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type collectDRecord struct {
Severity *string `json:"severity"`
}

type createMetricInfo struct {
Name string
DsType *string
Val *json.Number
}

func (cdr *collectDRecord) isEvent() bool {
return cdr.Time != nil && cdr.Severity != nil && cdr.Message != nil
}
Expand Down Expand Up @@ -68,14 +74,19 @@ func (cdr *collectDRecord) appendToMetrics(scopeMetrics pmetric.ScopeMetrics, de
if i < len(cdr.Dstypes) && i < len(cdr.Values) && cdr.Values[i] != nil {
dsType, dsName, val := cdr.Dstypes[i], cdr.Dsnames[i], cdr.Values[i]
metricName, usedDsName := cdr.getReasonableMetricName(i, labels)
createMetric := createMetricInfo{
Name: metricName,
DsType: dsType,
Val: val,
}

addIfNotNullOrEmpty(labels, "plugin", cdr.Plugin)
parseAndAddLabels(labels, cdr.PluginInstance, cdr.Host)
if !usedDsName {
addIfNotNullOrEmpty(labels, "dsname", dsName)
}

metric, err := cdr.newMetric(metricName, dsType, val, labels)
metric, err := cdr.newMetric(createMetric, labels)
if err != nil {
return fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
}
Expand All @@ -87,11 +98,11 @@ func (cdr *collectDRecord) appendToMetrics(scopeMetrics pmetric.ScopeMetrics, de
}

// Create new metric, get labels, then setting attribute and metric info
func (cdr *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (pmetric.Metric, error) {
func (cdr *collectDRecord) newMetric(createMetric createMetricInfo, labels map[string]string) (pmetric.Metric, error) {
attributes := setAttributes(labels)
metric, err := cdr.setMetric(name, dsType, val, attributes)
metric, err := cdr.setMetric(createMetric, attributes)
if err != nil {
return pmetric.Metric{}, fmt.Errorf("error processing metric %s: %w", name, err)
return pmetric.Metric{}, fmt.Errorf("error processing metric %s: %w", createMetric.Name, err)
}
return metric, nil
}
Expand All @@ -105,24 +116,23 @@ func setAttributes(labels map[string]string) pcommon.Map {
}

// Set new metric info with name, datapoint, time, attributes
func (cdr *collectDRecord) setMetric(name string, dsType *string, val *json.Number, atr pcommon.Map) (pmetric.Metric, error) {

func (cdr *collectDRecord) setMetric(createMetric createMetricInfo, atr pcommon.Map) (pmetric.Metric, error) {
typ := ""
metric := pmetric.NewMetric()

if dsType != nil {
typ = *dsType
if createMetric.DsType != nil {
typ = *createMetric.DsType
}

metric.SetName(name)
metric.SetName(createMetric.Name)
dataPoint := setDataPoint(typ, metric)
dataPoint.SetTimestamp(cdr.protoTime())
atr.CopyTo(dataPoint.Attributes())

if pointVal, err := val.Int64(); err == nil {
dataPoint.SetIntValue(pointVal)
} else if pointVal, err := val.Float64(); err == nil {
dataPoint.SetDoubleValue(pointVal)
if val, err := createMetric.Val.Int64(); err == nil {
dataPoint.SetIntValue(val)
} else if val, err := createMetric.Val.Float64(); err == nil {
dataPoint.SetDoubleValue(val)
} else {
return pmetric.Metric{}, fmt.Errorf("value could not be decoded: %w", err)
}
Expand Down
123 changes: 69 additions & 54 deletions receiver/collectdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,19 @@ func TestNewReceiver(t *testing.T) {
}

func TestCollectDServer(t *testing.T) {
const endpoint = "localhost:8081"
defaultAttrsPrefix := "dap_"

t.Parallel()
type testCase struct {
name string
queryParams string
requestBody string
responseCode int
wantData []pmetric.Metrics
Name string
HTTPMethod string
QueryParams string
RequestBody string
ResponseCode int
WantData []pmetric.Metrics
}

var endpoint = "localhost:8081"
defaultAttrsPrefix := "dap_"

wantedRequestBody := wantedBody{
Name: "memory.free",
Time: 1415062577.4949999,
Expand All @@ -93,38 +95,54 @@ func TestCollectDServer(t *testing.T) {
},
Value: 2.1474,
}

wantedRequestBodyMetrics := createWantedMetrics(wantedRequestBody)
testCases := []testCase{{
name: "valid-request-body",
queryParams: "dap_attr1=attr1val",
requestBody: `[
{
"dsnames": [
"value"
],
"dstypes": [
"derive"
],
"host": "i-b13d1e5f",
"interval": 10.0,
"plugin": "memory",
"plugin_instance": "",
"time": 1415062577.4949999,
"type": "memory",
"type_instance": "free",
"values": [
2.1474
]
}
]`,
responseCode: 200,
wantData: []pmetric.Metrics{wantedRequestBodyMetrics},
}, {
name: "invalid-request-body",
requestBody: `invalid-body`,
responseCode: 400,
wantData: []pmetric.Metrics{},
}}

testInvalidHTTPMethodCase := testCase{
Name: "invalid-http-method",
HTTPMethod: "GET",
RequestBody: `invalid-body`,
ResponseCode: 400,
WantData: []pmetric.Metrics{},
}

testValidRequestBodyCase := testCase{
Name: "valid-request-body",
HTTPMethod: "POST",
QueryParams: "dap_attr1=attr1val",
RequestBody: `[
{
"dsnames": [
"value"
],
"dstypes": [
"derive"
],
"host": "i-b13d1e5f",
"interval": 10.0,
"plugin": "memory",
"plugin_instance": "",
"time": 1415062577.4949999,
"type": "memory",
"type_instance": "free",
"values": [
2.1474
]
}
]`,
ResponseCode: 200,
WantData: []pmetric.Metrics{wantedRequestBodyMetrics},
}

testInValidRequestBodyCase := testCase{
Name: "invalid-request-body",
HTTPMethod: "POST",
RequestBody: `invalid-body`,
ResponseCode: 400,
WantData: []pmetric.Metrics{},
}

testCases := []testCase{testInvalidHTTPMethodCase, testValidRequestBodyCase, testInValidRequestBodyCase}

sink := new(consumertest.MetricsSink)

Expand All @@ -145,22 +163,22 @@ func TestCollectDServer(t *testing.T) {
time.Sleep(time.Second)

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
sink.Reset()
req, err := http.NewRequest(
"POST",
"http://"+endpoint+"?"+tt.queryParams,
bytes.NewBuffer([]byte(tt.requestBody)),
tt.HTTPMethod,
"http://"+endpoint+"?"+tt.QueryParams,
bytes.NewBuffer([]byte(tt.RequestBody)),
)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, tt.responseCode, resp.StatusCode)
assert.Equal(t, tt.ResponseCode, resp.StatusCode)
defer resp.Body.Close()

if tt.responseCode != 200 {
if tt.ResponseCode != 200 {
return
}

Expand All @@ -169,30 +187,27 @@ func TestCollectDServer(t *testing.T) {
}, 10*time.Second, 5*time.Millisecond)
mds := sink.AllMetrics()
require.Len(t, mds, 1)
assertMetricsAreEqual(t, tt.wantData, mds)
assertMetricsAreEqual(t, tt.WantData, mds)
})
}
}

func createWantedMetrics(wantedBody wantedBody) pmetric.Metrics {
func createWantedMetrics(wantedRequestBody wantedBody) pmetric.Metrics {
var dataPoint pmetric.NumberDataPoint
testMetrics := pmetric.NewMetrics()
scopeMemtrics := testMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
testMetric := pmetric.NewMetric()
testMetric.SetName(wantedBody.Name)
testMetric.SetName(wantedRequestBody.Name)
sum := testMetric.SetEmptySum()
sum.SetIsMonotonic(true)
dataPoint = sum.DataPoints().AppendEmpty()
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, int64(float64(time.Second)*wantedBody.Time))))
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, int64(float64(time.Second)*wantedRequestBody.Time))))
attributes := pcommon.NewMap()

for key, value := range wantedBody.Attributes {
for key, value := range wantedRequestBody.Attributes {
attributes.PutStr(key, value)
}

attributes.CopyTo(dataPoint.Attributes())
dataPoint.SetDoubleValue(wantedBody.Value)

dataPoint.SetDoubleValue(wantedRequestBody.Value)
newMetric := scopeMemtrics.Metrics().AppendEmpty()
testMetric.MoveTo(newMetric)
return testMetrics
Expand Down