diff --git a/CHANGELOG.md b/CHANGELOG.md index f08abbc7a66..8d5d9319919 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm Additionally, this tag is overridden, as specified in the OTel specification, if the event contains an attribute with that key. (#1768) - Zipkin Exporter: Ensure mapping between OTel and Zipkin span data complies with the specification. (#1688) - Fixed typo for default service name in Jaeger Exporter. (#1797) +- Fix flaky OTLP for the reconnection of the client connection. (#1527, TBD) ### Changed diff --git a/exporters/otlp/otlpgrpc/mock_collector_test.go b/exporters/otlp/otlpgrpc/mock_collector_test.go index fd806008a5f..21ae4f22e31 100644 --- a/exporters/otlp/otlpgrpc/mock_collector_test.go +++ b/exporters/otlp/otlpgrpc/mock_collector_test.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "net" + "runtime" + "strings" "sync" "testing" "time" @@ -107,7 +109,8 @@ type mockCollector struct { metricSvc *mockMetricService endpoint string - stopFunc func() error + ln *listener + stopFunc func() stopOnce sync.Once } @@ -119,8 +122,9 @@ var errAlreadyStopped = fmt.Errorf("already stopped") func (mc *mockCollector) stop() error { var err = errAlreadyStopped mc.stopOnce.Do(func() { + err = nil if mc.stopFunc != nil { - err = mc.stopFunc() + mc.stopFunc() } }) // Give it sometime to shutdown. @@ -189,19 +193,70 @@ func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector { mc := makeMockCollector(t) collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc) collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc) + mc.ln = newListener(ln) go func() { - _ = srv.Serve(ln) + _ = srv.Serve((net.Listener)(mc.ln)) }() - deferFunc := func() error { - srv.Stop() - return ln.Close() + mc.endpoint = ln.Addr().String() + // srv.Stop calls Close on mc.ln. 
+ mc.stopFunc = srv.Stop + + return mc +} + +type listener struct { + closeOnce sync.Once + wrapped net.Listener + C chan struct{} +} + +func newListener(wrapped net.Listener) *listener { + return &listener{ + wrapped: wrapped, + C: make(chan struct{}, 1), } +} - _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) +func (l *listener) Close() error { return l.wrapped.Close() } - mc.endpoint = "localhost:" + collectorPortStr - mc.stopFunc = deferFunc +func (l *listener) Addr() net.Addr { return l.wrapped.Addr() } - return mc +// Accept waits for and returns the next connection to the listener. It will +// send a signal on l.C that a connection has been made before returning. +func (l *listener) Accept() (net.Conn, error) { + conn, err := l.wrapped.Accept() + if err != nil { + // Go 1.16 exported net.ErrClosed that could clean up this check, but to + // remain backwards compatible with previous versions of Go that we + // support the following string evaluation is used instead to keep in line + // with the previously recommended way to check this: + // https://github.com/golang/go/issues/4373#issuecomment-353076799 + if strings.Contains(err.Error(), "use of closed network connection") { + // If the listener has been closed, do not allow callers of + // WaitForConn to wait for a connection that will never come. + l.closeOnce.Do(func() { close(l.C) }) + } + return conn, err + } + + select { + case l.C <- struct{}{}: + default: + // If C is full, assume nobody is listening and move on. + } + return conn, nil +} + +// WaitForConn will wait indefinitely for a connection to be established with +// the listener before returning. 
+func (l *listener) WaitForConn() { + for { + select { + case <-l.C: + return + default: + runtime.Gosched() + } + } } diff --git a/exporters/otlp/otlpgrpc/otlp_integration_test.go b/exporters/otlp/otlpgrpc/otlp_integration_test.go index 0b3a722a615..b598b9dd512 100644 --- a/exporters/otlp/otlpgrpc/otlp_integration_test.go +++ b/exporters/otlp/otlpgrpc/otlp_integration_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "net" - "runtime" "strings" "testing" "time" @@ -149,44 +148,22 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test ctx := context.Background() exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithReconnectionPeriod(reconnectionPeriod)) - defer func() { - _ = exp.Shutdown(ctx) - }() + defer func() { require.NoError(t, exp.Shutdown(ctx)) }() + + // Wait for a connection. + mc.ln.WaitForConn() // We'll now stop the collector right away to simulate a connection // dying in the midst of communication or even not existing before. - _ = mc.stop() + require.NoError(t, mc.stop()) // first export, it will send disconnected message to the channel on export failure, // trigger almost immediate reconnection - require.Error( - t, - exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}), - "transport: Error while dialing dial tcp %s: connect: connection refused", - mc.endpoint, - ) - - // Give the exporter sometime to reconnect - func() { - timer := time.After(reconnectionPeriod * 10) - for { - select { - case <-timer: - return - default: - runtime.Gosched() - } - } - }() + require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}})) // second export, it will detect connection issue, change state of exporter to disconnected and // send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel) - require.Error( - t, - exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}), - "transport: Error 
while dialing dial tcp %s: connect: connection refused2", - mc.endpoint, - ) + require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}})) // as a result we have exporter in disconnected state waiting for disconnection message to reconnect @@ -195,17 +172,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test // make sure reconnection loop hits beginning and goes back to waiting mode // after hitting beginning of the loop it should reconnect - func() { - timer := time.After(reconnectionPeriod * 10) - for { - select { - case <-timer: - return - default: - runtime.Gosched() - } - } - }() + nmc.ln.WaitForConn() n := 10 for i := 0; i < n; i++ { @@ -226,22 +193,24 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test if g, w := len(dSpans), 0; g != w { t.Fatalf("Disconnected collector: spans: got %d want %d", g, w) } + + require.NoError(t, nmc.Stop()) } func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { mc := runMockCollector(t) - reconnectionPeriod := 20 * time.Millisecond + reconnectionPeriod := 50 * time.Millisecond ctx := context.Background() exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithReconnectionPeriod(reconnectionPeriod)) - defer func() { - _ = exp.Shutdown(ctx) - }() + defer func() { require.NoError(t, exp.Shutdown(ctx)) }() + + mc.ln.WaitForConn() // We'll now stop the collector right away to simulate a connection // dying in the midst of communication or even not existing before. - _ = mc.stop() + require.NoError(t, mc.stop()) // In the test below, we'll stop the collector many times, // while exporting traces and test to ensure that we can @@ -249,29 +218,14 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { for j := 0; j < 3; j++ { // No endpoint up. 
- require.Error( - t, - exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}), - "transport: Error while dialing dial tcp %s: connect: connection refused", - mc.endpoint, - ) + require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}})) // Now resurrect the collector by making a new one but reusing the // old endpoint, and the collector should reconnect automatically. nmc := runMockCollectorAtEndpoint(t, mc.endpoint) // Give the exporter sometime to reconnect - func() { - timer := time.After(reconnectionPeriod * 10) - for { - select { - case <-timer: - return - default: - runtime.Gosched() - } - } - }() + nmc.ln.WaitForConn() n := 10 for i := 0; i < n; i++ { @@ -289,7 +243,9 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { if g, w := len(dSpans), 0; g != w { t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) } - _ = nmc.stop() + + // Disconnect for the next try. + require.NoError(t, nmc.stop()) } }