Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use batched reads #3142

Merged
merged 1 commit into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conn_helper_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ const (
msgTypeIPv4PKTINFO = unix.IP_PKTINFO
msgTypeIPv6PKTINFO = 0x2e
)

// ReadBatch only returns a single packet on OSX,
// see https://godoc.org/golang.org/x/net/ipv4#PacketConn.ReadBatch.
const batchSize = 1
2 changes: 2 additions & 0 deletions conn_helper_freebsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ const (
msgTypeIPv4PKTINFO = 0x7
msgTypeIPv6PKTINFO = 0x2e
)

const batchSize = 8
2 changes: 2 additions & 0 deletions conn_helper_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ const (
msgTypeIPv4PKTINFO = unix.IP_PKTINFO
msgTypeIPv6PKTINFO = unix.IPV6_PKTINFO
)

const batchSize = 8 // needs to smaller than MaxUint8 (otherwise the type of oobConn.readPos has to be changed)
79 changes: 60 additions & 19 deletions conn_oob.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,27 @@ import (
"time"
"unsafe"

"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"golang.org/x/sys/unix"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
)

const ecnMask uint8 = 0x3
const (
ecnMask = 0x3
oobBufferSize = 128
)

// Contrary to what the naming suggests, the ipv{4,6}.Message is not dependent on the IP version.
// They're both just aliases for x/net/internal/socket.Message.
// This means we can use this struct to read from a socket that receives both IPv4 and IPv6 messages.
var _ ipv4.Message = ipv6.Message{}

type batchConn interface {
ReadBatch(ms []ipv4.Message, flags int) (int, error)
}

