Skip to content

Commit 7805fdc

Browse files
committed
http2: rewrite inbound flow control tracking
Add a new inflow type for tracking inbound flow control. An inflow tracks both the window sent to the peer, and the window we are willing to send. Updates are accumulated and sent in a batch when the unsent window update is large enough. This change makes both the client and server use the same algorithm to decide when to send window updates. This should slightly reduce the rate of updates sent by the client, and significantly reduce the rate sent by the server. Fix a client flow control tracking bug: When processing data for a canceled stream, the record of flow control consumed by the peer was not updated to account for the discard stream. Fixes golang/go#28732 Fixes golang/go#56558 Change-Id: Id119d17b84b46f3dc2719f28a86758d9a10085d9 Reviewed-on: https://go-review.googlesource.com/c/net/+/448155 Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org> TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Heschi Kreinick <heschi@google.com> Run-TryBot: Damien Neil <dneil@google.com>
1 parent 2aa8215 commit 7805fdc

File tree

6 files changed

+403
-206
lines changed

6 files changed

+403
-206
lines changed

http2/flow.go

+78-10
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,99 @@
66

77
package http2
88

9-
// flow is the flow control window's size.
10-
type flow struct {
9+
// inflowMinRefresh is the minimum number of bytes we'll send for a
10+
// flow control window update.
11+
const inflowMinRefresh = 4 << 10
12+
13+
// inflow accounts for an inbound flow control window.
14+
// It tracks both the latest window sent to the peer (used for enforcement)
15+
// and the accumulated unsent window.
16+
type inflow struct {
17+
avail int32
18+
unsent int32
19+
}
20+
21+
// set sets the initial window.
22+
func (f *inflow) init(n int32) {
23+
f.avail = n
24+
}
25+
26+
// add adds n bytes to the window, with a maximum window size of max,
27+
// indicating that the peer can now send us more data.
28+
// For example, the user read from a {Request,Response} body and consumed
29+
// some of the buffered data, so the peer can now send more.
30+
// It returns the number of bytes to send in a WINDOW_UPDATE frame to the peer.
31+
// Window updates are accumulated and sent when the unsent capacity
32+
// is at least inflowMinRefresh or will at least double the peer's available window.
33+
func (f *inflow) add(n int) (connAdd int32) {
34+
if n < 0 {
35+
panic("negative update")
36+
}
37+
unsent := int64(f.unsent) + int64(n)
38+
// "A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets."
39+
// RFC 7540 Section 6.9.1.
40+
const maxWindow = 1<<31 - 1
41+
if unsent+int64(f.avail) > maxWindow {
42+
panic("flow control update exceeds maximum window size")
43+
}
44+
f.unsent = int32(unsent)
45+
if f.unsent < inflowMinRefresh && f.unsent < f.avail {
46+
// If there aren't at least inflowMinRefresh bytes of window to send,
47+
// and this update won't at least double the window, buffer the update for later.
48+
return 0
49+
}
50+
f.avail += f.unsent
51+
f.unsent = 0
52+
return int32(unsent)
53+
}
54+
55+
// take attempts to take n bytes from the peer's flow control window.
56+
// It reports whether the window has available capacity.
57+
func (f *inflow) take(n uint32) bool {
58+
if n > uint32(f.avail) {
59+
return false
60+
}
61+
f.avail -= int32(n)
62+
return true
63+
}
64+
65+
// takeInflows attempts to take n bytes from two inflows,
66+
// typically connection-level and stream-level flows.
67+
// It reports whether both windows have available capacity.
68+
func takeInflows(f1, f2 *inflow, n uint32) bool {
69+
if n > uint32(f1.avail) || n > uint32(f2.avail) {
70+
return false
71+
}
72+
f1.avail -= int32(n)
73+
f2.avail -= int32(n)
74+
return true
75+
}
76+
77+
// outflow is the outbound flow control window's size.
78+
type outflow struct {
1179
_ incomparable
1280

1381
// n is the number of DATA bytes we're allowed to send.
14-
// A flow is kept both on a conn and a per-stream.
82+
// An outflow is kept both on a conn and a per-stream.
1583
n int32
1684

17-
// conn points to the shared connection-level flow that is
18-
// shared by all streams on that conn. It is nil for the flow
85+
// conn points to the shared connection-level outflow that is
86+
// shared by all streams on that conn. It is nil for the outflow
1987
// that's on the conn directly.
20-
conn *flow
88+
conn *outflow
2189
}
2290

23-
func (f *flow) setConnFlow(cf *flow) { f.conn = cf }
91+
func (f *outflow) setConnFlow(cf *outflow) { f.conn = cf }
2492

25-
func (f *flow) available() int32 {
93+
func (f *outflow) available() int32 {
2694
n := f.n
2795
if f.conn != nil && f.conn.n < n {
2896
n = f.conn.n
2997
}
3098
return n
3199
}
32100

33-
func (f *flow) take(n int32) {
101+
func (f *outflow) take(n int32) {
34102
if n > f.available() {
35103
panic("internal error: took too much")
36104
}
@@ -42,7 +110,7 @@ func (f *flow) take(n int32) {
42110

43111
// add adds n bytes (positive or negative) to the flow control window.
44112
// It returns false if the sum would exceed 2^31-1.
45-
func (f *flow) add(n int32) bool {
113+
func (f *outflow) add(n int32) bool {
46114
sum := f.n + n
47115
if (sum > n) == (f.n > 0) {
48116
f.n = sum

http2/flow_test.go

+59-7
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,61 @@ package http2
66

77
import "testing"
88

9-
func TestFlow(t *testing.T) {
10-
var st flow
11-
var conn flow
9+
func TestInFlowTake(t *testing.T) {
10+
var f inflow
11+
f.init(100)
12+
if !f.take(40) {
13+
t.Fatalf("f.take(40) from 100: got false, want true")
14+
}
15+
if !f.take(40) {
16+
t.Fatalf("f.take(40) from 60: got false, want true")
17+
}
18+
if f.take(40) {
19+
t.Fatalf("f.take(40) from 20: got true, want false")
20+
}
21+
if !f.take(20) {
22+
t.Fatalf("f.take(20) from 20: got false, want true")
23+
}
24+
}
25+
26+
func TestInflowAddSmall(t *testing.T) {
27+
var f inflow
28+
f.init(0)
29+
// Adding even a small amount when there is no flow causes an immediate send.
30+
if got, want := f.add(1), int32(1); got != want {
31+
t.Fatalf("f.add(1) to 1 = %v, want %v", got, want)
32+
}
33+
}
34+
35+
func TestInflowAdd(t *testing.T) {
36+
var f inflow
37+
f.init(10 * inflowMinRefresh)
38+
if got, want := f.add(inflowMinRefresh-1), int32(0); got != want {
39+
t.Fatalf("f.add(minRefresh - 1) = %v, want %v", got, want)
40+
}
41+
if got, want := f.add(1), int32(inflowMinRefresh); got != want {
42+
t.Fatalf("f.add(minRefresh) = %v, want %v", got, want)
43+
}
44+
}
45+
46+
func TestTakeInflows(t *testing.T) {
47+
var a, b inflow
48+
a.init(10)
49+
b.init(20)
50+
if !takeInflows(&a, &b, 5) {
51+
t.Fatalf("takeInflows(a, b, 5) from 10, 20: got false, want true")
52+
}
53+
if takeInflows(&a, &b, 6) {
54+
t.Fatalf("takeInflows(a, b, 6) from 5, 15: got true, want false")
55+
}
56+
if !takeInflows(&a, &b, 5) {
57+
t.Fatalf("takeInflows(a, b, 5) from 5, 15: got false, want true")
58+
}
59+
}
60+
61+
func TestOutFlow(t *testing.T) {
62+
var st outflow
63+
var conn outflow
1264
st.add(3)
1365
conn.add(2)
1466

@@ -29,8 +81,8 @@ func TestFlow(t *testing.T) {
2981
}
3082
}
3183

32-
func TestFlowAdd(t *testing.T) {
33-
var f flow
84+
func TestOutFlowAdd(t *testing.T) {
85+
var f outflow
3486
if !f.add(1) {
3587
t.Fatal("failed to add 1")
3688
}
@@ -51,8 +103,8 @@ func TestFlowAdd(t *testing.T) {
51103
}
52104
}
53105

54-
func TestFlowAddOverflow(t *testing.T) {
55-
var f flow
106+
func TestOutFlowAddOverflow(t *testing.T) {
107+
var f outflow
56108
if !f.add(0) {
57109
t.Fatal("failed to add 0")
58110
}

http2/server.go

+30-55
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
448448
// configured value for inflow, that will be updated when we send a
449449
// WINDOW_UPDATE shortly after sending SETTINGS.
450450
sc.flow.add(initialWindowSize)
451-
sc.inflow.add(initialWindowSize)
451+
sc.inflow.init(initialWindowSize)
452452
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
453453
sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize())
454454

@@ -563,8 +563,8 @@ type serverConn struct {
563563
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
564564
bodyReadCh chan bodyReadMsg // from handlers -> serve
565565
serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
566-
flow flow // conn-wide (not stream-specific) outbound flow control
567-
inflow flow // conn-wide inbound flow control
566+
flow outflow // conn-wide (not stream-specific) outbound flow control
567+
inflow inflow // conn-wide inbound flow control
568568
tlsState *tls.ConnectionState // shared by all handlers, like net/http
569569
remoteAddrStr string
570570
writeSched WriteScheduler
@@ -641,10 +641,10 @@ type stream struct {
641641
cancelCtx func()
642642

643643
// owned by serverConn's serve loop:
644-
bodyBytes int64 // body bytes seen so far
645-
declBodyBytes int64 // or -1 if undeclared
646-
flow flow // limits writing from Handler to client
647-
inflow flow // what the client is allowed to POST/etc to us
644+
bodyBytes int64 // body bytes seen so far
645+
declBodyBytes int64 // or -1 if undeclared
646+
flow outflow // limits writing from Handler to client
647+
inflow inflow // what the client is allowed to POST/etc to us
648648
state streamState
649649
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
650650
gotTrailerHeader bool // HEADER frame for trailers was seen
@@ -1503,7 +1503,7 @@ func (sc *serverConn) processFrame(f Frame) error {
15031503
if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) {
15041504

15051505
if f, ok := f.(*DataFrame); ok {
1506-
if sc.inflow.available() < int32(f.Length) {
1506+
if !sc.inflow.take(f.Length) {
15071507
return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
15081508
}
15091509
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
@@ -1775,14 +1775,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
17751775
// But still enforce their connection-level flow control,
17761776
// and return any flow control bytes since we're not going
17771777
// to consume them.
1778-
if sc.inflow.available() < int32(f.Length) {
1778+
if !sc.inflow.take(f.Length) {
17791779
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
17801780
}
1781-
// Deduct the flow control from inflow, since we're
1782-
// going to immediately add it back in
1783-
// sendWindowUpdate, which also schedules sending the
1784-
// frames.
1785-
sc.inflow.take(int32(f.Length))
17861781
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
17871782

17881783
if st != nil && st.resetQueued {
@@ -1797,10 +1792,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
17971792

17981793
// Sender sending more than they'd declared?
17991794
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1800-
if sc.inflow.available() < int32(f.Length) {
1795+
if !sc.inflow.take(f.Length) {
18011796
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
18021797
}
1803-
sc.inflow.take(int32(f.Length))
18041798
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
18051799

18061800
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
@@ -1811,10 +1805,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
18111805
}
18121806
if f.Length > 0 {
18131807
// Check whether the client has flow control quota.
1814-
if st.inflow.available() < int32(f.Length) {
1808+
if !takeInflows(&sc.inflow, &st.inflow, f.Length) {
18151809
return sc.countError("flow_on_data_length", streamError(id, ErrCodeFlowControl))
18161810
}
1817-
st.inflow.take(int32(f.Length))
18181811

18191812
if len(data) > 0 {
18201813
wrote, err := st.body.Write(data)
@@ -1830,10 +1823,12 @@ func (sc *serverConn) processData(f *DataFrame) error {
18301823

18311824
// Return any padded flow control now, since we won't
18321825
// refund it later on body reads.
1833-
if pad := int32(f.Length) - int32(len(data)); pad > 0 {
1834-
sc.sendWindowUpdate32(nil, pad)
1835-
sc.sendWindowUpdate32(st, pad)
1836-
}
1826+
// Call sendWindowUpdate even if there is no padding,
1827+
// to return buffered flow control credit if the sent
1828+
// window has shrunk.
1829+
pad := int32(f.Length) - int32(len(data))
1830+
sc.sendWindowUpdate32(nil, pad)
1831+
sc.sendWindowUpdate32(st, pad)
18371832
}
18381833
if f.StreamEnded() {
18391834
st.endStream()
@@ -2105,8 +2100,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
21052100
st.cw.Init()
21062101
st.flow.conn = &sc.flow // link to conn-level counter
21072102
st.flow.add(sc.initialStreamSendWindowSize)
2108-
st.inflow.conn = &sc.inflow // link to conn-level counter
2109-
st.inflow.add(sc.srv.initialStreamRecvWindowSize())
2103+
st.inflow.init(sc.srv.initialStreamRecvWindowSize())
21102104
if sc.hs.WriteTimeout != 0 {
21112105
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
21122106
}
@@ -2388,47 +2382,28 @@ func (sc *serverConn) noteBodyRead(st *stream, n int) {
23882382
}
23892383

23902384
// st may be nil for conn-level
2391-
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2392-
sc.serveG.check()
2393-
// "The legal range for the increment to the flow control
2394-
// window is 1 to 2^31-1 (2,147,483,647) octets."
2395-
// A Go Read call on 64-bit machines could in theory read
2396-
// a larger Read than this. Very unlikely, but we handle it here
2397-
// rather than elsewhere for now.
2398-
const maxUint31 = 1<<31 - 1
2399-
for n > maxUint31 {
2400-
sc.sendWindowUpdate32(st, maxUint31)
2401-
n -= maxUint31
2402-
}
2403-
sc.sendWindowUpdate32(st, int32(n))
2385+
func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2386+
sc.sendWindowUpdate(st, int(n))
24042387
}
24052388

24062389
// st may be nil for conn-level
2407-
func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2390+
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
24082391
sc.serveG.check()
2409-
if n == 0 {
2410-
return
2411-
}
2412-
if n < 0 {
2413-
panic("negative update")
2414-
}
24152392
var streamID uint32
2416-
if st != nil {
2393+
var send int32
2394+
if st == nil {
2395+
send = sc.inflow.add(n)
2396+
} else {
24172397
streamID = st.id
2398+
send = st.inflow.add(n)
2399+
}
2400+
if send == 0 {
2401+
return
24182402
}
24192403
sc.writeFrame(FrameWriteRequest{
2420-
write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
2404+
write: writeWindowUpdate{streamID: streamID, n: uint32(send)},
24212405
stream: st,
24222406
})
2423-
var ok bool
2424-
if st == nil {
2425-
ok = sc.inflow.add(n)
2426-
} else {
2427-
ok = st.inflow.add(n)
2428-
}
2429-
if !ok {
2430-
panic("internal error; sent too many window updates without decrements?")
2431-
}
24322407
}
24332408

24342409
// requestBody is the Handler's Request.Body type.

0 commit comments

Comments
 (0)