Skip to content

Commit dbd4cbc

Browse files
authored
cherry-pick #6977 to 1.61.x release branch (#6980)
1 parent 57ed608 commit dbd4cbc

File tree

3 files changed

+29
-7
lines changed

3 files changed

+29
-7
lines changed

internal/transport/controlbuf.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -535,16 +535,15 @@ const minBatchSize = 1000
535535
// size is too low to give stream goroutines a chance to fill it up.
536536
//
537537
// Upon exiting, if the error causing the exit is not an I/O error, run()
538-
// flushes and closes the underlying connection. Otherwise, the connection is
539-
// left open to allow the I/O error to be encountered by the reader instead.
538+
// flushes the underlying connection. The connection is always left open to
539+
// allow different closing behavior on the client and server.
540540
func (l *loopyWriter) run() (err error) {
541541
defer func() {
542542
if l.logger.V(logLevel) {
543543
l.logger.Infof("loopyWriter exiting with error: %v", err)
544544
}
545545
if !isIOError(err) {
546546
l.framer.writer.Flush()
547-
l.conn.Close()
548547
}
549548
l.cbuf.finish()
550549
}()

internal/transport/http2_client.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,13 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
451451
}
452452
go func() {
453453
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
454-
t.loopy.run()
454+
if err := t.loopy.run(); !isIOError(err) {
455+
// Immediately close the connection, as the loopy writer returns
456+
// when there are no more active streams and we were draining (the
457+
// server sent a GOAWAY). For I/O errors, the reader will hit it
458+
// after draining any remaining incoming data.
459+
t.conn.Close()
460+
}
455461
close(t.writerDone)
456462
}()
457463
return t, nil

internal/transport/http2_server.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
322322
go func() {
323323
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
324324
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
325-
t.loopy.run()
325+
err := t.loopy.run()
326326
close(t.loopyWriterDone)
327+
if !isIOError(err) {
328+
// Close the connection if a non-I/O error occurs (for I/O errors
329+
// the reader will also encounter the error and close). Wait 1
330+
// second before closing the connection, or when the reader is done
331+
// (i.e. the client already closed the connection or a connection
332+
// error occurred). This avoids the potential problem where there
333+
// is unread data on the receive side of the connection, which, if
334+
// closed, would lead to a TCP RST instead of FIN, and the client
335+
// encountering errors. For more info:
336+
// https://github.com/grpc/grpc-go/issues/5358
337+
select {
338+
case <-t.readerDone:
339+
case <-time.After(time.Second):
340+
}
341+
t.conn.Close()
342+
}
327343
}()
328344
go t.keepalive()
329345
return t, nil
@@ -609,8 +625,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
609625
// traceCtx attaches trace to ctx and returns the new context.
610626
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
611627
defer func() {
612-
<-t.loopyWriterDone
613628
close(t.readerDone)
629+
<-t.loopyWriterDone
614630
}()
615631
for {
616632
t.controlBuf.throttle()
@@ -1329,6 +1345,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
13291345
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
13301346
return false, err
13311347
}
1348+
t.framer.writer.Flush()
13321349
if retErr != nil {
13331350
return false, retErr
13341351
}
@@ -1349,7 +1366,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
13491366
return false, err
13501367
}
13511368
go func() {
1352-
timer := time.NewTimer(time.Minute)
1369+
timer := time.NewTimer(5 * time.Second)
13531370
defer timer.Stop()
13541371
select {
13551372
case <-t.drainEvent.Done():

0 commit comments

Comments
 (0)