func inspectReadBuffer(c interface{}) (int, error) {
conn, ok := c.(interface {
Expand All @@ -43,7 +57,12 @@ func inspectReadBuffer(c interface{}) (int, error) {

type oobConn struct {
OOBCapablePacketConn
oobBuffer []byte
batchConn batchConn

readPos uint8
// Packets received from the kernel, but not yet returned by ReadPacket().
messages []ipv4.Message
buffers [batchSize]*packetBuffer
}

var _ connection = &oobConn{}
Expand Down Expand Up @@ -94,23 +113,41 @@ func newConn(c OOBCapablePacketConn) (*oobConn, error) {
return nil, errors.New("activating packet info failed for both IPv4 and IPv6")
}
}
return &oobConn{
oobConn := &oobConn{
OOBCapablePacketConn: c,
oobBuffer: make([]byte, 128),
}, nil
batchConn: ipv4.NewPacketConn(c),
messages: make([]ipv4.Message, batchSize),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's a bit less error-prone if you keep a [batchSize]ipv4.Message array and a currentMessages slice over that as members? That could allow you to get rid of the readPos tracking…

readPos: batchSize,
}
for i := 0; i < batchSize; i++ {
oobConn.messages[i].OOB = make([]byte, oobBufferSize)
}
return oobConn, nil
}

func (c *oobConn) ReadPacket() (*receivedPacket, error) {
buffer := getPacketBuffer()
// The packet size should not exceed protocol.MaxPacketBufferSize bytes
// If it does, we only read a truncated packet, which will then end up undecryptable
buffer.Data = buffer.Data[:protocol.MaxPacketBufferSize]
c.oobBuffer = c.oobBuffer[:cap(c.oobBuffer)]
n, oobn, _, addr, err := c.OOBCapablePacketConn.ReadMsgUDP(buffer.Data, c.oobBuffer)
if err != nil {
return nil, err
if len(c.messages) == int(c.readPos) { // all messages read. Read the next batch of messages.
c.messages = c.messages[:batchSize]
// replace buffers data buffers up to the packet that has been consumed during the last ReadBatch call
for i := uint8(0); i < c.readPos; i++ {
buffer := getPacketBuffer()
buffer.Data = buffer.Data[:protocol.MaxPacketBufferSize]
c.buffers[i] = buffer
c.messages[i].Buffers = [][]byte{c.buffers[i].Data}
}
c.readPos = 0

n, err := c.batchConn.ReadBatch(c.messages, 0)
if n == 0 || err != nil {
return nil, err
}
c.messages = c.messages[:n]
}
ctrlMsgs, err := unix.ParseSocketControlMessage(c.oobBuffer[:oobn])

msg := c.messages[c.readPos]
buffer := c.buffers[c.readPos]
c.readPos++
ctrlMsgs, err := unix.ParseSocketControlMessage(msg.OOB[:msg.NN])
if err != nil {
return nil, err
}
Expand All @@ -129,13 +166,15 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
// struct in_addr ipi_addr; /* Header Destination
// address */
// };
ip := make([]byte, 4)
if len(ctrlMsg.Data) == 12 {
ifIndex = binary.LittleEndian.Uint32(ctrlMsg.Data)
destIP = net.IP(ctrlMsg.Data[8:12])
copy(ip, ctrlMsg.Data[8:12])
} else if len(ctrlMsg.Data) == 4 {
// FreeBSD
destIP = net.IP(ctrlMsg.Data)
copy(ip, ctrlMsg.Data)
}
destIP = net.IP(ip)
}
}
if ctrlMsg.Header.Level == unix.IPPROTO_IPV6 {
Expand All @@ -148,7 +187,9 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
// unsigned int ipi6_ifindex; /* send/recv interface index */
// };
if len(ctrlMsg.Data) == 20 {
destIP = net.IP(ctrlMsg.Data[:16])
ip := make([]byte, 16)
copy(ip, ctrlMsg.Data[:16])
destIP = net.IP(ip)
ifIndex = binary.LittleEndian.Uint32(ctrlMsg.Data[16:])
}
}
Expand All @@ -162,9 +203,9 @@ func (c *oobConn) ReadPacket() (*receivedPacket, error) {
}
}
return &receivedPacket{
remoteAddr: addr,
remoteAddr: msg.Addr,
rcvTime: time.Now(),
data: buffer.Data[:n],
data: msg.Buffers[0][:msg.N],
ecn: ecn,
info: info,
buffer: buffer,
Expand Down
46 changes: 44 additions & 2 deletions conn_oob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
package quic

import (
"fmt"
"net"
"time"

"golang.org/x/net/ipv4"
"golang.org/x/sys/unix"

"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"

Expand All @@ -21,14 +24,14 @@ var _ = Describe("OOB Conn Test", func() {
Expect(err).ToNot(HaveOccurred())
udpConn, err := net.ListenUDP(network, addr)
Expect(err).ToNot(HaveOccurred())
ecnConn, err := newConn(udpConn)
oobConn, err := newConn(udpConn)
Expect(err).ToNot(HaveOccurred())

packetChan := make(chan *receivedPacket)
go func() {
defer GinkgoRecover()
for {
p, err := ecnConn.ReadPacket()
p, err := oobConn.ReadPacket()
if err != nil {
return
}
Expand Down Expand Up @@ -197,4 +200,43 @@ var _ = Describe("OOB Conn Test", func() {
Expect(p.info.addr).To(Equal(ip6))
})
})

Context("Batch Reading", func() {
var batchConn *MockBatchConn

BeforeEach(func() {
batchConn = NewMockBatchConn(mockCtrl)
})

It("reads multiple messages in one batch", func() {
const numMsgRead = batchSize/2 + 1
var counter int
batchConn.EXPECT().ReadBatch(gomock.Any(), gomock.Any()).DoAndReturn(func(ms []ipv4.Message, flags int) (int, error) {
Expect(ms).To(HaveLen(batchSize))
for i := 0; i < numMsgRead; i++ {
Expect(ms[i].Buffers).To(HaveLen(1))
Expect(ms[i].Buffers[0]).To(HaveLen(int(protocol.MaxPacketBufferSize)))
data := []byte(fmt.Sprintf("message %d", counter))
counter++
ms[i].Buffers[0] = data
ms[i].N = len(data)
}
return numMsgRead, nil
}).Times(2)

addr, err := net.ResolveUDPAddr("udp", "localhost:0")
Expect(err).ToNot(HaveOccurred())
udpConn, err := net.ListenUDP("udp", addr)
Expect(err).ToNot(HaveOccurred())
oobConn, err := newConn(udpConn)
Expect(err).ToNot(HaveOccurred())
oobConn.batchConn = batchConn

for i := 0; i < batchSize+1; i++ {
p, err := oobConn.ReadPacket()
Expect(err).ToNot(HaveOccurred())
Expect(string(p.data)).To(Equal(fmt.Sprintf("message %d", i)))
}
})
})
})
50 changes: 50 additions & 0 deletions mock_batch_conn_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mockgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ package quic
//go:generate sh -c "./mockgen_private.sh quic mock_unknown_packet_handler_test.go github.com/lucas-clemente/quic-go unknownPacketHandler"
//go:generate sh -c "./mockgen_private.sh quic mock_packet_handler_manager_test.go github.com/lucas-clemente/quic-go packetHandlerManager"
//go:generate sh -c "./mockgen_private.sh quic mock_multiplexer_test.go github.com/lucas-clemente/quic-go multiplexer"
//go:generate sh -c "./mockgen_private.sh quic mock_batch_conn_test.go github.com/lucas-clemente/quic-go batchConn"
//go:generate sh -c "mockgen -package quic -self_package github.com/lucas-clemente/quic-go -destination mock_token_store_test.go github.com/lucas-clemente/quic-go TokenStore && goimports -w mock_token_store_test.go"
//go:generate sh -c "mockgen -package quic -self_package github.com/lucas-clemente/quic-go -destination mock_packetconn_test.go net PacketConn && goimports -w mock_packetconn_test.go"