Fix OTLP testing flake: signal connection from mock collector (#1816)
* Wrap TCP listener

The mock collector listener now signals when it receives a connection
instead of waiting an arbitrary time and hoping the event happens (a
condensed sketch of the pattern follows the changed-files summary below).

* Only close the listener C chan once

* Apply PR feedback

* Backwards compatible support for closed listener check

* Cleanup
MrAlias authored Apr 17, 2021
1 parent a2cecb6 commit d9566ab
Showing 3 changed files with 86 additions and 74 deletions.
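
For orientation before the diffs: the fix wraps the mock collector's TCP listener so a test can block until a connection has actually been accepted, rather than sleeping or spinning for a guessed period. The following is a condensed, self-contained sketch of that wrap-and-signal pattern, simplified from the mock_collector_test.go changes below; the main function, the unconditional close of C on any Accept error, and the error handling are illustrative only and not part of the commit.

package main

import (
	"fmt"
	"log"
	"net"
	"sync"
)

// listener mirrors the wrapper added in mock_collector_test.go: it decorates a
// net.Listener and sends on C each time Accept returns a connection, so a
// caller can block on a real event instead of a timer.
type listener struct {
	closeOnce sync.Once
	wrapped   net.Listener
	C         chan struct{}
}

func newListener(wrapped net.Listener) *listener {
	return &listener{wrapped: wrapped, C: make(chan struct{}, 1)}
}

func (l *listener) Addr() net.Addr { return l.wrapped.Addr() }

func (l *listener) Close() error { return l.wrapped.Close() }

func (l *listener) Accept() (net.Conn, error) {
	conn, err := l.wrapped.Accept()
	if err != nil {
		// Simplification: close C on any error so waiters are released. The
		// real code only does this when the listener itself was closed.
		l.closeOnce.Do(func() { close(l.C) })
		return conn, err
	}
	select {
	case l.C <- struct{}{}: // signal a waiter, if there is one
	default: // nobody is waiting; drop the signal
	}
	return conn, nil
}

func main() {
	inner, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		log.Fatal(err)
	}
	ln := newListener(inner)
	defer ln.Close()

	// Serve: accept and immediately close a single connection.
	go func() {
		if conn, err := ln.Accept(); err == nil {
			conn.Close()
		}
	}()

	// Dial the wrapped listener from another goroutine.
	go func() {
		if conn, err := net.Dial("tcp", ln.Addr().String()); err == nil {
			conn.Close()
		}
	}()

	<-ln.C // returns only once a connection has really been accepted
	fmt.Println("connection observed")
}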
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -46,6 +46,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
Additionally, this tag is overridden, as specified in the OTel specification, if the event contains an attribute with that key. (#1768)
- Zipkin Exporter: Ensure mapping between OTel and Zipkin span data complies with the specification. (#1688)
- Fixed typo for default service name in Jaeger Exporter. (#1797)
- Fixed flaky OTLP test for the reconnection of the client connection. (#1527, TBD)

### Changed

75 changes: 65 additions & 10 deletions exporters/otlp/otlpgrpc/mock_collector_test.go
@@ -18,6 +18,8 @@ import (
"context"
"fmt"
"net"
"runtime"
"strings"
"sync"
"testing"
"time"
@@ -107,7 +109,8 @@ type mockCollector struct {
metricSvc *mockMetricService

endpoint string
stopFunc func() error
ln *listener
stopFunc func()
stopOnce sync.Once
}

@@ -119,8 +122,9 @@ var errAlreadyStopped = fmt.Errorf("already stopped")
func (mc *mockCollector) stop() error {
var err = errAlreadyStopped
mc.stopOnce.Do(func() {
err = nil
if mc.stopFunc != nil {
err = mc.stopFunc()
mc.stopFunc()
}
})
// Give it some time to shut down.
@@ -189,19 +193,70 @@ func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
mc := makeMockCollector(t)
collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
mc.ln = newListener(ln)
go func() {
_ = srv.Serve(ln)
_ = srv.Serve((net.Listener)(mc.ln))
}()

deferFunc := func() error {
srv.Stop()
return ln.Close()
mc.endpoint = ln.Addr().String()
// srv.Stop calls Close on mc.ln.
mc.stopFunc = srv.Stop

return mc
}

type listener struct {
closeOnce sync.Once
wrapped net.Listener
C chan struct{}
}

func newListener(wrapped net.Listener) *listener {
return &listener{
wrapped: wrapped,
C: make(chan struct{}, 1),
}
}

_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
func (l *listener) Close() error { return l.wrapped.Close() }

mc.endpoint = "localhost:" + collectorPortStr
mc.stopFunc = deferFunc
func (l *listener) Addr() net.Addr { return l.wrapped.Addr() }

return mc
// Accept waits for and returns the next connection to the listener. It will
// send a signal on l.C that a connection has been made before returning.
func (l *listener) Accept() (net.Conn, error) {
conn, err := l.wrapped.Accept()
if err != nil {
// Go 1.16 exported net.ErrClosed, which would clean up this check, but to
// remain backwards compatible with the earlier Go versions we support,
// the following string comparison is used instead, in line with the
// previously recommended way to check for this condition:
// https://github.com/golang/go/issues/4373#issuecomment-353076799
if strings.Contains(err.Error(), "use of closed network connection") {
// If the listener has been closed, do not allow callers of
// WaitForConn to wait for a connection that will never come.
l.closeOnce.Do(func() { close(l.C) })
}
return conn, err
}

select {
case l.C <- struct{}{}:
default:
// If C is full, assume nobody is listening and move on.
}
return conn, nil
}

// WaitForConn will wait indefinitely for a connection to be established with
// the listener before returning.
func (l *listener) WaitForConn() {
for {
select {
case <-l.C:
return
default:
runtime.Gosched()
}
}
}
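
A side note on the backwards-compatibility comment above: Go 1.16 exports net.ErrClosed, so once the minimum supported Go version reaches 1.16 the string comparison could be replaced with a sentinel-error check. A hypothetical sketch of that variant, not part of this commit (it assumes the errors package is imported next to net and strings):

// Hypothetical Go 1.16+ form of the closed-listener branch in Accept; the
// commit keeps the string comparison to support older Go releases.
func (l *listener) Accept() (net.Conn, error) {
	conn, err := l.wrapped.Accept()
	if err != nil {
		if errors.Is(err, net.ErrClosed) {
			// The listener was closed: release any WaitForConn callers.
			l.closeOnce.Do(func() { close(l.C) })
		}
		return conn, err
	}
	select {
	case l.C <- struct{}{}:
	default: // no waiter; drop the signal
	}
	return conn, nil
}

Relatedly, because Accept closes l.C when the listener shuts down, WaitForConn's polling loop also returns in that case; a plain blocking receive from l.C would behave the same way.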
84 changes: 20 additions & 64 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"net"
"runtime"
"strings"
"testing"
"time"
@@ -149,44 +148,22 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()

// Wait for a connection.
mc.ln.WaitForConn()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()
require.NoError(t, mc.stop())

// first export, it will send a disconnected message to the channel on export failure,
// triggering an almost immediate reconnection
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)

// Give the exporter sometime to reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// second export, it will detect the connection issue, change the exporter state to disconnected and
// send a message to the disconnected channel, but this time the reconnection goroutine will be in rest mode (not listening to the disconnected channel)
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused2",
mc.endpoint,
)
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// as a result we have the exporter in a disconnected state, waiting for a disconnection message to reconnect

@@ -195,17 +172,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test

// make sure the reconnection loop hits its beginning and goes back to waiting mode;
// after hitting the beginning of the loop it should reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
nmc.ln.WaitForConn()

n := 10
for i := 0; i < n; i++ {
@@ -226,52 +193,39 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Disconnected collector: spans: got %d want %d", g, w)
}

require.NoError(t, nmc.Stop())
}

func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)

reconnectionPeriod := 20 * time.Millisecond
reconnectionPeriod := 50 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()

mc.ln.WaitForConn()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()
require.NoError(t, mc.stop())

// In the test below, we'll stop the collector many times,
// while exporting traces and test to ensure that we can
// reconnect.
for j := 0; j < 3; j++ {

// No endpoint up.
require.Error(
t,
exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "in the midst"}}))

// Now resurrect the collector by making a new one but reusing the
// old endpoint, and the collector should reconnect automatically.
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)

// Give the exporter sometime to reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()
nmc.ln.WaitForConn()

n := 10
for i := 0; i < n; i++ {
@@ -289,7 +243,9 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w)
}
_ = nmc.stop()

// Disconnect for the next try.
require.NoError(t, nmc.stop())
}
}
