Skip to content

Commit

Permalink
Merge pull request #21 from libp2p/feat/netbench
Browse files Browse the repository at this point in the history
Add net benchmark harness
  • Loading branch information
willscott committed May 10, 2020
2 parents 74c1462 + d75c3b5 commit 99f31e5
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ go 1.12
require (
github.com/libp2p/go-libp2p-core v0.3.0
github.com/multiformats/go-multiaddr v0.2.0
google.golang.org/grpc v1.20.1
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
133 changes: 133 additions & 0 deletions net/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package tnet

import (
"context"
"fmt"
"net"
"runtime"
"sync"
"testing"
"time"

"go.uber.org/atomic"
"google.golang.org/grpc/benchmark/latency"
)

// WriteTrackedConn provides a wrapper for tracking how many write calls are made to a network connection.
type WriteTrackedConn struct {
net.Conn
WriteCount atomic.Uint32
}

func (c *WriteTrackedConn) Write(b []byte) (n int, err error) {
c.WriteCount.Inc()
return c.Conn.Write(b)
}

// NetworkTestFunc is a benchmark function under test by `FindNetworkLimit`
type NetworkTestFunc func(b *testing.B, n1, n2 net.Conn)

// ConnectionForNetwork generates a pair of network connections with a specified latency.
func ConnectionForNetwork(n *latency.Network) (n1, n2 *WriteTrackedConn, err error) {
var wg sync.WaitGroup
wg.Add(1)

var listener net.Listener
listener, err = net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return
}
slowListener := n.Listener(listener)
go func() {
defer wg.Done()
ac, _ := slowListener.Accept()
slowListener.Close()
n2 = &WriteTrackedConn{ac, atomic.Uint32{}}
return
}()
baseDialer := net.Dialer{}
dialer := n.ContextDialer(baseDialer.DialContext)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := dialer(ctx, "tcp4", slowListener.Addr().String())
if err != nil {
return
}
n1 = &WriteTrackedConn{conn, atomic.Uint32{}}

wg.Wait()
return
}

func benchWithNet(testFunc NetworkTestFunc, n *latency.Network) func(*testing.B) {
return func(sb *testing.B) {
n1, n2, err := ConnectionForNetwork(n)
if err != nil {
sb.Error(err)
}
defer n1.Close()
defer n2.Close()
testFunc(sb, n1, n2)
sb.ReportMetric(float64(n1.WriteCount.Load()), "writes")
}
}

// FindNetworkLimit benchmarks a function to analyze CPU and network relationship
func FindNetworkLimit(testFunc NetworkTestFunc) (int, error) {
network := latency.Network{
Kbps: 0,
Latency: 0,
}

wrapperFunc := benchWithNet(testFunc, &network)

result := testing.Benchmark(wrapperFunc)
if result.N < 1 {
return 0, fmt.Errorf("failed to run benchmark")
}
fmt.Printf("Limit with no network latency: %s\n", result)

last := 1.0
network.Latency = 30 * time.Millisecond
network.Kbps = 1024 * 1 // 1 Mbps
result = testing.Benchmark(wrapperFunc)
current := (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds()
for current/(2*last) > .1 {
network.Latency *= 2
result = testing.Benchmark(wrapperFunc)
last = current
current = (float64(result.Bytes) * float64(result.N) / 1e6) / result.T.Seconds()
}
fmt.Printf("At 30ms latency, bandwidth can saturate: %dKbps\n", network.Kbps)

return network.Kbps, nil
}

// ParallelismSlowdown tracks how much overhead is incurred on a ntework bound function when parallelism contentention
// in increased.
func ParallelismSlowdown(testFunc NetworkTestFunc, kbps int, l time.Duration) (slowdown float64, err error) {
maxProcs := runtime.GOMAXPROCS(0)
runtime.GOMAXPROCS(maxProcs)
if maxProcs > 1 {
runtime.GOMAXPROCS(1)
}

network := latency.Network{
Kbps: kbps,
Latency: l,
}
prevBytes := int64(-1)
ratio := 1.0
for i := 1; i <= maxProcs; i *= 2 {
runtime.GOMAXPROCS(i)
result := testing.Benchmark(benchWithNet(testFunc, &network))
if prevBytes > 0 {
ratio = float64(result.Bytes) / float64(prevBytes)
}
prevBytes = result.Bytes
fmt.Printf("At MaxProc %d %dKbps / %s latency: %s\n", i, network.Kbps, network.Latency, result)
}
fmt.Printf("Slowdown is %f%%\n", 100*(1.0-ratio))
return (1.0 - ratio), nil
}

0 comments on commit 99f31e5

Please sign in to comment.