-
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathproxy.go
168 lines (150 loc) · 4.08 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package masque
import (
"context"
"io"
"log"
"net"
"net/http"
"sync"
"sync/atomic"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/quic-go/quic-go/quicvarint"
)
// Names for the target host and target port variables.
// NOTE(review): presumably these are the variable names expected in the
// CONNECT-UDP URI template; they are not referenced in this part of the
// file, so confirm against the code that parses the template.
const (
uriTemplateTargetHost = "target_host"
uriTemplateTargetPort = "target_port"
)
// contextIDZero is the QUIC variable-length integer encoding of the value 0.
// proxyConnReceive prepends it to every datagram it sends on the stream, and
// proxyConnSend only forwards datagrams whose leading context ID is 0
// (UDP payloads).
var contextIDZero = quicvarint.Append([]byte{}, 0)
// proxyEntry pairs the HTTP/3 stream of a proxied request with the UDP socket
// its datagrams are forwarded to. Entries are used as keys of Proxy.conns so
// that Close can tear down every registered flow.
type proxyEntry struct {
str http3.Stream // stream the datagrams are received from and sent on
conn *net.UDPConn // UDP socket the datagram payloads are written to and read from
}
// A Proxy is an RFC 9298 CONNECT-UDP proxy.
// The conns map is created lazily on the first proxied request, so no
// constructor call is needed to initialize it.
type Proxy struct {
closed atomic.Bool // set by Close; Proxy and ProxyConnectedSocket reject requests once it is true
mx sync.Mutex // guards conns
refCount sync.WaitGroup // counts ProxyConnectedSocket calls that are still running; Close waits on it
conns map[proxyEntry]struct{} // flows registered by ProxyConnectedSocket, torn down by Close
}
// Proxy serves a request by dialing a fresh connected UDP socket to r.Target
// and handing it to ProxyConnectedSocket.
// Callers that need more control over the UDP socket should create it
// themselves and call ProxyConnectedSocket directly.
// Applications may add custom header fields to the response header,
// but MUST NOT call WriteHeader on the http.ResponseWriter.
//
// If the proxy has been closed, it responds with 503 Service Unavailable and
// returns net.ErrClosed. If resolving or dialing the target fails, it responds
// with 504 Gateway Timeout and returns the underlying error.
func (s *Proxy) Proxy(w http.ResponseWriter, r *Request) error {
	if s.closed.Load() {
		w.WriteHeader(http.StatusServiceUnavailable)
		return net.ErrClosed
	}

	// unreachable reports a target that could not be resolved or dialed.
	// TODO(#2): set proxy-status header (might want to use structured headers)
	unreachable := func(cause error) error {
		w.WriteHeader(http.StatusGatewayTimeout)
		return cause
	}

	target, resolveErr := net.ResolveUDPAddr("udp", r.Target)
	if resolveErr != nil {
		return unreachable(resolveErr)
	}
	sock, dialErr := net.DialUDP("udp", nil, target)
	if dialErr != nil {
		return unreachable(dialErr)
	}
	defer sock.Close()

	return s.ProxyConnectedSocket(w, r, sock)
}
// ProxyConnectedSocket proxies a request on a connected UDP socket.
// Applications may add custom header fields to the response header,
// but MUST NOT call WriteHeader on the http.ResponseWriter.
// It closes the connection before returning.
//
// If the proxy has already been closed, it closes conn, responds with
// 503 Service Unavailable and returns net.ErrClosed. Otherwise it responds
// with 200 OK, registers the flow so that Close can terminate it, and blocks
// until all three forwarding goroutines have finished; it then returns nil.
// Errors hit while forwarding are logged, not returned.
func (s *Proxy) ProxyConnectedSocket(w http.ResponseWriter, _ *Request, conn *net.UDPConn) error {
	if s.closed.Load() {
		conn.Close()
		w.WriteHeader(http.StatusServiceUnavailable)
		return net.ErrClosed
	}

	s.refCount.Add(1)
	defer s.refCount.Done()

	w.Header().Set(http3.CapsuleProtocolHeader, capsuleProtocolHeaderValue)
	w.WriteHeader(http.StatusOK)

	str := w.(http3.HTTPStreamer).HTTPStream()
	entry := proxyEntry{str: str, conn: conn}
	s.mx.Lock()
	if s.conns == nil {
		s.conns = make(map[proxyEntry]struct{})
	}
	s.conns[entry] = struct{}{}
	s.mx.Unlock()
	// Forget the flow once it has finished. Otherwise the map grows for the
	// lifetime of the proxy and Close would re-close finished flows.
	// Deleting from the nil map left behind by Close is a no-op.
	defer func() {
		s.mx.Lock()
		delete(s.conns, entry)
		s.mx.Unlock()
	}()

	var wg sync.WaitGroup
	wg.Add(3)
	// stream -> UDP socket
	go func() {
		defer wg.Done()
		if err := s.proxyConnSend(conn, str); err != nil {
			log.Printf("proxying send side to %s failed: %v", conn.RemoteAddr(), err)
		}
		str.Close()
	}()
	// UDP socket -> stream
	go func() {
		defer wg.Done()
		if err := s.proxyConnReceive(conn, str); err != nil && !s.closed.Load() {
			log.Printf("proxying receive side to %s failed: %v", conn.RemoteAddr(), err)
		}
		str.Close()
	}()
	go func() {
		defer wg.Done()
		// discard all capsules sent on the request stream
		// io.EOF is the regular end of the request stream, and read errors
		// after Close are expected; only other failures are worth logging.
		if err := skipCapsules(quicvarint.NewReader(str)); err != nil && err != io.EOF && !s.closed.Load() {
			log.Printf("reading from request stream failed: %v", err)
		}
		str.Close()
		// The request stream is done, so the flow is over: close the socket.
		conn.Close()
	}()
	wg.Wait()
	return nil
}
// proxyConnSend forwards datagrams received on str to the UDP socket conn.
// Every datagram starts with a variable-length integer context ID. Payloads
// with context ID 0 are written to conn; all other datagrams are dropped.
// It loops until receiving a datagram, parsing its context ID or writing to
// conn fails, and returns that error.
func (s *Proxy) proxyConnSend(conn *net.UDPConn, str http3.Stream) error {
	ctx := context.Background()
	for {
		datagram, err := str.ReceiveDatagram(ctx)
		if err != nil {
			return err
		}
		id, prefixLen, err := quicvarint.Parse(datagram)
		if err != nil {
			return err
		}
		// We currently only support proxying of UDP payloads (context ID 0).
		// Datagrams with any other context ID are dropped.
		if id == 0 {
			if _, err := conn.Write(datagram[prefixLen:]); err != nil {
				return err
			}
		}
	}
}
// proxyConnReceive forwards packets read from the UDP socket conn to str.
// Each packet is read into a 1500-byte buffer, prefixed with contextIDZero
// and sent as one datagram. It loops until reading from conn or sending on
// str fails, and returns that error.
func (s *Proxy) proxyConnReceive(conn *net.UDPConn, str http3.Stream) error {
	const readBufSize = 1500

	readBuf := make([]byte, readBufSize)
	for {
		n, err := conn.Read(readBuf)
		if err != nil {
			return err
		}
		// Build a fresh datagram per packet, since readBuf is reused by the
		// next Read.
		datagram := make([]byte, len(contextIDZero)+n)
		off := copy(datagram, contextIDZero)
		copy(datagram[off:], readBuf[:n])
		if err := str.SendDatagram(datagram); err != nil {
			return err
		}
	}
}
// Close closes the proxy, immediately terminating all proxied flows.
// It first marks the proxy as closed so that new requests are rejected.
// For every registered flow it then cancels reading from the stream, closes
// the stream and closes the UDP socket. Finally it waits until all running
// ProxyConnectedSocket calls have returned. It always returns nil.
func (s *Proxy) Close() error {
	s.closed.Store(true)

	// Tear down all registered flows while holding the lock.
	func() {
		s.mx.Lock()
		defer s.mx.Unlock()
		for flow := range s.conns {
			flow.str.CancelRead(quic.StreamErrorCode(http3.ErrCodeNoError))
			flow.str.Close()
			flow.conn.Close()
		}
		s.conns = nil
	}()

	s.refCount.Wait()
	return nil
}