Skip to content

Commit

Permalink
Split connection management away from exporter (#1369)
Browse files Browse the repository at this point in the history
* Split protocol handling away from exporter

This commits adds a ProtocolDriver interface, which the exporter
will use to connect to the collector and send both metrics and traces
to it. That way, the Exporter type is free from dealing with any
connection/protocol details, as this business is taken over by the
implementations of the ProtocolDriver interface.

The gRPC code from the exporter is moved into the implementation of
ProtocolDriver. Currently it only maintains a single connection,
just as the Exporter used to do.

With the split, most of the Exporter options became actually gRPC
connection manager's options. Currently the only option that remained
to be Exporter's is about setting the export kind selector.

* Update changelog

* Increase the test coverage of GRPC driver

* Do not close a channel with multiple senders

The disconnected channel can be used for sending by multiple
goroutines (for example, by metric controller and span processor), so
this channel should not be closed at all. Dropping this line closes a
race between closing a channel and sending to it.

* Simplify new connection handler

The callbacks never return an error, so drop the return type from it.

* Access clients under a lock

The client may change as a result on reconnection in background, so
guard against a racy access.

* Simplify the GRPC driver a bit

The config type was exported earlier to have a consistent way of
configuring the driver, when also the multiple connection driver would
appear. Since we are not going to add a multiple connection driver,
pass the options directly to the driver constructor. Also shorten the
name of the constructor to `NewGRPCDriver`.

* Merge common gRPC code back into the driver

The common code was supposed to be shared between single connection
driver and multiple connection driver, but since the latter won't be
happening, it makes no sense to keep the not-so-common code in a
separate file. Also drop some abstraction too.

* Rename the file with gRPC driver implementation

* Update changelog

* Sleep for a second to trigger the timeout

Sometimes CI has it's better moments, so it's blazing fast and manages
to finish shutting the exporter down within the 1 microsecond timeout.

* Increase the timeout for shutting down the exporter

One millisecond is quite short, and I was getting failures locally or
in CI:

go test ./... + race in ./exporters/otlp
2020/12/14 18:27:54 rpc error: code = Canceled desc = context canceled
2020/12/14 18:27:54 context deadline exceeded
--- FAIL: TestNewExporter_withMultipleAttributeTypes (0.37s)
    otlp_integration_test.go:541: resource span count: got 0, want 1
FAIL
FAIL	go.opentelemetry.io/otel/exporters/otlp	5.278s

or

go test ./... + coverage in ./exporters/otlp
2020/12/14 17:41:16 rpc error: code = Canceled desc = context canceled
2020/12/14 17:41:16 exporter disconnected
--- FAIL: TestNewExporter_endToEnd (1.53s)
    --- FAIL: TestNewExporter_endToEnd/WithCompressor (0.41s)
        otlp_integration_test.go:246: span counts: got 3, want 4
2020/12/14 17:41:18 context canceled
FAIL
coverage: 35.3% of statements in ./...
FAIL	go.opentelemetry.io/otel/exporters/otlp	4.753s

* Shut down the providers in end to end test

This is to make sure that all batched spans are actually flushed
before closing the exporter.
  • Loading branch information
krnowak authored Dec 21, 2020
1 parent add9d93 commit 3521526
Show file tree
Hide file tree
Showing 13 changed files with 682 additions and 420 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360)
- `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369)

### Changed

Expand All @@ -19,6 +20,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Improve span duration accuracy. (#1360)
- Migrated CI/CD from CircleCI to GitHub Actions (#1382)
- Remove duplicate checkout from GitHub Actions workflow (#1407)
- `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369)
- Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369)

### Removed

