Skip to content

Commit

Permalink
internal/grpc: handle one response per request
Browse files Browse the repository at this point in the history
Updates projectcontour#499
Updates projectcontour#273
Updates projectcontour#1176

The XDS spec says that Envoy will always initiate a stream with a
discovery request, and expects the management server to respond with
only one discovery response. After that, Envoy will initiate another
discovery request containing an ACK or a NACK from the previous
response.

Currently Contour ignores the ACK/NACK, this is projectcontour#1176, however after
inspection of the current code it is evident that we're also not waiting
for Envoy to send the next discovery request.

This PR removes the inner `for {}` loop that would continue to reuse the
initial discovery request until the client disconnected. The previous
code was written in a time when we'd just implemented filtering and it
was possible for the filter to return no results, hence the inner loop
was--incorrectly--trying to loop until there was a result to return.

Huge thanks to @lrouquette who pointed this out.

Signed-off-by: Dave Cheney <dave@cheney.net>
  • Loading branch information
davecheney committed Jun 19, 2019
1 parent 686f02e commit b2b3669
Showing 1 changed file with 40 additions and 42 deletions.
82 changes: 40 additions & 42 deletions internal/grpc/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (xh *xdsHandler) stream(st grpcStream) (err error) {
return err
}

// TODO(dfc) issue 1176: handle xDS ACK/NACK

// from the request we derive the resource to stream which have
// been registered according to the typeURL.
r, ok := xh.resources[req.TypeUrl]
Expand All @@ -97,49 +99,45 @@ func (xh *xdsHandler) stream(st grpcStream) (err error) {
// so the next time around the loop all is forgotten.
log := log.WithField("version_info", req.VersionInfo).WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl).WithField("response_nonce", req.ResponseNonce).WithField("error_detail", req.ErrorDetail)

for {
log.Info("stream_wait")

// now we wait for a notification, if this is the first time through the loop
// then last will be less than zero and that will trigger a notification immediately.
r.Register(ch, last)
select {
case last = <-ch:
// boom, something in the cache has changed.
// TODO(dfc) the thing that has changed may not be in the scope of the filter
// so we're going to be sending an update that is a no-op. See #426

var resources []proto.Message
switch len(req.ResourceNames) {
case 0:
// no resource hints supplied, return the full
// contents of the resource
resources = r.Contents()
default:
// resource hints supplied, return exactly those
resources = r.Query(req.ResourceNames)
}

any, err := toAny(r.TypeURL(), resources)
if err != nil {
return err
}

resp := &v2.DiscoveryResponse{
VersionInfo: strconv.Itoa(last),
Resources: any,
TypeUrl: r.TypeURL(),
Nonce: strconv.Itoa(last),
}
if err := st.Send(resp); err != nil {
return err
}
log.WithField("count", len(resources)).Info("response")

// ok, the client hung up, return any error stored in the context and we're done.
case <-ctx.Done():
return ctx.Err()
log.Info("stream_wait")

// now we wait for a notification, if this is the first request received on this
// connection last will be less than zero and that will trigger a response immediately.
r.Register(ch, last)
select {
case last = <-ch:
// boom, something in the cache has changed.
// TODO(dfc) the thing that has changed may not be in the scope of the filter
// so we're going to be sending an update that is a no-op. See #426

var resources []proto.Message
switch len(req.ResourceNames) {
case 0:
// no resource hints supplied, return the full
// contents of the resource
resources = r.Contents()
default:
// resource hints supplied, return exactly those
resources = r.Query(req.ResourceNames)
}

any, err := toAny(r.TypeURL(), resources)
if err != nil {
return err
}

resp := &v2.DiscoveryResponse{
VersionInfo: strconv.Itoa(last),
Resources: any,
TypeUrl: r.TypeURL(),
Nonce: strconv.Itoa(last),
}
if err := st.Send(resp); err != nil {
return err
}
log.WithField("count", len(resources)).Info("response")
case <-ctx.Done():
return ctx.Err()
}
}
}
Expand Down

0 comments on commit b2b3669

Please sign in to comment.