From 1753aeacf809206ea6385420328ec91725eb62e0 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Fri, 20 Nov 2020 17:36:32 +0100 Subject: [PATCH 01/13] Move connection logic into grpcConnection object If we will need to maintain more than one connection in future, this splitting off will come in handy. Co-authored-by: Stefan Prisca --- exporters/otlp/alignment_test.go | 4 +- exporters/otlp/connection.go | 167 ++++++++++++++++++++++++++----- exporters/otlp/otlp.go | 150 ++++++--------------------- 3 files changed, 178 insertions(+), 143 deletions(-) diff --git a/exporters/otlp/alignment_test.go b/exporters/otlp/alignment_test.go index f20a0bd1b2d..276625637a1 100644 --- a/exporters/otlp/alignment_test.go +++ b/exporters/otlp/alignment_test.go @@ -26,8 +26,8 @@ import ( func TestMain(m *testing.M) { fields := []ottest.FieldOffset{ { - Name: "Exporter.lastConnectErrPtr", - Offset: unsafe.Offsetof(Exporter{}.lastConnectErrPtr), + Name: "grpcConnection.lastConnectErrPtr", + Offset: unsafe.Offsetof(grpcConnection{}.lastConnectErrPtr), }, } if !ottest.Aligned8Byte(fields, os.Stderr) { diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 48c799c80e2..300f1372d6f 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -15,52 +15,102 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( + "context" + "fmt" "math/rand" + "sync" "sync/atomic" "time" "unsafe" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) -func (e *Exporter) lastConnectError() error { - errPtr := (*error)(atomic.LoadPointer(&e.lastConnectErrPtr)) +type grpcConnection struct { + // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. + lastConnectErrPtr unsafe.Pointer + + // mu protects the non-atomic and non-channel variables + mu sync.RWMutex + + c config + metadata metadata.MD + cc *grpc.ClientConn + + newConnectionHandler func(cc *grpc.ClientConn) error + disconnectedCh chan bool + backgroundConnectionDoneCh chan bool + stopCh chan struct{} +} + +func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcConnection { + conn := new(grpcConnection) + conn.newConnectionHandler = handler + if c.collectorAddr == "" { + c.collectorAddr = fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort) + } + conn.c = c + if len(conn.c.headers) > 0 { + conn.metadata = metadata.New(conn.c.headers) + } + return conn +} + +func (oc *grpcConnection) startConnection(stopCh chan struct{}) { + oc.stopCh = stopCh + + oc.disconnectedCh = make(chan bool) + oc.backgroundConnectionDoneCh = make(chan bool) + + if err := oc.connect(); err == nil { + oc.setStateConnected() + } else { + oc.setStateDisconnected(err) + } + go oc.indefiniteBackgroundConnection() +} + +func (oc *grpcConnection) lastConnectError() error { + errPtr := (*error)(atomic.LoadPointer(&oc.lastConnectErrPtr)) if errPtr == nil { return nil } return *errPtr } -func (e *Exporter) saveLastConnectError(err error) { +func (oc *grpcConnection) saveLastConnectError(err error) { var errPtr *error if err != nil { errPtr = &err } - atomic.StorePointer(&e.lastConnectErrPtr, unsafe.Pointer(errPtr)) + atomic.StorePointer(&oc.lastConnectErrPtr, unsafe.Pointer(errPtr)) } -func (e *Exporter) setStateDisconnected(err error) { - e.saveLastConnectError(err) +func (oc *grpcConnection) setStateDisconnected(err error) { + oc.saveLastConnectError(err) select { - case e.disconnectedCh <- true: + case oc.disconnectedCh <- true: default: } } -func (e *Exporter) setStateConnected() { - e.saveLastConnectError(nil) +func (oc *grpcConnection) setStateConnected() { + oc.saveLastConnectError(nil) } -func (e *Exporter) connected() bool { - return e.lastConnectError() == nil +func (oc *grpcConnection) connected() bool { + return oc.lastConnectError() == nil } const defaultConnReattemptPeriod = 10 * time.Second -func (e *Exporter) indefiniteBackgroundConnection() { +func (oc *grpcConnection) indefiniteBackgroundConnection() { defer func() { - e.backgroundConnectionDoneCh <- true + oc.backgroundConnectionDoneCh <- true }() - connReattemptPeriod := e.c.reconnectionPeriod + connReattemptPeriod := oc.c.reconnectionPeriod if connReattemptPeriod <= 0 { connReattemptPeriod = defaultConnReattemptPeriod } @@ -79,17 +129,17 @@ func (e *Exporter) indefiniteBackgroundConnection() { // 2. Otherwise block until we are disconnected, and // then retry connecting select { - case <-e.stopCh: + case <-oc.stopCh: return - case <-e.disconnectedCh: + case <-oc.disconnectedCh: // Normal scenario that we'll wait for } - if err := e.connect(); err == nil { - e.setStateConnected() + if err := oc.connect(); err == nil { + oc.setStateConnected() } else { - e.setStateDisconnected(err) + oc.setStateDisconnected(err) } // Apply some jitter to avoid lockstep retrials of other @@ -97,17 +147,88 @@ func (e *Exporter) indefiniteBackgroundConnection() { // innocent DDOS, by clogging the machine's resources and network. jitter := time.Duration(rng.Int63n(maxJitterNanos)) select { - case <-e.stopCh: + case <-oc.stopCh: return case <-time.After(connReattemptPeriod + jitter): } } } -func (e *Exporter) connect() error { - cc, err := e.dialToCollector() +func (oc *grpcConnection) connect() error { + cc, err := oc.dialToCollector() if err != nil { return err } - return e.enableConnections(cc) + + oc.mu.Lock() + defer oc.mu.Unlock() + + // If previous clientConn is same as the current then just return. + // This doesn't happen right now as this func is only called with new ClientConn. + // It is more about future-proofing. + if oc.cc == cc { + return nil + } + + // If the previous clientConn was non-nil, close it + if oc.cc != nil { + _ = oc.cc.Close() + } + oc.cc = cc + + err = oc.newConnectionHandler(cc) + return err +} + +func (oc *grpcConnection) dialToCollector() (*grpc.ClientConn, error) { + addr := oc.c.collectorAddr + + dialOpts := []grpc.DialOption{} + if oc.c.grpcServiceConfig != "" { + dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(oc.c.grpcServiceConfig)) + } + if oc.c.clientCredentials != nil { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(oc.c.clientCredentials)) + } else if oc.c.canDialInsecure { + dialOpts = append(dialOpts, grpc.WithInsecure()) + } + if oc.c.compressor != "" { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(oc.c.compressor))) + } + if len(oc.c.grpcDialOptions) != 0 { + dialOpts = append(dialOpts, oc.c.grpcDialOptions...) + } + + ctx := oc.contextWithMetadata(context.Background()) + return grpc.DialContext(ctx, addr, dialOpts...) +} + +func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Context { + if oc.metadata.Len() > 0 { + return metadata.NewOutgoingContext(ctx, oc.metadata) + } + return ctx +} + +func (oc *grpcConnection) shutdown(ctx context.Context) error { + oc.mu.RLock() + cc := oc.cc + oc.mu.RUnlock() + + var err error + if cc != nil { + err = cc.Close() + } + + // Ensure that the backgroundConnector returns + select { + case <-oc.backgroundConnectionDoneCh: + case <-ctx.Done(): + return ctx.Err() + } + + close(oc.disconnectedCh) + close(oc.backgroundConnectionDoneCh) + + return err } diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 16754394ce9..d471601b8f3 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -20,12 +20,9 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( "context" "errors" - "fmt" "sync" - "unsafe" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" @@ -43,22 +40,17 @@ type Exporter struct { // mu protects the non-atomic and non-channel variables mu sync.RWMutex // senderMu protects the concurrent unsafe sends on the shared gRPC client connection. - senderMu sync.Mutex - started bool - traceExporter coltracepb.TraceServiceClient - metricExporter colmetricpb.MetricsServiceClient - grpcClientConn *grpc.ClientConn - lastConnectErrPtr unsafe.Pointer - - startOnce sync.Once - stopOnce sync.Once - stopCh chan struct{} - disconnectedCh chan bool - - backgroundConnectionDoneCh chan bool - - c config - metadata metadata.MD + senderMu sync.Mutex + started bool + traceExporter coltracepb.TraceServiceClient + metricExporter colmetricpb.MetricsServiceClient + cc *grpcConnection + + startOnce sync.Once + stopOnce sync.Once + stopCh chan struct{} + + exportKindSelector metricsdk.ExportKindSelector } var _ tracesdk.SpanExporter = (*Exporter)(nil) @@ -93,16 +85,22 @@ func NewExporter(opts ...ExporterOption) (*Exporter, error) { // NewUnstartedExporter constructs a new Exporter and does not start it. func NewUnstartedExporter(opts ...ExporterOption) *Exporter { e := new(Exporter) - e.c = newConfig(opts...) - if len(e.c.headers) > 0 { - e.metadata = metadata.New(e.c.headers) - } + cfg := newConfig(opts...) + e.exportKindSelector = cfg.exportKindSelector + e.cc = newGRPCConnection(cfg, e.handleNewConnection) return e } +func (e *Exporter) handleNewConnection(cc *grpc.ClientConn) error { + e.mu.Lock() + e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) + e.traceExporter = coltracepb.NewTraceServiceClient(cc) + e.mu.Unlock() + return nil +} + var ( errAlreadyStarted = errors.New("already started") - errNotStarted = errors.New("not started") errDisconnected = errors.New("exporter disconnected") errStopped = errors.New("exporter stopped") errContextCanceled = errors.New("context canceled") @@ -118,93 +116,16 @@ func (e *Exporter) Start() error { e.startOnce.Do(func() { e.mu.Lock() e.started = true - e.disconnectedCh = make(chan bool, 1) e.stopCh = make(chan struct{}) - e.backgroundConnectionDoneCh = make(chan bool) e.mu.Unlock() - // An optimistic first connection attempt to ensure that - // applications under heavy load can immediately process - // data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63 - if err := e.connect(); err == nil { - e.setStateConnected() - } else { - e.setStateDisconnected(err) - } - go e.indefiniteBackgroundConnection() - err = nil + e.cc.startConnection(e.stopCh) }) return err } -func (e *Exporter) prepareCollectorAddress() string { - if e.c.collectorAddr != "" { - return e.c.collectorAddr - } - return fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort) -} - -func (e *Exporter) enableConnections(cc *grpc.ClientConn) error { - e.mu.RLock() - started := e.started - e.mu.RUnlock() - - if !started { - return errNotStarted - } - - e.mu.Lock() - // If previous clientConn is same as the current then just return. - // This doesn't happen right now as this func is only called with new ClientConn. - // It is more about future-proofing. - if e.grpcClientConn == cc { - e.mu.Unlock() - return nil - } - // If the previous clientConn was non-nil, close it - if e.grpcClientConn != nil { - _ = e.grpcClientConn.Close() - } - e.grpcClientConn = cc - e.traceExporter = coltracepb.NewTraceServiceClient(cc) - e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) - e.mu.Unlock() - - return nil -} - -func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context { - if e.metadata.Len() > 0 { - return metadata.NewOutgoingContext(ctx, e.metadata) - } - return ctx -} - -func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { - addr := e.prepareCollectorAddress() - - dialOpts := []grpc.DialOption{} - if e.c.grpcServiceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(e.c.grpcServiceConfig)) - } - if e.c.clientCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(e.c.clientCredentials)) - } else if e.c.canDialInsecure { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } - if e.c.compressor != "" { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(e.c.compressor))) - } - if len(e.c.grpcDialOptions) != 0 { - dialOpts = append(dialOpts, e.c.grpcDialOptions...) - } - - ctx := e.contextWithMetadata(context.Background()) - return grpc.DialContext(ctx, addr, dialOpts...) -} - // closeStopCh is used to wrap the exporters stopCh channel closing for testing. var closeStopCh = func(stopCh chan struct{}) { close(stopCh) @@ -214,7 +135,7 @@ var closeStopCh = func(stopCh chan struct{}) { // by the exporter. If the exporter is not started this does nothing. func (e *Exporter) Shutdown(ctx context.Context) error { e.mu.RLock() - cc := e.grpcClientConn + cc := e.cc started := e.started e.mu.RUnlock() @@ -225,23 +146,16 @@ func (e *Exporter) Shutdown(ctx context.Context) error { var err error e.stopOnce.Do(func() { + closeStopCh(e.stopCh) if cc != nil { // Clean things up before checking this error. - err = cc.Close() + err = cc.shutdown(ctx) } // At this point we can change the state variable started e.mu.Lock() e.started = false e.mu.Unlock() - closeStopCh(e.stopCh) - - // Ensure that the backgroundConnector returns - select { - case <-e.backgroundConnectionDoneCh: - case <-ctx.Done(): - err = ctx.Err() - } }) return err @@ -270,7 +184,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e return err } - if !e.connected() { + if !e.cc.connected() { return errDisconnected } @@ -281,7 +195,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e return errContextCanceled default: e.senderMu.Lock() - _, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ + _, err := e.metricExporter.Export(e.cc.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: rms, }) e.senderMu.Unlock() @@ -295,7 +209,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e // ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter // metric telemetry that it needs to be provided in a cumulative format. func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricsdk.ExportKind { - return e.c.exportKindSelector.ExportKindFor(desc, kind) + return e.exportKindSelector.ExportKindFor(desc, kind) } // ExportSpans exports a batch of SpanData. @@ -308,7 +222,7 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) e case <-e.stopCh: return nil default: - if !e.connected() { + if !e.cc.connected() { return nil } @@ -318,12 +232,12 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) e } e.senderMu.Lock() - _, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ + _, err := e.traceExporter.Export(e.cc.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) e.senderMu.Unlock() if err != nil { - e.setStateDisconnected(err) + e.cc.setStateDisconnected(err) return err } } From 2f89719ad5451535f04a1b8837fb81471b132699 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Fri, 20 Nov 2020 17:30:54 +0100 Subject: [PATCH 02/13] Make another channel a signal channel There is another channel that serves as a one-time signal, where channel's data type does not matter. --- exporters/otlp/connection.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 300f1372d6f..4b92ff2dca7 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -40,7 +40,7 @@ type grpcConnection struct { newConnectionHandler func(cc *grpc.ClientConn) error disconnectedCh chan bool - backgroundConnectionDoneCh chan bool + backgroundConnectionDoneCh chan struct{} stopCh chan struct{} } @@ -61,7 +61,7 @@ func (oc *grpcConnection) startConnection(stopCh chan struct{}) { oc.stopCh = stopCh oc.disconnectedCh = make(chan bool) - oc.backgroundConnectionDoneCh = make(chan bool) + oc.backgroundConnectionDoneCh = make(chan struct{}) if err := oc.connect(); err == nil { oc.setStateConnected() @@ -107,7 +107,7 @@ const defaultConnReattemptPeriod = 10 * time.Second func (oc *grpcConnection) indefiniteBackgroundConnection() { defer func() { - oc.backgroundConnectionDoneCh <- true + close(oc.backgroundConnectionDoneCh) }() connReattemptPeriod := oc.c.reconnectionPeriod @@ -228,7 +228,6 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error { } close(oc.disconnectedCh) - close(oc.backgroundConnectionDoneCh) return err } From 75d0601c1aaa8f99f2c7b6c21edd10c6a59a7902 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 12:37:36 +0100 Subject: [PATCH 03/13] Reorder and document connection members This is to make clear that the lock is guarding only the connection since it can be changed by multiple goroutines, and other members are either atomic or read-only. --- exporters/otlp/connection.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 4b92ff2dca7..59ba72adbb5 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -31,14 +31,17 @@ type grpcConnection struct { // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. lastConnectErrPtr unsafe.Pointer - // mu protects the non-atomic and non-channel variables + // mu protects the connection as it is accessed by the + // exporter goroutines and background connection goroutine mu sync.RWMutex + cc *grpc.ClientConn - c config - metadata metadata.MD - cc *grpc.ClientConn + // these fields are read-only after constructor is finished + c config + metadata metadata.MD + newConnectionHandler func(cc *grpc.ClientConn) error - newConnectionHandler func(cc *grpc.ClientConn) error + // these channels are created once disconnectedCh chan bool backgroundConnectionDoneCh chan struct{} stopCh chan struct{} From 5d96142d66bb03ce00e10ad2274226653131a81e Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 12:44:21 +0100 Subject: [PATCH 04/13] Move stop signal into connection The stop channel was rather useless on the exporter side - the primary reason for existence of this channel is to stop a background reconnecting goroutine. Since the goroutine lives entirely within grpcConnection object, move the stop channel here. Also expose a function to unify the stop channel with the context cancellation, so exporter can use it without knowing anything about stop channels. Also make export functions a bit more consistent. --- exporters/otlp/connection.go | 27 +++++++++++-- exporters/otlp/otlp.go | 76 +++++++++++++----------------------- 2 files changed, 51 insertions(+), 52 deletions(-) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 59ba72adbb5..1cdf466496a 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -60,9 +60,8 @@ func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcC return conn } -func (oc *grpcConnection) startConnection(stopCh chan struct{}) { - oc.stopCh = stopCh - +func (oc *grpcConnection) startConnection() { + oc.stopCh = make(chan struct{}) oc.disconnectedCh = make(chan bool) oc.backgroundConnectionDoneCh = make(chan struct{}) @@ -213,6 +212,11 @@ func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Conte return ctx } +// closeStopCh is used to wrap the exporters stopCh channel closing for testing. +var closeStopCh = func(stopCh chan struct{}) { + close(stopCh) +} + func (oc *grpcConnection) shutdown(ctx context.Context) error { oc.mu.RLock() cc := oc.cc @@ -223,6 +227,7 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error { err = cc.Close() } + closeStopCh(oc.stopCh) // Ensure that the backgroundConnector returns select { case <-oc.backgroundConnectionDoneCh: @@ -234,3 +239,19 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error { return err } + +func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { + // Unify the parent context Done signal with the connection's + // stop channel. + ctx, cancel := context.WithCancel(ctx) + go func(ctx context.Context, cancel context.CancelFunc) { + select { + case <-ctx.Done(): + // Nothing to do, either cancelled or deadline + // happened. + case <-oc.stopCh: + cancel() + } + }(ctx, cancel) + return ctx, cancel +} diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index d471601b8f3..441857f9dcd 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -48,7 +48,6 @@ type Exporter struct { startOnce sync.Once stopOnce sync.Once - stopCh chan struct{} exportKindSelector metricsdk.ExportKindSelector } @@ -100,10 +99,8 @@ func (e *Exporter) handleNewConnection(cc *grpc.ClientConn) error { } var ( - errAlreadyStarted = errors.New("already started") - errDisconnected = errors.New("exporter disconnected") - errStopped = errors.New("exporter stopped") - errContextCanceled = errors.New("context canceled") + errAlreadyStarted = errors.New("already started") + errDisconnected = errors.New("exporter disconnected") ) // Start dials to the collector, establishing a connection to it. It also @@ -116,21 +113,15 @@ func (e *Exporter) Start() error { e.startOnce.Do(func() { e.mu.Lock() e.started = true - e.stopCh = make(chan struct{}) e.mu.Unlock() err = nil - e.cc.startConnection(e.stopCh) + e.cc.startConnection() }) return err } -// closeStopCh is used to wrap the exporters stopCh channel closing for testing. -var closeStopCh = func(stopCh chan struct{}) { - close(stopCh) -} - // Shutdown closes all connections and releases resources currently being used // by the exporter. If the exporter is not started this does nothing. func (e *Exporter) Shutdown(ctx context.Context) error { @@ -146,7 +137,6 @@ func (e *Exporter) Shutdown(ctx context.Context) error { var err error e.stopOnce.Do(func() { - closeStopCh(e.stopCh) if cc != nil { // Clean things up before checking this error. err = cc.shutdown(ctx) @@ -165,16 +155,8 @@ func (e *Exporter) Shutdown(ctx context.Context) error { // interface. It transforms and batches metric Records into OTLP Metrics and // transmits them to the configured collector. func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error { - // Unify the parent context Done signal with the exporter stopCh. - ctx, cancel := context.WithCancel(parent) + ctx, cancel := e.cc.contextWithStop(parent) defer cancel() - go func(ctx context.Context, cancel context.CancelFunc) { - select { - case <-ctx.Done(): - case <-e.stopCh: - cancel() - } - }(ctx, cancel) // Hardcode the number of worker goroutines to 1. We later will // need to see if there's a way to adjust that number for longer @@ -188,22 +170,18 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e return errDisconnected } - select { - case <-e.stopCh: - return errStopped - case <-ctx.Done(): - return errContextCanceled - default: + err = func() error { e.senderMu.Lock() + defer e.senderMu.Unlock() _, err := e.metricExporter.Export(e.cc.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: rms, }) - e.senderMu.Unlock() - if err != nil { - return err - } + return err + }() + if err != nil { + e.cc.setStateDisconnected(err) } - return nil + return err } // ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter @@ -218,28 +196,28 @@ func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) er } func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) error { - select { - case <-e.stopCh: + ctx, cancel := e.cc.contextWithStop(ctx) + defer cancel() + + if !e.cc.connected() { return nil - default: - if !e.cc.connected() { - return nil - } + } - protoSpans := transform.SpanData(sdl) - if len(protoSpans) == 0 { - return nil - } + protoSpans := transform.SpanData(sdl) + if len(protoSpans) == 0 { + return nil + } + err := func() error { e.senderMu.Lock() + defer e.senderMu.Unlock() _, err := e.traceExporter.Export(e.cc.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) - e.senderMu.Unlock() - if err != nil { - e.cc.setStateDisconnected(err) - return err - } + return err + }() + if err != nil { + e.cc.setStateDisconnected(err) } - return nil + return err } From 6eecc86fc6fceac234089bb96ac83f823a9f0936 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 13:03:16 +0100 Subject: [PATCH 05/13] Do not run reconnection routine when being stopped too It's possible that both disconnected channel and stop channel will be triggered around the same time, so the goroutine is as likely to start reconnecting as to return from the goroutine. Make sure we return if the stop channel is closed. --- exporters/otlp/connection.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 1cdf466496a..a3989cbfa09 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -135,6 +135,15 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() { return case <-oc.disconnectedCh: + // Quickly check if we haven't stopped at the + // same time. + select { + case <-oc.stopCh: + return + + default: + } + // Normal scenario that we'll wait for } From 21e32a97c20cb8dfef34ecb161908dd4f35224c7 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 13:13:58 +0100 Subject: [PATCH 06/13] Nil clients on connection error Set clients to nil on connection error, so we don't try to send the data over a bad connection, but return a "no client" error immediately. --- exporters/otlp/connection.go | 18 +++++++++++------- exporters/otlp/otlp.go | 18 +++++++++++++++--- exporters/otlp/otlp_test.go | 30 ++++++++++++------------------ 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index a3989cbfa09..142a7d5ebb9 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -45,6 +45,11 @@ type grpcConnection struct { disconnectedCh chan bool backgroundConnectionDoneCh chan struct{} stopCh chan struct{} + + // this is for tests, so they can replace the closing + // routine without a worry of modifying some global variable + // or changing it back to original after the test is done + closeBackgroundConnectionDoneCh func(ch chan struct{}) } func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcConnection { @@ -57,6 +62,9 @@ func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcC if len(conn.c.headers) > 0 { conn.metadata = metadata.New(conn.c.headers) } + conn.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { + close(ch) + } return conn } @@ -95,6 +103,7 @@ func (oc *grpcConnection) setStateDisconnected(err error) { case oc.disconnectedCh <- true: default: } + _ = oc.newConnectionHandler(nil) } func (oc *grpcConnection) setStateConnected() { @@ -109,7 +118,7 @@ const defaultConnReattemptPeriod = 10 * time.Second func (oc *grpcConnection) indefiniteBackgroundConnection() { defer func() { - close(oc.backgroundConnectionDoneCh) + oc.closeBackgroundConnectionDoneCh(oc.backgroundConnectionDoneCh) }() connReattemptPeriod := oc.c.reconnectionPeriod @@ -221,11 +230,6 @@ func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Conte return ctx } -// closeStopCh is used to wrap the exporters stopCh channel closing for testing. -var closeStopCh = func(stopCh chan struct{}) { - close(stopCh) -} - func (oc *grpcConnection) shutdown(ctx context.Context) error { oc.mu.RLock() cc := oc.cc @@ -236,7 +240,7 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error { err = cc.Close() } - closeStopCh(oc.stopCh) + close(oc.stopCh) // Ensure that the backgroundConnector returns select { case <-oc.backgroundConnectionDoneCh: diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 441857f9dcd..61973f48668 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -92,13 +92,19 @@ func NewUnstartedExporter(opts ...ExporterOption) *Exporter { func (e *Exporter) handleNewConnection(cc *grpc.ClientConn) error { e.mu.Lock() - e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) - e.traceExporter = coltracepb.NewTraceServiceClient(cc) - e.mu.Unlock() + defer e.mu.Unlock() + if cc != nil { + e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) + e.traceExporter = coltracepb.NewTraceServiceClient(cc) + } else { + e.metricExporter = nil + e.traceExporter = nil + } return nil } var ( + errNoClient = errors.New("no client") errAlreadyStarted = errors.New("already started") errDisconnected = errors.New("exporter disconnected") ) @@ -173,6 +179,9 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e err = func() error { e.senderMu.Lock() defer e.senderMu.Unlock() + if e.metricExporter == nil { + return errNoClient + } _, err := e.metricExporter.Export(e.cc.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: rms, }) @@ -211,6 +220,9 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) e err := func() error { e.senderMu.Lock() defer e.senderMu.Unlock() + if e.traceExporter == nil { + return errNoClient + } _, err := e.traceExporter.Export(e.cc.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index 5c2e0e655bf..eaf31a7cb96 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -23,20 +23,17 @@ import ( ) func TestExporterShutdownHonorsTimeout(t *testing.T) { - orig := closeStopCh ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer func() { - cancel() - closeStopCh = orig - }() - closeStopCh = func(stopCh chan struct{}) { + defer cancel() + + e := NewUnstartedExporter() + orig := e.cc.closeBackgroundConnectionDoneCh + e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { go func() { <-ctx.Done() - close(stopCh) + orig(ch) }() } - - e := NewUnstartedExporter() if err := e.Start(); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -51,20 +48,17 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) { } func TestExporterShutdownHonorsCancel(t *testing.T) { - orig := closeStopCh ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer func() { - cancel() - closeStopCh = orig - }() - closeStopCh = func(stopCh chan struct{}) { + defer cancel() + + e := NewUnstartedExporter() + orig := e.cc.closeBackgroundConnectionDoneCh + e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { go func() { <-ctx.Done() - close(stopCh) + orig(ch) }() } - - e := NewUnstartedExporter() if err := e.Start(); err != nil { t.Fatalf("failed to start exporter: %v", err) } From e24e5e57125d85a917a2344cdef58dcc1bc92caa Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 13:16:50 +0100 Subject: [PATCH 07/13] Do not call new connection handler within critical section It's rather risky to call a callback coming from outside within a critical section. Move it out. --- exporters/otlp/connection.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 142a7d5ebb9..18766369762 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -179,7 +179,13 @@ func (oc *grpcConnection) connect() error { if err != nil { return err } + oc.setConnection(cc) + return oc.newConnectionHandler(cc) +} +// setConnection sets cc as the client connection and returns true if +// the connection state changed. +func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool { oc.mu.Lock() defer oc.mu.Unlock() @@ -187,7 +193,7 @@ func (oc *grpcConnection) connect() error { // This doesn't happen right now as this func is only called with new ClientConn. // It is more about future-proofing. if oc.cc == cc { - return nil + return false } // If the previous clientConn was non-nil, close it @@ -195,9 +201,7 @@ func (oc *grpcConnection) connect() error { _ = oc.cc.Close() } oc.cc = cc - - err = oc.newConnectionHandler(cc) - return err + return true } func (oc *grpcConnection) dialToCollector() (*grpc.ClientConn, error) { From 4d4f29e43e34d05ffff8580055f9f2aa30e19884 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 13:21:04 +0100 Subject: [PATCH 08/13] Add context parameter to connection routines Connecting to the collector may also take its time, so it can be useful in some cases to pass a context with a deadline. Currently we just pass a background context, so this commit does not really change any behavior. The follow-up commits will make a use of it, though. --- exporters/otlp/connection.go | 14 +++++++------- exporters/otlp/otlp.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 18766369762..5c4a8e6db0e 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -68,12 +68,12 @@ func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcC return conn } -func (oc *grpcConnection) startConnection() { +func (oc *grpcConnection) startConnection(ctx context.Context) { oc.stopCh = make(chan struct{}) oc.disconnectedCh = make(chan bool) oc.backgroundConnectionDoneCh = make(chan struct{}) - if err := oc.connect(); err == nil { + if err := oc.connect(ctx); err == nil { oc.setStateConnected() } else { oc.setStateDisconnected(err) @@ -156,7 +156,7 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() { // Normal scenario that we'll wait for } - if err := oc.connect(); err == nil { + if err := oc.connect(context.Background()); err == nil { oc.setStateConnected() } else { oc.setStateDisconnected(err) @@ -174,8 +174,8 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() { } } -func (oc *grpcConnection) connect() error { - cc, err := oc.dialToCollector() +func (oc *grpcConnection) connect(ctx context.Context) error { + cc, err := oc.dialToCollector(ctx) if err != nil { return err } @@ -204,7 +204,7 @@ func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool { return true } -func (oc *grpcConnection) dialToCollector() (*grpc.ClientConn, error) { +func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { addr := oc.c.collectorAddr dialOpts := []grpc.DialOption{} @@ -223,7 +223,7 @@ func (oc *grpcConnection) dialToCollector() (*grpc.ClientConn, error) { dialOpts = append(dialOpts, oc.c.grpcDialOptions...) } - ctx := oc.contextWithMetadata(context.Background()) + ctx = oc.contextWithMetadata(ctx) return grpc.DialContext(ctx, addr, dialOpts...) } diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 61973f48668..18bdb90a431 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -122,7 +122,7 @@ func (e *Exporter) Start() error { e.mu.Unlock() err = nil - e.cc.startConnection() + e.cc.startConnection(context.Background()) }) return err From 7303facbb9c90558d86c88d529ee241b7f474b66 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 13:23:20 +0100 Subject: [PATCH 09/13] Add context parameter to NewExporter and Start It makes it possible to limit the time spent on connecting to the collector. --- example/otel-collector/main.go | 2 +- exporters/otlp/example_test.go | 14 +++--- exporters/otlp/otlp.go | 8 ++-- exporters/otlp/otlp_integration_test.go | 63 ++++++++++++++----------- exporters/otlp/otlp_test.go | 8 ++-- 5 files changed, 53 insertions(+), 42 deletions(-) diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index c357209a3ce..a2452f87485 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -49,7 +49,7 @@ func initProvider() func() { // `localhost:30080` address. Otherwise, replace `localhost` with the // address of your cluster. If you run the app inside k8s, then you can // probably connect directly to the service through dns - exp, err := otlp.NewExporter( + exp, err := otlp.NewExporter(ctx, otlp.WithInsecure(), otlp.WithAddress("localhost:30080"), otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing diff --git a/exporters/otlp/example_test.go b/exporters/otlp/example_test.go index cf3a8eb0103..a34811e25e2 100644 --- a/exporters/otlp/example_test.go +++ b/exporters/otlp/example_test.go @@ -28,12 +28,13 @@ import ( ) func Example_insecure() { - exp, err := otlp.NewExporter(otlp.WithInsecure()) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, otlp.WithInsecure()) if err != nil { log.Fatalf("Failed to create the collector exporter: %v", err) } defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { otel.Handle(err) @@ -54,7 +55,7 @@ func Example_insecure() { tracer := otel.Tracer("test-tracer") // Then use the OpenTelemetry tracing library, like we normally would. - ctx, span := tracer.Start(context.Background(), "CollectorExporter-Example") + ctx, span := tracer.Start(ctx, "CollectorExporter-Example") defer span.End() for i := 0; i < 10; i++ { @@ -72,12 +73,13 @@ func Example_withTLS() { log.Fatalf("failed to create gRPC client TLS credentials: %v", err) } - exp, err := otlp.NewExporter(otlp.WithTLSCredentials(creds)) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, otlp.WithTLSCredentials(creds)) if err != nil { log.Fatalf("failed to create the collector exporter: %v", err) } defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { otel.Handle(err) @@ -98,7 +100,7 @@ func Example_withTLS() { tracer := otel.Tracer("test-tracer") // Then use the OpenTelemetry tracing library, like we normally would. - ctx, span := tracer.Start(context.Background(), "Securely-Talking-To-Collector-Span") + ctx, span := tracer.Start(ctx, "Securely-Talking-To-Collector-Span") defer span.End() for i := 0; i < 10; i++ { diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 18bdb90a431..3beaf589bfa 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -73,9 +73,9 @@ func newConfig(opts ...ExporterOption) config { } // NewExporter constructs a new Exporter and starts it. -func NewExporter(opts ...ExporterOption) (*Exporter, error) { +func NewExporter(ctx context.Context, opts ...ExporterOption) (*Exporter, error) { exp := NewUnstartedExporter(opts...) - if err := exp.Start(); err != nil { + if err := exp.Start(ctx); err != nil { return nil, err } return exp, nil @@ -114,7 +114,7 @@ var ( // messages that consist of the node identifier. Start invokes a background // connector that will reattempt connections to the collector periodically // if the connection dies. -func (e *Exporter) Start() error { +func (e *Exporter) Start(ctx context.Context) error { var err = errAlreadyStarted e.startOnce.Do(func() { e.mu.Lock() @@ -122,7 +122,7 @@ func (e *Exporter) Start() error { e.mu.Unlock() err = nil - e.cc.startConnection(context.Background()) + e.cc.startConnection(ctx) }) return err diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index 70f32c46571..ced8d103558 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -72,12 +72,13 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) } opts = append(opts, additionalOpts...) - exp, err := otlp.NewExporter(opts...) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, opts...) if err != nil { t.Fatalf("failed to create a new collector exporter: %v", err) } defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { panic(err) @@ -110,11 +111,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) // Now create few spans m := 4 for i := 0; i < m; i++ { - _, span := tr1.Start(context.Background(), "AlwaysSample") + _, span := tr1.Start(ctx, "AlwaysSample") span.SetAttributes(label.Int64("i", int64(i))) span.End() - _, span = tr2.Start(context.Background(), "AlwaysSample") + _, span = tr2.Start(ctx, "AlwaysSample") span.SetAttributes(label.Int64("i", int64(i))) span.End() } @@ -124,7 +125,6 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) pusher := push.New(processor, exp) pusher.Start() - ctx := context.Background() meter := pusher.MeterProvider().Meter("test-meter") labels := []label.KeyValue{label.Bool("test", true)} @@ -190,7 +190,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) <-time.After(40 * time.Millisecond) // Now shutdown the exporter - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to stop the exporter: %v", err) @@ -307,31 +307,33 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { _ = mc.stop() }() - exp, err := otlp.NewExporter(otlp.WithInsecure(), + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, + otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address)) if err != nil { t.Fatalf("error creating exporter: %v", err) } defer func() { - if err := exp.Shutdown(context.Background()); err != nil { + if err := exp.Shutdown(ctx); err != nil { panic(err) } }() // Invoke Start numerous times, should return errAlreadyStarted for i := 0; i < 10; i++ { - if err := exp.Start(); err == nil || !strings.Contains(err.Error(), "already started") { + if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") { t.Fatalf("#%d unexpected Start error: %v", i, err) } } - if err := exp.Shutdown(context.Background()); err != nil { + if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to Shutdown the exporter: %v", err) } // Invoke Shutdown numerous times for i := 0; i < 10; i++ { - if err := exp.Shutdown(context.Background()); err != nil { + if err := exp.Shutdown(ctx); err != nil { t.Fatalf(`#%d got error (%v) expected none`, i, err) } } @@ -341,14 +343,16 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { mc := runMockCol(t) reconnectionPeriod := 20 * time.Millisecond - exp, err := otlp.NewExporter(otlp.WithInsecure(), + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, + otlp.WithInsecure(), otlp.WithAddress(mc.address), otlp.WithReconnectionPeriod(reconnectionPeriod)) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() // We'll now stop the collector right away to simulate a connection @@ -363,7 +367,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { // No endpoint up. require.Error( t, - exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}), + exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "in the midst"}}), "transport: Error while dialing dial tcp %s: connect: connection refused", mc.address, ) @@ -377,7 +381,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { n := 10 for i := 0; i < n; i++ { - require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "Resurrected"}})) + require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "Resurrected"}})) } nmaSpans := nmc.getSpans() @@ -412,13 +416,15 @@ func TestNewExporter_collectorOnBadConnection(t *testing.T) { _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) address := fmt.Sprintf("localhost:%s", collectorPortStr) - exp, err := otlp.NewExporter(otlp.WithInsecure(), + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, + otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(address)) if err != nil { t.Fatalf("Despite an indefinite background reconnection, got error: %v", err) } - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) } func TestNewExporter_withAddress(t *testing.T) { @@ -432,11 +438,12 @@ func TestNewExporter_withAddress(t *testing.T) { otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address)) + ctx := context.Background() defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() - if err := exp.Start(); err != nil { + if err := exp.Start(ctx); err != nil { t.Fatalf("Unexpected Start error: %v", err) } } @@ -447,16 +454,17 @@ func TestNewExporter_withHeaders(t *testing.T) { _ = mc.stop() }() - exp, _ := otlp.NewExporter( + ctx := context.Background() + exp, _ := otlp.NewExporter(ctx, otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address), otlp.WithHeaders(map[string]string{"header1": "value1"}), ) - require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}})) + require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "in the midst"}})) defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() headers := mc.getHeaders() @@ -473,14 +481,15 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { <-time.After(5 * time.Millisecond) - exp, _ := otlp.NewExporter( + ctx := context.Background() + exp, _ := otlp.NewExporter(ctx, otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address), ) defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() tp := sdktrace.NewTracerProvider( @@ -492,7 +501,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { sdktrace.WithMaxExportBatchSize(10), ), ) - defer func() { _ = tp.Shutdown(context.Background()) }() + defer func() { _ = tp.Shutdown(ctx) }() tr := tp.Tracer("test-tracer") testKvs := []label.KeyValue{ @@ -504,7 +513,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { label.Bool("Bool", true), label.String("String", "test"), } - _, span := tr.Start(context.Background(), "AlwaysSample") + _, span := tr.Start(ctx, "AlwaysSample") span.SetAttributes(testKvs...) span.End() @@ -520,7 +529,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { <-time.After(40 * time.Millisecond) // Now shutdown the exporter - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to stop the exporter: %v", err) diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index eaf31a7cb96..afcb28ddfea 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -34,7 +34,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) { orig(ch) }() } - if err := e.Start(); err != nil { + if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -59,7 +59,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) { orig(ch) }() } - if err := e.Start(); err != nil { + if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -78,7 +78,7 @@ func TestExporterShutdownNoError(t *testing.T) { defer cancel() e := NewUnstartedExporter() - if err := e.Start(); err != nil { + if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -89,7 +89,7 @@ func TestExporterShutdownNoError(t *testing.T) { func TestExporterShutdownManyTimes(t *testing.T) { ctx := context.Background() - e, err := NewExporter() + e, err := NewExporter(ctx) if err != nil { t.Fatalf("failed to start an exporter: %v", err) } From 1cd7d2c17516378b1da4138379f95ac743ed9c80 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 15:38:04 +0100 Subject: [PATCH 10/13] Stop connecting on shutdown Dialling to grpc service ignored the closing of the stop channel, but this can be easily changed. --- exporters/otlp/connection.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 5c4a8e6db0e..7f6c38ec8a7 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -223,6 +223,8 @@ func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn dialOpts = append(dialOpts, oc.c.grpcDialOptions...) } + ctx, cancel := oc.contextWithStop(ctx) + defer cancel() ctx = oc.contextWithMetadata(ctx) return grpc.DialContext(ctx, addr, dialOpts...) } From 30fab2867b24eb345aacaa8e9950978bf06f551d Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Thu, 19 Nov 2020 15:39:07 +0100 Subject: [PATCH 11/13] Close connection after background is shut down That way we can make sure that there won't be a window between closing a connection and waiting for the background goroutine to return, where the new connection could be established. --- exporters/otlp/connection.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 7f6c38ec8a7..283d6d42d55 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -33,7 +33,7 @@ type grpcConnection struct { // mu protects the connection as it is accessed by the // exporter goroutines and background connection goroutine - mu sync.RWMutex + mu sync.Mutex cc *grpc.ClientConn // these fields are read-only after constructor is finished @@ -237,15 +237,6 @@ func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Conte } func (oc *grpcConnection) shutdown(ctx context.Context) error { - oc.mu.RLock() - cc := oc.cc - oc.mu.RUnlock() - - var err error - if cc != nil { - err = cc.Close() - } - close(oc.stopCh) // Ensure that the backgroundConnector returns select { @@ -256,7 +247,16 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error { close(oc.disconnectedCh) - return err + oc.mu.Lock() + cc := oc.cc + oc.cc = nil + oc.mu.Unlock() + + if cc != nil { + return cc.Close() + } + + return nil } func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { From 885570a218d3e031c15d3f2fa6f461424d52e733 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Fri, 20 Nov 2020 18:05:20 +0100 Subject: [PATCH 12/13] Remove unnecessary nil check This member is never nil, unless the Exporter is created like &Exporter{}, which is not a thing we support anyway. --- exporters/otlp/otlp.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 3beaf589bfa..f8c1c5b0e76 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -143,10 +143,8 @@ func (e *Exporter) Shutdown(ctx context.Context) error { var err error e.stopOnce.Do(func() { - if cc != nil { - // Clean things up before checking this error. - err = cc.shutdown(ctx) - } + // Clean things up before checking this error. + err = cc.shutdown(ctx) // At this point we can change the state variable started e.mu.Lock() From 2368f435ffd8c4305601b1896d7a6e0a96bbefeb Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Fri, 20 Nov 2020 17:55:57 +0100 Subject: [PATCH 13/13] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 347742c0b11..d1823333836 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - Move the OpenCensus example into `example` directory. (#1359) +- `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357) ## [0.14.0] - 2020-11-19