- Remove `errUninitializedSpan` as its only usage is now obsolete. (#1360)
Expand Down
3 changes: 2 additions & 1 deletion example/otel-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ func initProvider() func() {
// `localhost:30080` address. Otherwise, replace `localhost` with the
// address of your cluster. If you run the app inside k8s, then you can
// probably connect directly to the service through dns
exp, err := otlp.NewExporter(ctx,
driver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithAddress("localhost:30080"),
otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing
)
exp, err := otlp.NewExporter(ctx, driver)
handleErr(err, "failed to create exporter")

res, err := resource.New(ctx,
Expand Down
6 changes: 4 additions & 2 deletions exporters/otlp/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (

func Example_insecure() {
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, otlp.WithInsecure())
driver := otlp.NewGRPCDriver(otlp.WithInsecure())
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
log.Fatalf("Failed to create the collector exporter: %v", err)
}
Expand Down Expand Up @@ -74,7 +75,8 @@ func Example_withTLS() {
}

ctx := context.Background()
exp, err := otlp.NewExporter(ctx, otlp.WithTLSCredentials(creds))
driver := otlp.NewGRPCDriver(otlp.WithTLSCredentials(creds))
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
log.Fatalf("failed to create the collector exporter: %v", err)
}
Expand Down
17 changes: 6 additions & 11 deletions exporters/otlp/connection.go → exporters/otlp/grpcconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp"

import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand All @@ -37,9 +36,9 @@ type grpcConnection struct {
cc *grpc.ClientConn

// these fields are read-only after constructor is finished
c config
c grpcConnectionConfig
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn) error
newConnectionHandler func(cc *grpc.ClientConn)

// these channels are created once
disconnectedCh chan bool
Expand All @@ -52,12 +51,9 @@ type grpcConnection struct {
closeBackgroundConnectionDoneCh func(ch chan struct{})
}

func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcConnection {
func newGRPCConnection(c grpcConnectionConfig, handler func(cc *grpc.ClientConn)) *grpcConnection {
conn := new(grpcConnection)
conn.newConnectionHandler = handler
if c.collectorAddr == "" {
c.collectorAddr = fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort)
}
conn.c = c
if len(conn.c.headers) > 0 {
conn.metadata = metadata.New(conn.c.headers)
Expand Down Expand Up @@ -103,7 +99,7 @@ func (oc *grpcConnection) setStateDisconnected(err error) {
case oc.disconnectedCh <- true:
default:
}
_ = oc.newConnectionHandler(nil)
oc.newConnectionHandler(nil)
}

func (oc *grpcConnection) setStateConnected() {
Expand Down Expand Up @@ -180,7 +176,8 @@ func (oc *grpcConnection) connect(ctx context.Context) error {
return err
}
oc.setConnection(cc)
return oc.newConnectionHandler(cc)
oc.newConnectionHandler(cc)
return nil
}

// setConnection sets cc as the client connection and returns true if
Expand Down Expand Up @@ -245,8 +242,6 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error {
return ctx.Err()
}

close(oc.disconnectedCh)

oc.mu.Lock()
cc := oc.cc
oc.cc = nil
Expand Down
144 changes: 144 additions & 0 deletions exporters/otlp/grpcdriver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlp // import "go.opentelemetry.io/otel/exporters/otlp"

import (
"context"
"fmt"
"sync"

"google.golang.org/grpc"

colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)

type grpcDriver struct {
connection *grpcConnection

lock sync.Mutex
metricsClient colmetricpb.MetricsServiceClient
tracesClient coltracepb.TraceServiceClient
}

func NewGRPCDriver(opts ...GRPCConnectionOption) ProtocolDriver {
cfg := grpcConnectionConfig{
collectorAddr: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort),
grpcServiceConfig: DefaultGRPCServiceConfig,
}
for _, opt := range opts {
opt(&cfg)
}
d := &grpcDriver{}
d.connection = newGRPCConnection(cfg, d.handleNewConnection)
return d
}

func (d *grpcDriver) handleNewConnection(cc *grpc.ClientConn) {
d.lock.Lock()
defer d.lock.Unlock()
if cc != nil {
d.metricsClient = colmetricpb.NewMetricsServiceClient(cc)
d.tracesClient = coltracepb.NewTraceServiceClient(cc)
} else {
d.metricsClient = nil
d.tracesClient = nil
}
}

func (d *grpcDriver) Start(ctx context.Context) error {
d.connection.startConnection(ctx)
return nil
}

func (d *grpcDriver) Stop(ctx context.Context) error {
return d.connection.shutdown(ctx)
}

func (d *grpcDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
if !d.connection.connected() {
return errDisconnected
}
ctx, cancel := d.connection.contextWithStop(ctx)
defer cancel()

rms, err := transform.CheckpointSet(ctx, selector, cps, 1)
if err != nil {
return err
}
if len(rms) == 0 {
return nil
}

return d.uploadMetrics(ctx, rms)
}

func (d *grpcDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error {
ctx = d.connection.contextWithMetadata(ctx)
err := func() error {
d.lock.Lock()
defer d.lock.Unlock()
if d.metricsClient == nil {
return errNoClient
}
_, err := d.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: protoMetrics,
})
return err
}()
if err != nil {
d.connection.setStateDisconnected(err)
}
return err
}

func (d *grpcDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
if !d.connection.connected() {
return errDisconnected
}
ctx, cancel := d.connection.contextWithStop(ctx)
defer cancel()

protoSpans := transform.SpanData(ss)
if len(protoSpans) == 0 {
return nil
}

return d.uploadTraces(ctx, protoSpans)
}

func (d *grpcDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
ctx = d.connection.contextWithMetadata(ctx)
err := func() error {
d.lock.Lock()
defer d.lock.Unlock()
if d.tracesClient == nil {
return errNoClient
}
_, err := d.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
})
return err
}()
if err != nil {
d.connection.setStateDisconnected(err)
}
return err
}
Loading

0 comments on commit 3521526

Please sign in to comment.