Skip to content

Commit

Permalink
Updated stackdriver exporter (#1427)
Browse files Browse the repository at this point in the history
* Updated stackdriver exporter

* make generate

* fix tests

Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
james-bebbington and bogdandrutu authored Nov 1, 2020
1 parent f4fe665 commit 8d1f6af
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 79 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

The OpenTelemetry Collector Contrib contains everything in the [opentelemetry-collector release](https://github.com/open-telemetry/opentelemetry-collector/releases/tag/v0.13.0) (be sure to check the release notes here as well!). Check out the [Getting Started Guide](https://opentelemetry.io/docs/collector/about/) for deployment and configuration information.

## 🛑 Breaking changes 🛑'

- `stackdriver` exporter:
- Config options `metric_prefix` & `skip_create_metric_descriptor` are now nested under `metric`, see [README](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/master/exporter/stackdriverexporter/README.md).
- Trace status codes no longer reflect gRPC codes as per spec changes: open-telemetry/opentelemetry-specification#1067

## 💡 Enhancements 💡

- `sapm` exporter:
Expand Down
38 changes: 29 additions & 9 deletions exporter/stackdriverexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,38 @@ The following configuration options are supported:

- `project` (optional): GCP project identifier.
- `endpoint` (optional): Endpoint where data is going to be sent to.
- `metric_prefix` (optional): MetricPrefix overrides the prefix / namespace of the Stackdriver metric type identifier. If not set, defaults to "custom.googleapis.com/opencensus/"
- `number_of_workers` (optional): NumberOfWorkers sets the number of go rountines that send requests. The minimum number of workers is 1.
- `user_agent` (optional): Override the user agent string sent on requests to Cloud Monitoring (currently only applies to metrics). Specify `{{version}}` to include the application version number. Defaults to `opentelemetry-collector-contrib {{version}}`.
- `use_insecure` (optional): If true. use gRPC as their communication transport. Only has effect if Endpoint is not "".
- `timeout` (optional): Timeout for all API calls. If not set, defaults to 12 seconds.
- `skip_create_metric_descriptor` (optional): Whether to skip creating the metric descriptor.
- `number_of_workers` (optional): NumberOfWorkers sets the number of go rountines that send requests. The minimum number of workers is 1.
- `resource_mappings` (optional): ResourceMapping defines mapping of resources from source (OpenCensus) to target (Stackdriver).
- `label_mappings`.`optional` (optional): Optional flag signals whether we can proceed with transformation if a label is missing in the resource.
- `user_agent` (optional): Override the user agent string sent on requests to Cloud Monitoring (currently only applies to metrics). Specify `{{version}}` to include the application version number. Defaults to `opentelemetry-collector-contrib {{version}}`.
- `label_mappings` (optional): Optional flag signals whether we can proceed with transformation if a label is missing in the resource.

Additional configuration for the trace exporter:

- `trace.bundle_delay_threshold` (optional): Starting from the time that the first span is added to a bundle, once this delay has passed, handle the bundle. If not set, uses the exporter default.
- `trace.bundle_count_threshold` (optional): Once a bundle has this many spans, handle the bundle. Since only one span at a time is added to a bundle, no bundle will exceed this threshold, so it also serves as a limit. If not set, uses the exporter default.
- `trace.bundle_byte_threshold` (optional): Once the number of bytes in current bundle reaches this threshold, handle the bundle. This triggers handling, but does not cap the total size of a bundle. If not set, uses the exporter default.
- `trace.bundle_byte_limit` (optional): The maximum size of a bundle, in bytes. Zero means unlimited.
- `trace.buffer_max_bytes` (optional): The maximum number of bytes that the Bundler will keep in memory before returning ErrOverflow. If not set, uses the exporter default.

Additional configuration for the metric exporter:

- `metric.prefix` (optional): MetricPrefix overrides the prefix / namespace of the Stackdriver metric type identifier. If not set, defaults to "custom.googleapis.com/opencensus/"
- `metric.skip_create_descriptor` (optional): Whether to skip creating the metric descriptor.

Example:

```yaml
exporters:
stackdriver:
stackdriver/customname:
project: my-project
metric_prefix: prefix
endpoint: test-endpoint
user_agent: my-collector {{version}}
number_of_workers: 3
skip_create_metric_descriptor: true
use_insecure: true
timeout: 12s
number_of_workers: 3

resource_mappings:
- source_type: source.resource1
target_type: target-resource1
Expand All @@ -38,6 +47,17 @@ exporters:
optional: true
- source_key: source.label1
target_key: target_label_1

trace:
bundle_delay_threshold: 2s
bundle_count_threshold: 50
bundle_byte_threshold: 15e3
bundle_byte_limit: 0
buffer_max_bytes: 8e6

metric:
prefix: prefix
skip_create_descriptor: true
```
Beyond standard YAML configuration as outlined in the sections that follow,
Expand Down
22 changes: 19 additions & 3 deletions exporter/stackdriverexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package stackdriverexporter

import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"google.golang.org/api/option"
Expand All @@ -24,11 +26,8 @@ import (
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
ProjectID string `mapstructure:"project"`
Prefix string `mapstructure:"metric_prefix"`
UserAgent string `mapstructure:"user_agent"`
Endpoint string `mapstructure:"endpoint"`
NumOfWorkers int `mapstructure:"number_of_workers"`
SkipCreateMetricDescriptor bool `mapstructure:"skip_create_metric_descriptor"`
// Only has effect if Endpoint is not ""
UseInsecure bool `mapstructure:"use_insecure"`
// Timeout for all API calls. If not set, defaults to 12 seconds.
Expand All @@ -39,6 +38,23 @@ type Config struct {
// Must be set programmatically (no support via declarative config).
// Optional.
GetClientOptions func() []option.ClientOption

TraceConfig TraceConfig `mapstructure:"trace"`
MetricConfig MetricConfig `mapstructure:"metric"`
NumOfWorkers int `mapstructure:"number_of_workers"`
}

type TraceConfig struct {
BundleDelayThreshold time.Duration `mapstructure:"bundle_delay_threshold"`
BundleCountThreshold int `mapstructure:"bundle_count_threshold"`
BundleByteThreshold int `mapstructure:"bundle_byte_threshold"`
BundleByteLimit int `mapstructure:"bundle_byte_limit"`
BufferMaxBytes int `mapstructure:"buffer_max_bytes"`
}

type MetricConfig struct {
Prefix string `mapstructure:"prefix"`
SkipCreateMetricDescriptor bool `mapstructure:"skip_create_descriptor"`
}

// ResourceMapping defines mapping of resources from source (OpenCensus) to target (Stackdriver).
Expand Down
25 changes: 17 additions & 8 deletions exporter/stackdriverexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,11 @@ func TestLoadConfig(t *testing.T) {
r1 := cfg.Exporters["stackdriver/customname"].(*Config)
assert.Equal(t, r1,
&Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "stackdriver/customname"},
ProjectID: "my-project",
Prefix: "prefix",
UserAgent: "opentelemetry-collector-contrib {{version}}",
Endpoint: "test-endpoint",
NumOfWorkers: 3,
SkipCreateMetricDescriptor: true,
UseInsecure: true,
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "stackdriver/customname"},
ProjectID: "my-project",
UserAgent: "opentelemetry-collector-contrib {{version}}",
Endpoint: "test-endpoint",
UseInsecure: true,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 20 * time.Second,
},
Expand All @@ -81,5 +78,17 @@ func TestLoadConfig(t *testing.T) {
TargetType: "target-resource2",
},
},
NumOfWorkers: 3,
TraceConfig: TraceConfig{
BundleDelayThreshold: 2 * time.Second,
BundleCountThreshold: 50,
BundleByteThreshold: 15000,
BundleByteLimit: 0,
BufferMaxBytes: 8000000,
},
MetricConfig: MetricConfig{
Prefix: "prefix",
SkipCreateMetricDescriptor: true,
},
})
}
6 changes: 3 additions & 3 deletions exporter/stackdriverexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ go 1.14

