Skip to content

Commit aa8b049

Browse files
committed
error-codes: add implementation for quic, yamux, websockets, webrtc
1 parent 08f9d22 commit aa8b049

34 files changed

+592
-72
lines changed

core/network/conn.go

+29
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package network
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67

78
ic "github.com/libp2p/go-libp2p/core/crypto"
@@ -11,6 +12,29 @@ import (
1112
ma "github.com/multiformats/go-multiaddr"
1213
)
1314

15+
type ConnErrorCode uint32
16+
17+
type ConnError struct {
18+
Remote bool
19+
ErrorCode ConnErrorCode
20+
TransportError error
21+
}
22+
23+
func (c *ConnError) Error() string {
24+
side := "local"
25+
if c.Remote {
26+
side = "remote"
27+
}
28+
if c.TransportError != nil {
29+
return fmt.Sprintf("connection closed (%s): code: %d: transport error: %s", side, c.ErrorCode, c.TransportError)
30+
}
31+
return fmt.Sprintf("connection closed (%s): code: %d", side, c.ErrorCode)
32+
}
33+
34+
func (c *ConnError) Unwrap() error {
35+
return c.TransportError
36+
}
37+
1438
// Conn is a connection to a remote peer. It multiplexes streams.
1539
// Usually there is no need to use a Conn directly, but it may
1640
// be useful to get information about the peer on the other side:
@@ -24,6 +48,11 @@ type Conn interface {
2448
ConnStat
2549
ConnScoper
2650

51+
// CloseWithError closes the connection with errCode. The errCode is sent to the
52+
// peer on a best effort basis. For transports that do not support sending error
53+
// codes on connection close, the behavior is identical to calling Close.
54+
CloseWithError(errCode ConnErrorCode) error
55+
2756
// ID returns an identifier that uniquely identifies this Conn within this
2857
// host, during this run. Connection IDs may repeat across restarts.
2958
ID() string

core/network/mux.go

+41
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package network
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"io"
78
"net"
89
"time"
@@ -11,6 +12,33 @@ import (
1112
// ErrReset is returned when reading or writing on a reset stream.
1213
var ErrReset = errors.New("stream reset")
1314

15+
type StreamErrorCode uint32
16+
17+
type StreamError struct {
18+
ErrorCode StreamErrorCode
19+
Remote bool
20+
TransportError error
21+
}
22+
23+
func (s *StreamError) Error() string {
24+
side := "local"
25+
if s.Remote {
26+
side = "remote"
27+
}
28+
if s.TransportError != nil {
29+
return fmt.Sprintf("stream reset (%s): code: %d: transport error: %s", side, s.ErrorCode, s.TransportError)
30+
}
31+
return fmt.Sprintf("stream reset (%s): code: %d", side, s.ErrorCode)
32+
}
33+
34+
func (s *StreamError) Is(target error) bool {
35+
return target == ErrReset
36+
}
37+
38+
func (s *StreamError) Unwrap() error {
39+
return s.TransportError
40+
}
41+
1442
// MuxedStream is a bidirectional io pipe within a connection.
1543
type MuxedStream interface {
1644
io.Reader
@@ -61,6 +89,13 @@ type MuxedStream interface {
6189
SetWriteDeadline(time.Time) error
6290
}
6391

92+
type ResetWithErrorer interface {
93+
// ResetWithError closes both ends of the stream with errCode. The errCode is sent
94+
// to the peer on a best effort basis. For transports that do not support sending
95+
// error codes to remote peer, the behavior is identical to calling Reset
96+
ResetWithError(errCode StreamErrorCode) error
97+
}
98+
6499
// MuxedConn represents a connection to a remote peer that has been
65100
// extended to support stream multiplexing.
66101
//
@@ -86,6 +121,12 @@ type MuxedConn interface {
86121
AcceptStream() (MuxedStream, error)
87122
}
88123

124+
type CloseWithErrorer interface {
125+
// CloseWithError closes the connection with errCode. The errCode is sent
126+
// to the peer.
127+
CloseWithError(errCode ConnErrorCode) error
128+
}
129+
89130
// Multiplexer wraps a net.Conn with a stream multiplexing
90131
// implementation and returns a MuxedConn that supports opening
91132
// multiple streams over the underlying net.Conn

core/network/stream.go

+4
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,8 @@ type Stream interface {
2727

2828
// Scope returns the user's view of this stream's resource scope
2929
Scope() StreamScope
30+
31+
// ResetWithError closes both ends of the stream with errCode. The errCode is sent
32+
// to the peer.
33+
ResetWithError(errCode StreamErrorCode) error
3034
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ require (
3232
github.com/libp2p/go-nat v0.2.0
3333
github.com/libp2p/go-netroute v0.2.1
3434
github.com/libp2p/go-reuseport v0.4.0
35-
github.com/libp2p/go-yamux/v4 v4.0.1
35+
github.com/libp2p/go-yamux/v4 v4.0.2-0.20241120100319-39abe7ed206a
3636
github.com/libp2p/zeroconf/v2 v2.2.0
3737
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
3838
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ github.com/libp2p/go-netroute v0.2.1 h1:V8kVrpD8GK0Riv15/7VN6RbUQ3URNZVosw7H2v9t
194194
github.com/libp2p/go-netroute v0.2.1/go.mod h1:hraioZr0fhBjG0ZRXJJ6Zj2IVEVNx6tDTFQfSmcq7mQ=
195195
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
196196
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
197-
github.com/libp2p/go-yamux/v4 v4.0.1 h1:FfDR4S1wj6Bw2Pqbc8Uz7pCxeRBPbwsBbEdfwiCypkQ=
198-
github.com/libp2p/go-yamux/v4 v4.0.1/go.mod h1:NWjl8ZTLOGlozrXSOZ/HlfG++39iKNnM5wwmtQP1YB4=
197+
github.com/libp2p/go-yamux/v4 v4.0.2-0.20241120100319-39abe7ed206a h1:zc7jPWFFQibZbACDyQdEAWg7yG/fjx5Jmg6djtpjKog=
198+
github.com/libp2p/go-yamux/v4 v4.0.2-0.20241120100319-39abe7ed206a/go.mod h1:PGP+3py2ZWDKABvqstBZtMnixEHNC7U/odnGylzur5o=
199199
github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q=
200200
github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs=
201201
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=

p2p/muxer/testsuite/mux.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
crand "crypto/rand"
7+
"errors"
78
"fmt"
89
"io"
910
mrand "math/rand"
@@ -462,7 +463,7 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
462463
time.Sleep(time.Millisecond * 50)
463464

464465
_, err = s.Write([]byte("foo"))
465-
if err != network.ErrReset {
466+
if !errors.Is(err, network.ErrReset) {
466467
t.Error("should have been stream reset")
467468
}
468469
s.Close()

p2p/muxer/yamux/conn.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ func (c *conn) Close() error {
2323
return c.yamux().Close()
2424
}
2525

26+
func (c *conn) CloseWithError(errCode network.ConnErrorCode) error {
27+
return c.yamux().CloseWithError(uint32(errCode))
28+
}
29+
2630
// IsClosed checks if yamux.Session is in closed state.
2731
func (c *conn) IsClosed() bool {
2832
return c.yamux().IsClosed()
@@ -32,7 +36,7 @@ func (c *conn) IsClosed() bool {
3236
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
3337
s, err := c.yamux().OpenStream(ctx)
3438
if err != nil {
35-
return nil, err
39+
return nil, parseResetError(err)
3640
}
3741

3842
return (*stream)(s), nil
@@ -41,7 +45,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
4145
// AcceptStream accepts a stream opened by the other side.
4246
func (c *conn) AcceptStream() (network.MuxedStream, error) {
4347
s, err := c.yamux().AcceptStream()
44-
return (*stream)(s), err
48+
return (*stream)(s), parseResetError(err)
4549
}
4650

4751
func (c *conn) yamux() *yamux.Session {

p2p/muxer/yamux/stream.go

+27-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package yamux
22

33
import (
4+
"errors"
5+
"fmt"
46
"time"
57

68
"github.com/libp2p/go-libp2p/core/network"
@@ -13,22 +15,33 @@ type stream yamux.Stream
1315

1416
var _ network.MuxedStream = &stream{}
1517

16-
func (s *stream) Read(b []byte) (n int, err error) {
17-
n, err = s.yamux().Read(b)
18-
if err == yamux.ErrStreamReset {
19-
err = network.ErrReset
18+
func parseResetError(err error) error {
19+
if err == nil {
20+
return err
21+
}
22+
se := &yamux.StreamError{}
23+
if errors.As(err, &se) {
24+
return &network.StreamError{Remote: se.Remote, ErrorCode: network.StreamErrorCode(se.ErrorCode)}
2025
}
26+
ce := &yamux.GoAwayError{}
27+
if errors.As(err, &ce) {
28+
return &network.ConnError{Remote: ce.Remote, ErrorCode: network.ConnErrorCode(ce.ErrorCode)}
29+
}
30+
// TODO: How should we handle resets for reason other than a remote error
31+
if errors.Is(err, yamux.ErrStreamReset) {
32+
return fmt.Errorf("%w: %w", network.ErrReset, err)
33+
}
34+
return err
35+
}
2136

22-
return n, err
37+
func (s *stream) Read(b []byte) (n int, err error) {
38+
n, err = s.yamux().Read(b)
39+
return n, parseResetError(err)
2340
}
2441

2542
func (s *stream) Write(b []byte) (n int, err error) {
2643
n, err = s.yamux().Write(b)
27-
if err == yamux.ErrStreamReset {
28-
err = network.ErrReset
29-
}
30-
31-
return n, err
44+
return n, parseResetError(err)
3245
}
3346

3447
func (s *stream) Close() error {
@@ -39,6 +52,10 @@ func (s *stream) Reset() error {
3952
return s.yamux().Reset()
4053
}
4154

55+
func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
56+
return s.yamux().ResetWithError(uint32(errCode))
57+
}
58+
4259
func (s *stream) CloseRead() error {
4360
return s.yamux().CloseRead()
4461
}

p2p/net/connmgr/connmgr_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,7 @@ type mockConn struct {
794794
}
795795

796796
func (m mockConn) Close() error { panic("implement me") }
797+
func (m mockConn) CloseWithError(errCode network.ConnErrorCode) error { panic("implement me") }
797798
func (m mockConn) LocalPeer() peer.ID { panic("implement me") }
798799
func (m mockConn) RemotePeer() peer.ID { panic("implement me") }
799800
func (m mockConn) RemotePublicKey() crypto.PubKey { panic("implement me") }

p2p/net/mock/mock_conn.go

+4
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,7 @@ func (c *conn) Stat() network.ConnStats {
185185
func (c *conn) Scope() network.ConnScope {
186186
return &network.NullScope{}
187187
}
188+
189+
func (c *conn) CloseWithError(_ network.ConnErrorCode) error {
190+
return c.Close()
191+
}

p2p/net/mock/mock_stream.go

+4
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ func (s *stream) Reset() error {
144144
return nil
145145
}
146146

147+
func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
148+
panic("not implemented")
149+
}
150+
147151
func (s *stream) teardown() {
148152
// at this point, no streams are writing.
149153
s.conn.removeStream(s)

p2p/net/swarm/swarm.go

+8
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,14 @@ func (c connWithMetrics) Close() error {
840840
return c.CapableConn.Close()
841841
}
842842

843+
func (c connWithMetrics) CloseWithError(errCode network.ConnErrorCode) error {
844+
c.metricsTracer.ClosedConnection(c.dir, time.Since(c.opened), c.ConnState(), c.LocalMultiaddr())
845+
if ce, ok := c.CapableConn.(network.CloseWithErrorer); ok {
846+
return ce.CloseWithError(errCode)
847+
}
848+
return c.CapableConn.Close()
849+
}
850+
843851
func (c connWithMetrics) Stat() network.ConnStats {
844852
if cs, ok := c.CapableConn.(network.ConnStat); ok {
845853
return cs.Stat()

p2p/net/swarm/swarm_conn.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,20 @@ func (c *Conn) ID() string {
5858
// open notifications must finish before we can fire off the close
5959
// notifications).
6060
func (c *Conn) Close() error {
61-
c.closeOnce.Do(c.doClose)
61+
c.closeOnce.Do(func() {
62+
c.doClose(0)
63+
})
6264
return c.err
6365
}
6466

65-
func (c *Conn) doClose() {
67+
func (c *Conn) CloseWithError(errCode network.ConnErrorCode) error {
68+
c.closeOnce.Do(func() {
69+
c.doClose(errCode)
70+
})
71+
return c.err
72+
}
73+
74+
func (c *Conn) doClose(errCode network.ConnErrorCode) {
6675
c.swarm.removeConn(c)
6776

6877
// Prevent new streams from opening.
@@ -71,7 +80,15 @@ func (c *Conn) doClose() {
7180
c.streams.m = nil
7281
c.streams.Unlock()
7382

74-
c.err = c.conn.Close()
83+
if errCode != 0 {
84+
if ce, ok := c.conn.(network.CloseWithErrorer); ok {
85+
c.err = ce.CloseWithError(errCode)
86+
} else {
87+
c.err = c.conn.Close()
88+
}
89+
} else {
90+
c.err = c.conn.Close()
91+
}
7592

7693
// Send the connectedness event after closing the connection.
7794
// This ensures that both remote connection close and local connection

p2p/net/swarm/swarm_stream.go

+11
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,17 @@ func (s *Stream) Reset() error {
9191
return err
9292
}
9393

94+
func (s *Stream) ResetWithError(errCode network.StreamErrorCode) error {
95+
var err error
96+
if se, ok := s.stream.(network.ResetWithErrorer); ok {
97+
err = se.ResetWithError(errCode)
98+
} else {
99+
err = s.stream.Reset()
100+
}
101+
s.closeAndRemoveStream()
102+
return err
103+
}
104+
94105
func (s *Stream) closeAndRemoveStream() {
95106
s.closeMx.Lock()
96107
defer s.closeMx.Unlock()

p2p/net/swarm/swarm_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func TestResourceManagerAcceptStream(t *testing.T) {
538538
if err == nil {
539539
_, err = str.Read([]byte{0})
540540
}
541-
require.EqualError(t, err, "stream reset")
541+
require.ErrorContains(t, err, "stream reset")
542542
}
543543

544544
func TestListenCloseCount(t *testing.T) {

p2p/net/upgrader/conn.go

+7
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,10 @@ func (t *transportConn) ConnState() network.ConnectionState {
6363
UsedEarlyMuxerNegotiation: t.usedEarlyMuxerNegotiation,
6464
}
6565
}
66+
67+
func (t *transportConn) CloseWithError(errCode network.ConnErrorCode) error {
68+
if ce, ok := t.MuxedConn.(network.CloseWithErrorer); ok {
69+
return ce.CloseWithError(errCode)
70+
}
71+
return t.Close()
72+
}

0 commit comments

Comments
 (0)