Merge branch 'main' of github.com:shree007/opentelemetry-collector-contrib into otel-7386
shree007 authored and hero committed Jan 27, 2022
2 parents c236599 + 4ccaabd commit d84dab0
Showing 20 changed files with 593 additions and 74 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,17 @@

## Unreleased

## 💡 Enhancements 💡

- `tanzuobservabilityexporter`: Turn on metrics exporter (#7281)
- `attributesprocessor` `resourceprocessor`: Add `from_context` value source

## 🛑 Breaking changes 🛑

## 🚀 New components 🚀

## 🧰 Bug fixes 🧰

## v0.43.0

## 💡 Enhancements 💡
@@ -14,6 +25,7 @@
- `elasticsearchreceiver`: Implement scraping logic (#7174)
- `datadogexporter`: Add http.status_code tag to trace stats (#6889)
- `datadogexporter`: Add configuration option to use the OTel span name as the Datadog resource name (#6611)
- `dynatraceexporter`: Write error logs using plugin logger (#7360)
- `mongodbreceiver`: Add initial client code to the component (#7125)
- `tanzuobservabilityexporter`: Support delta histograms (#6897)
- `awscloudwatchlogsexporter`: Use cwlogs package to export logs (#7152)
@@ -64,6 +76,7 @@
- `k8sattributeprocessor`: Process IP correctly for net.Addr instances that are not typed (#7133)
- `mdatagen`: Fix validation of `enabled` field in metadata.yaml (#7166)
- `elasticsearch`: Fix timestamp for each metric being startup time (#7255)
- `prometheusremotewriteexporter`: Fix index out of range panic caused by expiring metrics (#7149)
- `resourcedetection`: Log the error when checking for ec2metadata availability (#7296)

## v0.42.0
55 changes: 34 additions & 21 deletions exporter/dynatraceexporter/metrics_exporter.go
@@ -30,7 +30,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dynatraceexporter/config"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dynatraceexporter/serialization"
@@ -99,10 +98,10 @@ func (e *exporter) PushMetricsData(ctx context.Context, md pdata.Metrics) error
}

lines := e.serializeMetrics(md)
ce := e.settings.Logger.Check(zapcore.DebugLevel, "Serialization complete")
if ce != nil {
ce.Write(zap.Int("DataPoints", md.DataPointCount()), zap.Int("Lines", len(lines)))
}
e.settings.Logger.Sugar().Debugw("Serialization complete",
"DataPoints", md.DataPointCount(),
"lines", len(lines),
)

// If request is empty string, there are no serializable metrics in the batch.
// This can happen if all metric names are invalid
@@ -136,13 +135,22 @@ func (e *exporter) serializeMetrics(md pdata.Metrics) []string {
metricLines, err := serialization.SerializeMetric(e.settings.Logger, e.cfg.Prefix, metric, e.defaultDimensions, e.staticDimensions, e.prevPts)

if err != nil {
e.settings.Logger.Sugar().Errorf("failed to serialize %s %s: %s", metric.DataType().String(), metric.Name(), err.Error())
e.settings.Logger.Sugar().Errorw(
"failed to serialize",
"datatype", metric.DataType().String(),
"name", metric.Name(),
zap.Error(err),
)
}

if len(metricLines) > 0 {
lines = append(lines, metricLines...)
}
e.settings.Logger.Debug(fmt.Sprintf("Serialized %s %s - %d lines", metric.DataType().String(), metric.Name(), len(metricLines)))
e.settings.Logger.Sugar().Debugw("Serialized metric data",
"metric-type", metric.DataType().String(),
"metric-name", metric.Name(),
"data-len", len(metricLines),
)
}
}
}
@@ -155,10 +163,10 @@ var lastLog int64
// send sends a serialized metric batch to Dynatrace.
// An error indicates all lines were dropped regardless of the returned number.
func (e *exporter) send(ctx context.Context, lines []string) error {
e.settings.Logger.Sugar().Debugf("Exporting %d lines", len(lines))
e.settings.Logger.Debug("Exporting", zap.Int("lines", len(lines)))

if now := time.Now().Unix(); len(lines) > apiconstants.GetPayloadLinesLimit() && now-lastLog > 60 {
e.settings.Logger.Warn(fmt.Sprintf("Batch too large. Sending in chunks of %[1]d metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to %[1]d or less. Suppressing this log for 60 seconds.", apiconstants.GetPayloadLinesLimit()))
e.settings.Logger.Sugar().Warnf("Batch too large. Sending in chunks of %[1]d metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to %[1]d or less. Suppressing this log for 60 seconds.", apiconstants.GetPayloadLinesLimit())
lastLog = time.Now().Unix()
}

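Note (not part of the diff): the warning above refers to the way send() splits an oversized payload into several requests. A rough standalone sketch of that splitting, using a hypothetical chunkLines helper and plain string slices rather than the exporter's actual code:

```go
package main

import "fmt"

// chunkLines splits lines into batches of at most limit entries, mirroring
// how a payload larger than the line limit would be sent as several
// smaller requests.
func chunkLines(lines []string, limit int) [][]string {
	var batches [][]string
	for len(lines) > limit {
		batches = append(batches, lines[:limit])
		lines = lines[limit:]
	}
	return append(batches, lines)
}

func main() {
	lines := []string{"m1 1", "m2 2", "m3 3", "m4 4", "m5 5"}
	for i, batch := range chunkLines(lines, 2) {
		fmt.Printf("batch %d: %v\n", i, batch)
	}
	// batch 0: [m1 1 m2 2]
	// batch 1: [m3 3 m4 4]
	// batch 2: [m5 5]
}
```

In the exporter the limit comes from apiconstants.GetPayloadLinesLimit(), which is also the value the warning recommends for send_batch_max_size.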
@@ -182,7 +190,10 @@ func (e *exporter) send(ctx context.Context, lines []string) error {
// An error indicates all lines were dropped regardless of the returned number.
func (e *exporter) sendBatch(ctx context.Context, lines []string) error {
message := strings.Join(lines, "\n")
e.settings.Logger.Debug("SendBatch", zap.Int("lines", len(lines)), zap.String("endpoint", e.cfg.Endpoint))
e.settings.Logger.Sugar().Debugw("SendBatch",
"lines", len(lines),
"endpoint", e.cfg.Endpoint,
)

req, err := http.NewRequestWithContext(ctx, "POST", e.cfg.Endpoint, bytes.NewBufferString(message))

@@ -194,7 +205,8 @@ func (e *exporter) sendBatch(ctx context.Context, lines []string) error {
resp, err := e.client.Do(req)

if err != nil {
return err
e.settings.Logger.Error("failed to send request", zap.Error(err))
return fmt.Errorf("sendBatch: %w", err)
}

defer resp.Body.Close()
@@ -209,28 +221,28 @@ func (e *exporter) sendBatch(ctx context.Context, lines []string) error {
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
// if the response cannot be read, do not retry the batch as it may have been successful
e.settings.Logger.Error(fmt.Sprintf("failed to read response: %s", err.Error()))
e.settings.Logger.Error("Failed to read response from Dynatrace", zap.Error(err))
return nil
}

responseBody := metricsResponse{}
if err := json.Unmarshal(bodyBytes, &responseBody); err != nil {
// if the response cannot be read, do not retry the batch as it may have been successful
e.settings.Logger.Error("failed to unmarshal response", zap.Error(err), zap.ByteString("body", bodyBytes))
e.settings.Logger.Error("Failed to unmarshal response from Dynatrace", zap.Error(err), zap.ByteString("body", bodyBytes))
return nil
}

e.settings.Logger.Debug(fmt.Sprintf("Accepted %d lines", responseBody.Ok))
e.settings.Logger.Error(fmt.Sprintf("Rejected %d lines", responseBody.Invalid))

if responseBody.Error.Message != "" {
e.settings.Logger.Error(fmt.Sprintf("Error from Dynatrace: %s", responseBody.Error.Message))
}
e.settings.Logger.Sugar().Errorw("Response from Dynatrace",
"accepted-lines", responseBody.Ok,
"rejected-lines", responseBody.Invalid,
"error-message", responseBody.Error.Message,
"status", resp.Status,
)

for _, line := range responseBody.Error.InvalidLines {
// Enable debug logging to see which lines were dropped
if line.Line >= 0 && line.Line < len(lines) {
e.settings.Logger.Debug(fmt.Sprintf("rejected line %3d: [%s] %s", line.Line, line.Error, lines[line.Line]))
e.settings.Logger.Sugar().Debugf("rejected line %3d: [%s] %s", line.Line, line.Error, lines[line.Line])
}
}

@@ -259,7 +271,8 @@ func (e *exporter) sendBatch(ctx context.Context, lines []string) error {
func (e *exporter) start(_ context.Context, host component.Host) (err error) {
client, err := e.cfg.HTTPClientSettings.ToClient(host.GetExtensions(), e.settings)
if err != nil {
return err
e.settings.Logger.Error("Failed to construct HTTP client", zap.Error(err))
return fmt.Errorf("start: %w", err)
}

e.client = client
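For reference (not part of the commit): this file moves from fmt.Sprintf-formatted log messages to zap's SugaredLogger with structured key-value fields. A minimal, self-contained sketch of that pattern, with made-up field values:

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	// NewDevelopment enables Debug-level output, so Debugw lines are visible.
	logger, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	sugar := logger.Sugar()

	// Loosely typed key-value pairs instead of a pre-formatted string.
	sugar.Debugw("Serialization complete",
		"DataPoints", 42,
		"lines", 40,
	)

	// Strongly typed fields such as zap.Error can be mixed in.
	sugar.Errorw("failed to serialize",
		"datatype", "Histogram",
		"name", "example.metric",
		zap.Error(errors.New("unsupported temporality")),
	)
}
```

The related switch from `return err` to `fmt.Errorf("sendBatch: %w", err)` and `fmt.Errorf("start: %w", err)` keeps the original error reachable through errors.Is and errors.As while adding context to the message.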
10 changes: 10 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
@@ -389,6 +389,7 @@ func Test_PushMetrics(t *testing.T) {

// staleNaN cases
staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])
staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram])

staleNaNSummaryBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSummary])

@@ -593,6 +594,15 @@
false,
true,
},
{
"staleNaNEmptyHistogram_case",
&staleNaNEmptyHistogramBatch,
checkFunc,
3,
http.StatusAccepted,
false,
true,
},
{
"staleNaNSummary_case",
&staleNaNSummaryBatch,
5 changes: 3 additions & 2 deletions exporter/prometheusremotewriteexporter/helper.go
@@ -394,13 +394,14 @@ func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Res
bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound})
}
// add le=+Inf bucket
cumulativeCount += pt.BucketCounts()[len(pt.BucketCounts())-1]
infBucket := &prompb.Sample{
Value: float64(cumulativeCount),
Timestamp: time,
}
if pt.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
infBucket.Value = math.Float64frombits(value.StaleNaN)
} else {
cumulativeCount += pt.BucketCounts()[len(pt.BucketCounts())-1]
infBucket.Value = float64(cumulativeCount)
}
infLabels := createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
sig := addSample(tsMap, infBucket, infLabels, metric)
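Context (not part of the diff): before this change the +Inf bucket read the last element of BucketCounts() unconditionally, which panics when an expired (stale) point arrives with no buckets; see staleNaNEmptyHistogram in the tests below. The fix reads the bucket counts only on the non-stale branch. A standalone sketch of that control flow, using plain slices and a hypothetical helper rather than the pdata API:

```go
package main

import (
	"fmt"
	"math"
)

// infBucketValue mirrors the fixed branch: the last bucket count is read only
// when the point carries recorded values, so an empty slice from a stale
// (expired) point can no longer cause an index-out-of-range panic.
func infBucketValue(bucketCounts []uint64, cumulativeCount uint64, stale bool) float64 {
	if stale {
		// Stand-in for the Prometheus staleness marker (value.StaleNaN in the real code).
		return math.NaN()
	}
	cumulativeCount += bucketCounts[len(bucketCounts)-1]
	return float64(cumulativeCount)
}

func main() {
	fmt.Println(infBucketValue([]uint64{3, 2, 1}, 5, false)) // 6
	fmt.Println(infBucketValue(nil, 0, true))                // NaN, no panic
}
```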
26 changes: 14 additions & 12 deletions exporter/prometheusremotewriteexporter/testutil_test.go
@@ -158,21 +158,23 @@
emptyCumulativeSum: getEmptyCumulativeSumMetric(emptyCumulativeSum),
emptyCumulativeHistogram: getEmptyCumulativeHistogramMetric(emptyCumulativeHistogram),
}
staleNaNIntGauge = "staleNaNIntGauge"
staleNaNDoubleGauge = "staleNaNDoubleGauge"
staleNaNIntSum = "staleNaNIntSum"
staleNaNSum = "staleNaNSum"
staleNaNHistogram = "staleNaNHistogram"
staleNaNSummary = "staleNaNSummary"
staleNaNIntGauge = "staleNaNIntGauge"
staleNaNDoubleGauge = "staleNaNDoubleGauge"
staleNaNIntSum = "staleNaNIntSum"
staleNaNSum = "staleNaNSum"
staleNaNHistogram = "staleNaNHistogram"
staleNaNEmptyHistogram = "staleNaNEmptyHistogram"
staleNaNSummary = "staleNaNSummary"

// staleNaN metrics as input should have the staleness marker flag
staleNaNMetrics = map[string]pdata.Metric{
staleNaNIntGauge: getIntGaugeMetric(staleNaNIntGauge, lbs1, intVal1, time1),
staleNaNDoubleGauge: getDoubleGaugeMetric(staleNaNDoubleGauge, lbs1, floatVal1, time1),
staleNaNIntSum: getIntSumMetric(staleNaNIntSum, lbs1, intVal1, time1),
staleNaNSum: getSumMetric(staleNaNSum, lbs1, floatVal1, time1),
staleNaNHistogram: getHistogramMetric(staleNaNHistogram, lbs1, time1, floatVal2, uint64(intVal2), bounds, buckets),
staleNaNSummary: getSummaryMetric(staleNaNSummary, lbs2, time2, floatVal2, uint64(intVal2), quantiles),
staleNaNIntGauge: getIntGaugeMetric(staleNaNIntGauge, lbs1, intVal1, time1),
staleNaNDoubleGauge: getDoubleGaugeMetric(staleNaNDoubleGauge, lbs1, floatVal1, time1),
staleNaNIntSum: getIntSumMetric(staleNaNIntSum, lbs1, intVal1, time1),
staleNaNSum: getSumMetric(staleNaNSum, lbs1, floatVal1, time1),
staleNaNHistogram: getHistogramMetric(staleNaNHistogram, lbs1, time1, floatVal2, uint64(intVal2), bounds, buckets),
staleNaNEmptyHistogram: getHistogramMetric(staleNaNEmptyHistogram, lbs1, time1, floatVal2, uint64(intVal2), []float64{}, []uint64{}),
staleNaNSummary: getSummaryMetric(staleNaNSummary, lbs2, time2, floatVal2, uint64(intVal2), quantiles),
}
)

32 changes: 27 additions & 5 deletions exporter/tanzuobservabilityexporter/config.go
@@ -15,6 +15,7 @@
package tanzuobservabilityexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tanzuobservabilityexporter"

import (
"errors"
"fmt"
"net/url"

@@ -27,22 +28,43 @@ type TracesConfig struct {
confighttp.HTTPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
}

type MetricsConfig struct {
confighttp.HTTPClientSettings `mapstructure:",squash"`
}

// Config defines configuration options for the exporter.
type Config struct {
config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// Traces defines the Traces exporter specific configuration
Traces TracesConfig `mapstructure:"traces"`
Traces TracesConfig `mapstructure:"traces"`
Metrics MetricsConfig `mapstructure:"metrics"`
}

func (c *Config) Validate() error {
if c.Traces.Endpoint == "" {
return fmt.Errorf("A non-empty traces.endpoint is required")
tracesURL, err := parseEndpoint("traces", c.Traces.Endpoint)
if err != nil {
return err
}
if _, err := url.Parse(c.Traces.Endpoint); err != nil {
return fmt.Errorf("invalid traces.endpoint %s", err)
metricsURL, err := parseEndpoint("metrics", c.Metrics.Endpoint)
if err != nil {
return err
}
if tracesURL.Hostname() != metricsURL.Hostname() {
return errors.New("host for metrics and traces must be the same")
}
return nil
}

func parseEndpoint(name string, endpoint string) (*url.URL, error) {
if endpoint == "" {
return nil, fmt.Errorf("A non-empty %s.endpoint is required", name)
}
u, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid %s.endpoint %s", name, err)
}
return u, nil
}
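For reference (not part of the commit): the validation added above enforces three rules — both endpoints must be non-empty, both must parse as URLs, and both must point at the same host. The standalone sketch below mirrors those rules with a hypothetical checkEndpoints helper; the exporter's own Validate is the code shown in the diff.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// checkEndpoints applies the same three rules as Config.Validate above.
func checkEndpoints(tracesEndpoint, metricsEndpoint string) error {
	parse := func(name, endpoint string) (*url.URL, error) {
		// url.Parse("") succeeds, so the empty check must come first.
		if endpoint == "" {
			return nil, fmt.Errorf("a non-empty %s.endpoint is required", name)
		}
		return url.Parse(endpoint)
	}
	tracesURL, err := parse("traces", tracesEndpoint)
	if err != nil {
		return err
	}
	metricsURL, err := parse("metrics", metricsEndpoint)
	if err != nil {
		return err
	}
	if tracesURL.Hostname() != metricsURL.Hostname() {
		return errors.New("host for metrics and traces must be the same")
	}
	return nil
}

func main() {
	fmt.Println(checkEndpoints("http://localhost:30001", "http://localhost:2878")) // <nil>
	fmt.Println(checkEndpoints("http://localhost:30001", "http://foo.com:2878"))   // host mismatch error
}
```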
60 changes: 60 additions & 0 deletions exporter/tanzuobservabilityexporter/config_test.go
@@ -28,6 +28,9 @@ func TestConfigRequiresNonEmptyEndpoint(t *testing.T) {
Traces: TracesConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ""},
},
Metrics: MetricsConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:2878"},
},
}

assert.Error(t, c.Validate())
@@ -39,7 +42,64 @@ func TestConfigRequiresValidEndpointUrl(t *testing.T) {
Traces: TracesConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http#$%^&#$%&#"},
},
Metrics: MetricsConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:2878"},
},
}

assert.Error(t, c.Validate())
}

func TestMetricsConfigRequiresNonEmptyEndpoint(t *testing.T) {
c := &Config{
ExporterSettings: config.ExporterSettings{},
Traces: TracesConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:30001"},
},
Metrics: MetricsConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ""},
},
}

assert.Error(t, c.Validate())
}

func TestMetricsConfigRequiresValidEndpointUrl(t *testing.T) {
c := &Config{
ExporterSettings: config.ExporterSettings{},
Traces: TracesConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:30001"},
},
Metrics: MetricsConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http#$%^&#$%&#"},
},
}

assert.Error(t, c.Validate())
}

func TestDifferentHostNames(t *testing.T) {
c := &Config{
ExporterSettings: config.ExporterSettings{},
Traces: TracesConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:30001"},
},
Metrics: MetricsConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://foo.com:2878"},
},
}
assert.Error(t, c.Validate())
}

func TestConfigNormal(t *testing.T) {
c := &Config{
ExporterSettings: config.ExporterSettings{},
Traces: TracesConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:40001"},
},
Metrics: MetricsConfig{
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost:2916"},
},
}
assert.NoError(t, c.Validate())
}