require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.4
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.12.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.13.0
github.com/census-instrumentation/opencensus-proto v0.3.0
github.com/stretchr/testify v1.6.1
go.opencensus.io v0.22.5
go.opentelemetry.io/collector v0.13.1-0.20201101004512-f4e4382d0e0e
go.opentelemetry.io/otel v0.12.0
go.opentelemetry.io/otel/sdk v0.12.0
go.opentelemetry.io/otel v0.13.0
go.opentelemetry.io/otel/sdk v0.13.0
go.uber.org/zap v1.16.0
google.golang.org/api v0.32.0
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
Expand Down
6 changes: 6 additions & 0 deletions exporter/stackdriverexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ github.com/Djarvur/go-err113 v0.0.0-20200511133814-5174e21577d5 h1:XTrzB+F8+SpRm
github.com/Djarvur/go-err113 v0.0.0-20200511133814-5174e21577d5/go.mod h1:4UJr5HIiMZrwgkSPdsjy2uOQExX/WEILpIrO9UPGuXs=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.12.0 h1:AUPMdJz4YUWp5skd8d4WRX2uTKVYfcIvqMv0q/RI/+o=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.12.0/go.mod h1:T9DntLrVZwCug9Rwx2xrm8TRi+yJtpzZjLsc0u0ZgLA=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.13.0 h1:fjKUtfldCPIF4nIzAAj3LzP8Lrd3DuRIMiFdOsj4fLc=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.13.0/go.mod h1:q/paYxLXKVhwfC3lzLfhtL54fAx14wzMN9DundQOBMc=
github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3UzkWdp5tH1WMcg=
github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
Expand Down Expand Up @@ -1161,8 +1163,12 @@ go.opentelemetry.io/collector v0.13.1-0.20201101004512-f4e4382d0e0e h1:rQyA+iPCb
go.opentelemetry.io/collector v0.13.1-0.20201101004512-f4e4382d0e0e/go.mod h1:itblxiZ5r454TNNQVvcAp7vj7LbwCdeNRtodo2t+lGM=
go.opentelemetry.io/otel v0.12.0 h1:bwWaPd/h2q+U6KdKaAiOS5GLwOMd1LDt9iNaeyIoAI8=
go.opentelemetry.io/otel v0.12.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY=
go.opentelemetry.io/otel v0.13.0 h1:2isEnyzjjJZq6r2EKMsFj4TxiQiexsM04AVhwbR/oBA=
go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY=
go.opentelemetry.io/otel/sdk v0.12.0 h1:YVUyDXsGvFWjhJxGXT4kBcGdfoTbo1vSGjbGRUdRh5U=
go.opentelemetry.io/otel/sdk v0.12.0/go.mod h1:u3joRdxhrS1hUf9xSFH8vgdXdujQ3jxXxZl3loZFSqs=
go.opentelemetry.io/otel/sdk v0.13.0 h1:4VCfpKamZ8GtnepXxMRurSpHpMKkcxhtO33z1S4rGDQ=
go.opentelemetry.io/otel/sdk v0.13.0/go.mod h1:dKvLH8Uu8LcEPlSAUsfW7kMGaJBhk/1NYvpPZ6wIMbU=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
30 changes: 26 additions & 4 deletions exporter/stackdriverexporter/spandata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (

"go.opentelemetry.io/collector/consumer/pdata"
apitrace "go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/grpc/codes"
)

