Skip to content

Commit

Permalink
Merge pull request #54 from libp2p/rcv-window-tuning-new
Browse files Browse the repository at this point in the history
increase the receive window size if we're sending updates to frequently
  • Loading branch information
Stebalien committed May 5, 2021
2 parents 9b04322 + f6e177c commit 82173f2
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 28 deletions.
3 changes: 2 additions & 1 deletion const.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ const (

const (
// initialStreamWindow is the initial stream window size
initialStreamWindow uint32 = 256 * 1024
initialStreamWindow uint32 = 64 * 1024
maxStreamWindow uint32 = 16 * 1024 * 1024
)

const (
Expand Down
30 changes: 18 additions & 12 deletions mux.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package yamux

import (
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -30,6 +31,10 @@ type Config struct {
// an expectation that things will move along quickly.
ConnectionWriteTimeout time.Duration

// InitialStreamWindowSize is used to control the initial
// window size that we allow for a stream.
InitialStreamWindowSize uint32

// MaxStreamWindowSize is used to control the maximum
// window size that we allow for a stream.
MaxStreamWindowSize uint32
Expand All @@ -55,16 +60,17 @@ type Config struct {
// DefaultConfig is used to return a default configuration
func DefaultConfig() *Config {
return &Config{
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
MaxStreamWindowSize: initialStreamWindow,
LogOutput: os.Stderr,
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
InitialStreamWindowSize: initialStreamWindow,
MaxStreamWindowSize: maxStreamWindow,
LogOutput: os.Stderr,
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
}
}

Expand All @@ -76,8 +82,8 @@ func VerifyConfig(config *Config) error {
if config.KeepAliveInterval == 0 {
return fmt.Errorf("keep-alive interval must be positive")
}
if config.MaxStreamWindowSize < initialStreamWindow {
return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
if config.MaxStreamWindowSize < config.InitialStreamWindowSize {
return errors.New("MaxStreamWindowSize must be larger than the InitialStreamWindowSize")
}
if config.MaxMessageSize < 1024 {
return fmt.Errorf("MaxMessageSize must be greater than a kilobyte")
Expand Down
16 changes: 16 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
// Session is used to wrap a reliable ordered connection and to
// multiplex it into multiple streams.
type Session struct {
rtt int64 // to be accessed atomically, in nanoseconds

// remoteGoAway indicates the remote side does
// not want futher connections. Must be first for alignment.
remoteGoAway int32
Expand Down Expand Up @@ -129,6 +131,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
}
go s.recv()
go s.send()
go s.measureRTT()
return s
}

Expand Down Expand Up @@ -291,6 +294,19 @@ func (s *Session) goAway(reason uint32) header {
return hdr
}

func (s *Session) measureRTT() {
rtt, err := s.Ping()
if err != nil {
return
}
atomic.StoreInt64(&s.rtt, rtt.Nanoseconds())
}

// 0 if we don't yet have a measurement
func (s *Session) getRTT() time.Duration {
return time.Duration(atomic.LoadInt64(&s.rtt))
}

// Ping is used to measure the RTT response time
func (s *Session) Ping() (dur time.Duration, err error) {
// Prepare a ping.
Expand Down
2 changes: 1 addition & 1 deletion session_norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestLargeWindow(t *testing.T) {
if err != nil {
t.Fatal(err)
}
buf := make([]byte, conf.MaxStreamWindowSize)
buf := make([]byte, initialStreamWindow)
n, err := stream.Write(buf)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down
81 changes: 74 additions & 7 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
wg.Add(1)

// Choose a huge flood size that we know will result in a window update.
flood := int64(client.config.MaxStreamWindowSize)
flood := int64(initialStreamWindow)
var wr *Stream

// The server will accept a new stream and then flood data to it.
Expand All @@ -1180,8 +1180,8 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
}

sendWindow := atomic.LoadUint32(&wr.sendWindow)
if sendWindow != client.config.MaxStreamWindowSize {
t.Errorf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, sendWindow)
if sendWindow != initialStreamWindow {
t.Errorf("sendWindow: exp=%d, got=%d", client.config.InitialStreamWindowSize, sendWindow)
return
}

Expand Down Expand Up @@ -1215,22 +1215,89 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
}

var (
exp = uint32(flood / 2)
sendWindow uint32
expWithoutWindowIncrease = uint32(flood / 2)
expWithWindowIncrease = uint32(flood)
sendWindow uint32
)

// This test is racy. Wait a short period, then longer and longer. At
// most ~1s.
for i := 1; i < 15; i++ {
time.Sleep(time.Duration(i*i) * time.Millisecond)
sendWindow = atomic.LoadUint32(&wr.sendWindow)
if sendWindow == exp {
if sendWindow == expWithoutWindowIncrease || sendWindow == expWithWindowIncrease {
return
}
}
t.Errorf("sendWindow: exp=%d, got=%d", exp, sendWindow)
t.Errorf("sendWindow: exp=%d or %d, got=%d", expWithoutWindowIncrease, expWithWindowIncrease, sendWindow)
}

// func TestSession_WindowAutoSizing(t *testing.T) {
// const initialWindow uint32 = 10
// conf := testConfNoKeepAlive()
// conf.InitialStreamWindowSize = initialWindow
// client, server := testClientServerConfig(conf)
// defer client.Close()
// defer server.Close()

// receiveAndConsume := func(str *Stream, size uint32) {
// if _, err := str.Read(make([]byte, size)); err != nil {
// t.Fatal(err)
// }
// }

// clientStr, err := client.OpenStream(context.Background())
// if err != nil {
// t.Fatal(err)
// }
// serverStr, err := server.AcceptStream()
// if err != nil {
// t.Fatal(err)
// }

// const rtt = 20 * time.Millisecond
// t.Run("finding the window size", func(t *testing.T) {
// // Consume a maximum of 1234 bytes per RTT.
// // We expect the window to be scaled such that we send one update every 2 RTTs.
// go func() {
// for {
// serverStr.Write(make([]byte, 100))
// }
// }()

// var counter int
// ticker := time.NewTicker(rtt)
// for range ticker.C {
// receiveAndConsume(clientStr, 1234)
// counter++
// if counter > 25 {
// break
// }
// }
// fmt.Println(clientStr.recvWindow)
// })
// // t.Run("capping the window size", func(t *testing.T) {
// // const maxWindow = 78 * initialWindow
// // buf := newSegmentedBuffer(initialWindow, maxWindow, func() time.Duration { return rtt })
// // start := time.Now()
// // // Consume a maximum of 1234 bytes per RTT.
// // // We expect the window to be scaled such that we send one update every 2 RTTs.
// // now := start
// // delta := initialWindow
// // for i := 0; i < 100; i++ {
// // now = now.Add(rtt)
// // receiveAndConsume(&buf, delta)
// // grow, d := buf.GrowTo(false, now)
// // if grow {
// // delta = d
// // }
// // }
// // if buf.windowSize != maxWindow {
// // t.Fatalf("expected the window size to be at max (%d), got %d", maxWindow, buf.windowSize)
// // }
// // })
// }

func TestSession_sendMsg_Timeout(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
defer client.Close()
Expand Down
30 changes: 23 additions & 7 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package yamux

import (
"io"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -33,6 +34,9 @@ type Stream struct {
id uint32
session *Session

recvWindow uint32
epochStart time.Time

state streamState
writeState, readState halfStreamState
stateLock sync.Mutex
Expand All @@ -48,6 +52,7 @@ type Stream struct {
// newStream is used to construct a new stream within
// a given session for an ID
func newStream(session *Session, id uint32, state streamState) *Stream {
initialStreamWindow := session.config.InitialStreamWindowSize
s := &Stream{
id: id,
session: session,
Expand All @@ -56,6 +61,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
readDeadline: makePipeDeadline(),
writeDeadline: makePipeDeadline(),
recvBuf: newSegmentedBuffer(initialStreamWindow),
recvWindow: initialStreamWindow,
epochStart: time.Now(),
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
}
Expand Down Expand Up @@ -202,16 +209,25 @@ func (s *Stream) sendWindowUpdate() error {
// Determine the flags if any
flags := s.sendFlags()

// Determine the delta update
max := s.session.config.MaxStreamWindowSize

// Update our window
needed, delta := s.recvBuf.GrowTo(max, flags != 0)
// Update the receive window.
needed, delta := s.recvBuf.GrowTo(s.recvWindow, flags != 0)
if !needed {
return nil
}

// Send the header
now := time.Now()
if rtt := s.session.getRTT(); rtt > 0 && now.Sub(s.epochStart) < rtt*4 {
var recvWindow uint32
if s.recvWindow > math.MaxUint32/2 {
recvWindow = min(math.MaxUint32, s.session.config.MaxStreamWindowSize)
} else {
recvWindow = min(s.recvWindow*2, s.session.config.MaxStreamWindowSize)
}
if recvWindow > s.recvWindow {
s.recvWindow = recvWindow
_, delta = s.recvBuf.GrowTo(s.recvWindow, true)
}
}
s.epochStart = now
hdr := encode(typeWindowUpdate, flags, s.id, delta)
return s.session.sendMsg(hdr, nil, nil)
}
Expand Down

0 comments on commit 82173f2

Please sign in to comment.