diff --git a/const.go b/const.go index 9e11ba3..c326f96 100644 --- a/const.go +++ b/const.go @@ -114,7 +114,8 @@ const ( const ( // initialStreamWindow is the initial stream window size - initialStreamWindow uint32 = 256 * 1024 + initialStreamWindow uint32 = 64 * 1024 + maxStreamWindow uint32 = 16 * 1024 * 1024 ) const ( diff --git a/mux.go b/mux.go index 8fc5221..8113646 100644 --- a/mux.go +++ b/mux.go @@ -1,6 +1,7 @@ package yamux import ( + "errors" "fmt" "io" "net" @@ -30,6 +31,10 @@ type Config struct { // an expectation that things will move along quickly. ConnectionWriteTimeout time.Duration + // InitialStreamWindowSize is used to control the initial + // window size that we allow for a stream. + InitialStreamWindowSize uint32 + // MaxStreamWindowSize is used to control the maximum // window size that we allow for a stream. MaxStreamWindowSize uint32 @@ -55,16 +60,17 @@ type Config struct { // DefaultConfig is used to return a default configuration func DefaultConfig() *Config { return &Config{ - AcceptBacklog: 256, - PingBacklog: 32, - EnableKeepAlive: true, - KeepAliveInterval: 30 * time.Second, - ConnectionWriteTimeout: 10 * time.Second, - MaxStreamWindowSize: initialStreamWindow, - LogOutput: os.Stderr, - ReadBufSize: 4096, - MaxMessageSize: 64 * 1024, - WriteCoalesceDelay: 100 * time.Microsecond, + AcceptBacklog: 256, + PingBacklog: 32, + EnableKeepAlive: true, + KeepAliveInterval: 30 * time.Second, + ConnectionWriteTimeout: 10 * time.Second, + InitialStreamWindowSize: initialStreamWindow, + MaxStreamWindowSize: maxStreamWindow, + LogOutput: os.Stderr, + ReadBufSize: 4096, + MaxMessageSize: 64 * 1024, + WriteCoalesceDelay: 100 * time.Microsecond, } } @@ -76,8 +82,8 @@ func VerifyConfig(config *Config) error { if config.KeepAliveInterval == 0 { return fmt.Errorf("keep-alive interval must be positive") } - if config.MaxStreamWindowSize < initialStreamWindow { - return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow) + if config.MaxStreamWindowSize < config.InitialStreamWindowSize { + return errors.New("MaxStreamWindowSize must be larger than the InitialStreamWindowSize") } if config.MaxMessageSize < 1024 { return fmt.Errorf("MaxMessageSize must be greater than a kilobyte") diff --git a/session.go b/session.go index 55b4e28..218d345 100644 --- a/session.go +++ b/session.go @@ -21,6 +21,8 @@ import ( // Session is used to wrap a reliable ordered connection and to // multiplex it into multiple streams. type Session struct { + rtt int64 // to be accessed atomically, in nanoseconds + // remoteGoAway indicates the remote side does // not want futher connections. Must be first for alignment. remoteGoAway int32 @@ -129,6 +131,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio } go s.recv() go s.send() + go s.measureRTT() return s } @@ -291,6 +294,19 @@ func (s *Session) goAway(reason uint32) header { return hdr } +func (s *Session) measureRTT() { + rtt, err := s.Ping() + if err != nil { + return + } + atomic.StoreInt64(&s.rtt, rtt.Nanoseconds()) +} + +// 0 if we don't yet have a measurement +func (s *Session) getRTT() time.Duration { + return time.Duration(atomic.LoadInt64(&s.rtt)) +} + // Ping is used to measure the RTT response time func (s *Session) Ping() (dur time.Duration, err error) { // Prepare a ping. diff --git a/session_norace_test.go b/session_norace_test.go index 4c45bd7..6acf760 100644 --- a/session_norace_test.go +++ b/session_norace_test.go @@ -159,7 +159,7 @@ func TestLargeWindow(t *testing.T) { if err != nil { t.Fatal(err) } - buf := make([]byte, conf.MaxStreamWindowSize) + buf := make([]byte, initialStreamWindow) n, err := stream.Write(buf) if err != nil { t.Fatalf("err: %v", err) diff --git a/session_test.go b/session_test.go index 02aae39..0421d36 100644 --- a/session_test.go +++ b/session_test.go @@ -1165,7 +1165,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) { wg.Add(1) // Choose a huge flood size that we know will result in a window update. - flood := int64(client.config.MaxStreamWindowSize) + flood := int64(initialStreamWindow) var wr *Stream // The server will accept a new stream and then flood data to it. @@ -1180,7 +1180,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) { } sendWindow := atomic.LoadUint32(&wr.sendWindow) - if sendWindow != client.config.MaxStreamWindowSize { + if sendWindow != initialStreamWindow { t.Errorf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, sendWindow) return } @@ -1215,8 +1215,9 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) { } var ( - exp = uint32(flood / 2) - sendWindow uint32 + expWithoutWindowIncrease = uint32(flood / 2) + expWithWindowIncrease = uint32(flood * 3 / 2) + sendWindow uint32 ) // This test is racy. Wait a short period, then longer and longer. At @@ -1224,11 +1225,11 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) { for i := 1; i < 15; i++ { time.Sleep(time.Duration(i*i) * time.Millisecond) sendWindow = atomic.LoadUint32(&wr.sendWindow) - if sendWindow == exp { + if sendWindow == expWithoutWindowIncrease || sendWindow == expWithWindowIncrease { return } } - t.Errorf("sendWindow: exp=%d, got=%d", exp, sendWindow) + t.Errorf("sendWindow: exp=%d or %d, got=%d", expWithoutWindowIncrease, expWithWindowIncrease, sendWindow) } func TestSession_sendMsg_Timeout(t *testing.T) { diff --git a/stream.go b/stream.go index 024ad70..25f3036 100644 --- a/stream.go +++ b/stream.go @@ -2,6 +2,7 @@ package yamux import ( "io" + "math" "sync" "sync/atomic" "time" @@ -33,6 +34,9 @@ type Stream struct { id uint32 session *Session + recvWindow uint32 + epochStart time.Time + state streamState writeState, readState halfStreamState stateLock sync.Mutex @@ -48,6 +52,7 @@ type Stream struct { // newStream is used to construct a new stream within // a given session for an ID func newStream(session *Session, id uint32, state streamState) *Stream { + initialStreamWindow := session.config.InitialStreamWindowSize s := &Stream{ id: id, session: session, @@ -56,6 +61,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream { readDeadline: makePipeDeadline(), writeDeadline: makePipeDeadline(), recvBuf: newSegmentedBuffer(initialStreamWindow), + recvWindow: initialStreamWindow, + epochStart: time.Now(), recvNotifyCh: make(chan struct{}, 1), sendNotifyCh: make(chan struct{}, 1), } @@ -202,16 +209,25 @@ func (s *Stream) sendWindowUpdate() error { // Determine the flags if any flags := s.sendFlags() - // Determine the delta update - max := s.session.config.MaxStreamWindowSize - - // Update our window - needed, delta := s.recvBuf.GrowTo(max, flags != 0) + // Update the receive window. + needed, delta := s.recvBuf.GrowTo(s.recvWindow, flags != 0) if !needed { return nil } - - // Send the header + now := time.Now() + if rtt := s.session.getRTT(); rtt > 0 && now.Sub(s.epochStart) < rtt*4 { + var recvWindow uint32 + if s.recvWindow > math.MaxUint32/2 { + recvWindow = min(math.MaxUint32, s.session.config.MaxStreamWindowSize) + } else { + recvWindow = min(s.recvWindow*2, s.session.config.MaxStreamWindowSize) + } + if recvWindow > s.recvWindow { + s.recvWindow = recvWindow + _, delta = s.recvBuf.GrowTo(s.recvWindow, true) + } + } + s.epochStart = now hdr := encode(typeWindowUpdate, flags, s.id, delta) return s.session.sendMsg(hdr, nil, nil) }