Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Goroutine leak in DeltaStreamHandler: fix for issue 913: #916

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/prometheus/client_model v0.6.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/proto/otlp v1.0.0
go.uber.org/goleak v1.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80
google.golang.org/grpc v1.62.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1531,6 +1531,8 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
18 changes: 8 additions & 10 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,23 +230,21 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro

// we need to concurrently handle incoming requests since we kick off processDelta as a return
go func() {
defer close(reqCh)
for {
req, err := str.Recv()
if err != nil {
return
}
select {
case reqCh <- req:
case <-str.Context().Done():
close(reqCh)
return
default:
req, err := str.Recv()
if err != nil {
close(reqCh)
return
}

reqCh <- req
case <-s.ctx.Done():
return
}
}
}()

return s.processDelta(str, reqCh, typeURL)
}

Expand Down
60 changes: 56 additions & 4 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.uber.org/goleak"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
Expand Down Expand Up @@ -103,6 +105,7 @@ type mockDeltaStream struct {
nonce int
sendError bool
grpc.ServerStream
cancel func()
}

func (stream *mockDeltaStream) Context() context.Context {
Expand Down Expand Up @@ -146,11 +149,13 @@ func (stream *mockDeltaStream) Recv() (*discovery.DeltaDiscoveryRequest, error)
}

func makeMockDeltaStream(t *testing.T) *mockDeltaStream {
ctx, cancel := context.WithCancel(context.Background())
return &mockDeltaStream{
t: t,
ctx: context.Background(),
sent: make(chan *discovery.DeltaDiscoveryResponse, 10),
recv: make(chan *discovery.DeltaDiscoveryRequest, 10),
t: t,
ctx: ctx,
sent: make(chan *discovery.DeltaDiscoveryResponse, 10),
recv: make(chan *discovery.DeltaDiscoveryRequest, 10),
cancel: cancel,
}
}

Expand Down Expand Up @@ -227,6 +232,7 @@ func TestDeltaResponseHandlersWildcard(t *testing.T) {
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})

resp := makeMockDeltaStream(t)
defer resp.cancel()
// This is a wildcard request since we don't specify a list of resource subscriptions
resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node, TypeUrl: typ}

Expand Down Expand Up @@ -256,6 +262,7 @@ func TestDeltaResponseHandlers(t *testing.T) {
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})

resp := makeMockDeltaStream(t)
defer resp.cancel()
resourceNames := []string{}
for resourceName := range config.deltaResources[typ] {
resourceNames = append(resourceNames, resourceName)
Expand Down Expand Up @@ -290,6 +297,7 @@ func TestSendDeltaError(t *testing.T) {

// make a request with an error
resp := makeMockDeltaStream(t)
defer resp.cancel()
resp.sendError = true
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
Expand All @@ -309,6 +317,7 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
config := makeMockConfigWatcher()
config.deltaResources = makeDeltaResources()
resp := makeMockDeltaStream(t)
defer resp.cancel()

reqs := []*discovery.DeltaDiscoveryRequest{
{
Expand Down Expand Up @@ -383,9 +392,11 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
}

func TestDeltaAggregateRequestType(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
resp := makeMockDeltaStream(t)
defer resp.cancel()
resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node}
if err := s.DeltaAggregatedResources(resp); err == nil {
t.Error("DeltaAggregatedResources() => got nil, want an error")
Expand All @@ -394,8 +405,10 @@ func TestDeltaAggregateRequestType(t *testing.T) {
}

func TestDeltaCancellations(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
resp := makeMockDeltaStream(t)
defer resp.cancel()
for _, typ := range testTypes {
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
Expand All @@ -413,8 +426,10 @@ func TestDeltaCancellations(t *testing.T) {
}

func TestDeltaOpaqueRequestsChannelMuxing(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
resp := makeMockDeltaStream(t)
defer resp.cancel()
for i := 0; i < 10; i++ {
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
Expand All @@ -435,6 +450,7 @@ func TestDeltaOpaqueRequestsChannelMuxing(t *testing.T) {
func TestDeltaCallbackError(t *testing.T) {
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
config.deltaResources = makeDeltaResources()

Expand All @@ -446,6 +462,7 @@ func TestDeltaCallbackError(t *testing.T) {

// make a request
resp := makeMockDeltaStream(t)
defer resp.cancel()
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: typ,
Expand Down Expand Up @@ -498,7 +515,9 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
}

t.Run("legacy still working", func(t *testing.T) {
defer goleak.VerifyNone(t)
resp := makeMockDeltaStream(t)
defer resp.cancel()
defer close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
Expand Down Expand Up @@ -536,6 +555,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {

t.Run("* subscription/unsubscription support", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer resp.cancel()
defer close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
Expand Down Expand Up @@ -581,6 +601,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {

t.Run("resource specific subscriptions while using wildcard", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer resp.cancel()
defer close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
Expand Down Expand Up @@ -616,3 +637,34 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
validateResponse(t, resp.sent, []string{"endpoints2"}, []string{"endpoints4"})
})
}

func TestDeltaMultipleStreams(t *testing.T) {
// Unit test for issue identified in https://github.com/envoyproxy/go-control-plane/issues/913
t.Run("return error to delta stream request; multiple streams.", func(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
resp := makeMockDeltaStream(t)
defer close(resp.recv)
defer resp.cancel()
s := server.NewServer(
context.Background(),
config,
server.CallbackFuncs{
StreamDeltaRequestFunc: func(int64, *discovery.DeltaDiscoveryRequest) error {
return fmt.Errorf("error")
},
},
)

for i := 0; i < 2; i++ {
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{"*"},
}
}

err := s.DeltaAggregatedResources(resp)
require.Error(t, err)
})
}