var errNilSpan = errors.New("expected a non-nil span")
Expand Down Expand Up @@ -89,7 +89,7 @@ func pdataSpanToOTSpanData(
}
}
if status := span.Status(); !status.IsNil() {
sd.StatusCode = pdataStatusCodeToGRPCCode(status.Code())
sd.StatusCode = pdataStatusCodeToOTCode(status.Code())
sd.StatusMessage = status.Message()
}

Expand All @@ -115,8 +115,30 @@ func pdataSpanKindToOTSpanKind(k pdata.SpanKind) apitrace.SpanKind {
}
}

func pdataStatusCodeToGRPCCode(c pdata.StatusCode) codes.Code {
return codes.Code(c)
func pdataStatusCodeToOTCode(c pdata.StatusCode) codes.Code {
switch c {
case pdata.StatusCodeOk:
return codes.Ok
case pdata.StatusCodeCancelled,
pdata.StatusCodeUnknownError,
pdata.StatusCodeInvalidArgument,
pdata.StatusCodeDeadlineExceeded,
pdata.StatusCodeNotFound,
pdata.StatusCodeAlreadyExists,
pdata.StatusCodePermissionDenied,
pdata.StatusCodeResourceExhausted,
pdata.StatusCodeFailedPrecondition,
pdata.StatusCodeAborted,
pdata.StatusCodeOutOfRange,
pdata.StatusCodeUnimplemented,
pdata.StatusCodeInternalError,
pdata.StatusCodeUnavailable,
pdata.StatusCodeDataLoss,
pdata.StatusCodeUnauthenticated:
return codes.Error
default:
return codes.Unset
}
}

func pdataAttributesToOTAttributes(attrs pdata.AttributeMap, resource pdata.Resource) []label.KeyValue {
Expand Down
4 changes: 2 additions & 2 deletions exporter/stackdriverexporter/spandata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/pdata"
apitrace "go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
"google.golang.org/grpc/codes"
)

func TestPDataResourceSpansToOTSpanData_endToEnd(t *testing.T) {
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestPDataResourceSpansToOTSpanData_endToEnd(t *testing.T) {
Attributes: []label.KeyValue{},
},
},
StatusCode: codes.Internal,
StatusCode: codes.Error,
StatusMessage: "This is not a drill!",
Attributes: []label.KeyValue{
label.String("namespace", "kube-system"),
Expand Down
66 changes: 64 additions & 2 deletions exporter/stackdriverexporter/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"time"

"contrib.go.opencensus.io/exporter/stackdriver"
cloudtrace "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
Expand Down Expand Up @@ -97,6 +98,7 @@ func newStackdriverTraceExporter(cfg *Config, params component.ExporterCreatePar
cloudtrace.WithProjectID(cfg.ProjectID),
cloudtrace.WithTimeout(cfg.Timeout),
}

copts, err := generateClientOptions(cfg, params.ApplicationStartInfo.Version)
if err != nil {
return nil, err
Expand All @@ -105,10 +107,17 @@ func newStackdriverTraceExporter(cfg *Config, params component.ExporterCreatePar
if cfg.NumOfWorkers > 0 {
topts = append(topts, cloudtrace.WithMaxNumberOfWorkers(cfg.NumOfWorkers))
}

topts, err = appendBundleOptions(topts, cfg.TraceConfig)
if err != nil {
return nil, err
}

exp, err := cloudtrace.NewExporter(topts...)
if err != nil {
return nil, fmt.Errorf("error creating Stackdriver Trace exporter: %w", err)
}

tExp := &traceExporter{texporter: exp}

return exporterhelper.NewTraceExporter(
Expand All @@ -121,6 +130,59 @@ func newStackdriverTraceExporter(cfg *Config, params component.ExporterCreatePar
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}))
}

func appendBundleOptions(topts []cloudtrace.Option, cfg TraceConfig) ([]cloudtrace.Option, error) {
topts, err := validateAndAppendDurationOption(topts, "BundleDelayThreshold", cfg.BundleDelayThreshold, cloudtrace.WithBundleDelayThreshold(cfg.BundleDelayThreshold))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BundleCountThreshold", cfg.BundleCountThreshold, cloudtrace.WithBundleCountThreshold(cfg.BundleCountThreshold))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BundleByteThreshold", cfg.BundleByteThreshold, cloudtrace.WithBundleByteThreshold(cfg.BundleByteThreshold))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BundleByteLimit", cfg.BundleByteLimit, cloudtrace.WithBundleByteLimit(cfg.BundleByteLimit))
if err != nil {
return nil, err
}

topts, err = validateAndAppendIntOption(topts, "BufferMaxBytes", cfg.BufferMaxBytes, cloudtrace.WithBufferMaxBytes(cfg.BufferMaxBytes))
if err != nil {
return nil, err
}

return topts, nil
}

func validateAndAppendIntOption(topts []cloudtrace.Option, name string, val int, opt cloudtrace.Option) ([]cloudtrace.Option, error) {
if val < 0 {
return nil, fmt.Errorf("invalid value for: %s", name)
}

if val > 0 {
topts = append(topts, opt)
}

return topts, nil
}

func validateAndAppendDurationOption(topts []cloudtrace.Option, name string, val time.Duration, opt cloudtrace.Option) ([]cloudtrace.Option, error) {
if val < 0 {
return nil, fmt.Errorf("invalid value for: %s", name)
}

if val > 0 {
topts = append(topts, opt)
}

return topts, nil
}

func newStackdriverMetricsExporter(cfg *Config, params component.ExporterCreateParams) (component.MetricsExporter, error) {
// TODO: For each ProjectID, create a different exporter
// or at least a unique Stackdriver client per ProjectID.
Expand All @@ -129,7 +191,7 @@ func newStackdriverMetricsExporter(cfg *Config, params component.ExporterCreateP
// the project this is running on in GCP.
ProjectID: cfg.ProjectID,

MetricPrefix: cfg.Prefix,
MetricPrefix: cfg.MetricConfig.Prefix,

// Set DefaultMonitoringLabels to an empty map to avoid getting the "opencensus_task" label
DefaultMonitoringLabels: &stackdriver.Labels{},
Expand All @@ -147,7 +209,7 @@ func newStackdriverMetricsExporter(cfg *Config, params component.ExporterCreateP
if cfg.NumOfWorkers > 0 {
options.NumberOfWorkers = cfg.NumOfWorkers
}
if cfg.SkipCreateMetricDescriptor {
if cfg.MetricConfig.SkipCreateMetricDescriptor {
options.SkipCMD = true
}
if len(cfg.ResourceMappings) > 0 {
Expand Down
Loading

0 comments on commit 8d1f6af

Please sign in to comment.