Skip to content

Commit

Permalink
[receiver/datadogreceiver] add json handling for the api/v2/series en…
Browse files Browse the repository at this point in the history
…dpoint (open-telemetry#36218)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Adding json handling for the `api/v2/series` endpoint. The datadog api
client libraries use json messages, however only protobuf messages are
currently supported in the` api/v2/series` endpoint, so requests fail
with `proto: illegal wireType 6`

If `Content-Type: application/json` is set, then we handle the json
message. Otherwise, we handle the protobuf message.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes
open-telemetry#36079

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Added test with a json metric payload that now passes.

Additionally, I also tested these changes in my own image and confirmed
that the datadog api client libraries can now successfully ship metrics
to the `api/v2/series` endpoint.

I also confirmed with the following curl:

```
curl -X POST \
  -H "Content-Type: application/json" \
  -H "DD-API-KEY: your_api_key_here" \
  -d '{
    "series": [
      {
        "resources": [
          {
            "name": "dummyhost",
            "type": "host"
          }
        ],
        "tags": ["env:test"],
        "metric": "test.metric",
        "points": [
          {
            "timestamp": 1730829575,
            "value": 1.0
          }
        ],
        "type": 3
      }
    ]
  }' \
  https://datadog-receiver/api/v2/series
{"errors":[]}
```

---------

Co-authored-by: Sean Marciniak <30928402+MovieStoreGuy@users.noreply.github.com>
  • Loading branch information
2 people authored and sbylica-splunk committed Dec 17, 2024
1 parent 6bfcf63 commit 167abc2
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .chloggen/36079-add-datadog-json-handling.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'datadogreceiver'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add json handling for the `api/v2/series` endpoint in the datadogreceiver"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36079]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
17 changes: 13 additions & 4 deletions receiver/datadogreceiver/internal/translator/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"encoding/json"
"io"
"net/http"
"strings"
Expand All @@ -27,19 +28,27 @@ type SeriesList struct {
Series []datadogV1.Series `json:"series"`
}

// TODO: add handling for JSON format in additional to protobuf?
func (mt *MetricsTranslator) HandleSeriesV2Payload(req *http.Request) (mp []*gogen.MetricPayload_MetricSeries, err error) {
buf := GetBuffer()
defer PutBuffer(buf)
if _, err := io.Copy(buf, req.Body); err != nil {
return mp, err
}

contentType := req.Header.Get("Content-Type")

pl := new(gogen.MetricPayload)
if err := pl.Unmarshal(buf.Bytes()); err != nil {
return mp, err
}

// handle json messages if set, otherwise handle protobuf
if contentType == "application/json" {
if err := json.Unmarshal(buf.Bytes(), &pl); err != nil {
return mp, err
}
} else {
if err := pl.Unmarshal(buf.Bytes()); err != nil {
return mp, err
}
}
return pl.GetSeries(), nil
}

Expand Down
79 changes: 79 additions & 0 deletions receiver/datadogreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,85 @@ func TestDatadogMetricsV2_EndToEnd(t *testing.T) {
assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(1).StartTimestamp())
}

func TestDatadogMetricsV2_EndToEndJSON(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:0" // Using a randomly assigned address
sink := new(consumertest.MetricsSink)

dd, err := newDataDogReceiver(
cfg,
receivertest.NewNopSettings(),
)
require.NoError(t, err, "Must not error when creating receiver")
dd.(*datadogReceiver).nextMetricsConsumer = sink

require.NoError(t, dd.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
require.NoError(t, dd.Shutdown(context.Background()))
}()

metricsPayloadV2 := []byte(`{
"series": [
{
"metric": "system.load.1",
"type": 1,
"points": [
{
"timestamp": 1636629071,
"value": 1.5
},
{
"timestamp": 1636629081,
"value": 2.0
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
]
}
]
}`)

req, err := http.NewRequest(
http.MethodPost,
fmt.Sprintf("http://%s/api/v2/series", dd.(*datadogReceiver).address),
io.NopCloser(bytes.NewReader(metricsPayloadV2)),
)

req.Header.Set("Content-Type", "application/json")

require.NoError(t, err, "Must not error when creating request")

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err, "Must not error performing request")

body, err := io.ReadAll(resp.Body)
require.NoError(t, multierr.Combine(err, resp.Body.Close()), "Must not error when reading body")
require.JSONEq(t, `{"errors": []}`, string(body), "Expected JSON response to be `{\"errors\": []}`, got %s", string(body))
require.Equal(t, http.StatusAccepted, resp.StatusCode)

mds := sink.AllMetrics()
require.Len(t, mds, 1)
got := mds[0]
require.Equal(t, 1, got.ResourceMetrics().Len())
metrics := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
assert.Equal(t, 1, metrics.Len())
metric := metrics.At(0)
assert.Equal(t, pmetric.MetricTypeSum, metric.Type())
assert.Equal(t, "system.load.1", metric.Name())
assert.Equal(t, pmetric.AggregationTemporalityDelta, metric.Sum().AggregationTemporality())
assert.False(t, metric.Sum().IsMonotonic())
assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(0).Timestamp())
assert.Equal(t, 1.5, metric.Sum().DataPoints().At(0).DoubleValue())
assert.Equal(t, pcommon.Timestamp(0), metric.Sum().DataPoints().At(0).StartTimestamp())
assert.Equal(t, pcommon.Timestamp(1636629081*1_000_000_000), metric.Sum().DataPoints().At(1).Timestamp())
assert.Equal(t, 2.0, metric.Sum().DataPoints().At(1).DoubleValue())
assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(1).StartTimestamp())
}

func TestDatadogSketches_EndToEnd(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:0" // Using a randomly assigned address
Expand Down

0 comments on commit 167abc2

Please sign in to comment.