From 389cec4254af99eb195c3973139115c9ff617b3a Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 10 Sep 2024 10:44:52 +0800 Subject: [PATCH 1/9] reverse proxy: rewrite requests and responses for websocket over http2 --- .../caddyhttp/reverseproxy/reverseproxy.go | 15 +++++ modules/caddyhttp/reverseproxy/streaming.go | 65 +++++++++++++++---- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 44cc2f9d9a6..cdd6c22757e 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -17,6 +17,8 @@ package reverseproxy import ( "bytes" "context" + "crypto/rand" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -398,6 +400,19 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht return caddyhttp.Error(http.StatusInternalServerError, fmt.Errorf("preparing request for upstream round-trip: %v", err)) } + // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade + // TODO: once we can reliably detect backend support this, it can be removed for those backends + if r.ProtoMajor == 2 && r.Method == http.MethodConnect && r.Header.Get(":protocol") != "" { + r.Method = http.MethodGet + r.Header.Set("Upgrade", r.Header.Get(":protocol")) + r.Header.Set("Connection", "Upgrade") + key := make([]byte, 16) + _, randErr := rand.Read(key) + if randErr != nil { + return randErr + } + r.Header.Set("Sec-Websocket-Key", base64.StdEncoding.EncodeToString(key)) + } // we will need the original headers and Host value if // header operations are configured; this is so that each diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index a696ac7fbed..cfe3eea0382 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -19,6 +19,7 @@ package reverseproxy import ( + "bufio" "context" "errors" "fmt" @@ -34,6 +35,24 @@ import ( "golang.org/x/net/http/httpguts" ) +type h2ReadWriteCloser struct { + io.ReadCloser + http.ResponseWriter +} + +func (rwc h2ReadWriteCloser) Write(p []byte) (n int, err error) { + n, err = rwc.ResponseWriter.Write(p) + if err != nil { + return 0, err + } + + err = http.NewResponseController(rwc.ResponseWriter).Flush() + if err != nil { + return 0, err + } + return n, nil +} + func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, rw http.ResponseWriter, req *http.Request, res *http.Response) { reqUpType := upgradeType(req.Header) resUpType := upgradeType(res.Header) @@ -61,20 +80,44 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, // write header first, response headers should not be counted in size // like the rest of handler chain. copyHeader(rw.Header(), res.Header) - rw.WriteHeader(res.StatusCode) - logger.Debug("upgrading connection") + var ( + conn io.ReadWriteCloser + brw *bufio.ReadWriter + ) + // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade + // TODO: once we can reliably detect backend support this, it can be removed for those backends + if req.ProtoMajor == 2 && req.Method == http.MethodConnect && req.Header.Get(":protocol") != "" { + rw.Header().Del("Upgrade") + rw.Header().Del("Connection") + rw.Header().Del("Sec-Websocket-Accept") + rw.WriteHeader(http.StatusOK) + + flushErr := http.NewResponseController(rw).Flush() + if flushErr != nil { + h.logger.Error("failed to flush http2 websocket response", zap.Error(flushErr)) + return + } + conn = h2ReadWriteCloser{res.Body, rw} + // bufio is not needed, use minimal buffer + brw = bufio.NewReadWriter(bufio.NewReaderSize(conn, 1), bufio.NewWriterSize(conn, 1)) + } else { + rw.WriteHeader(res.StatusCode) + + logger.Debug("upgrading connection") - //nolint:bodyclose - conn, brw, hijackErr := http.NewResponseController(rw).Hijack() - if errors.Is(hijackErr, http.ErrNotSupported) { - h.logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw))) - return - } + var hijackErr error + //nolint:bodyclose + conn, brw, hijackErr = http.NewResponseController(rw).Hijack() + if errors.Is(hijackErr, http.ErrNotSupported) { + h.logger.Error("can't switch protocols using non-Hijacker ResponseWriter", zap.String("type", fmt.Sprintf("%T", rw))) + return + } - if hijackErr != nil { - h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr)) - return + if hijackErr != nil { + h.logger.Error("hijack failed on protocol switch", zap.Error(hijackErr)) + return + } } // adopted from https://github.com/golang/go/commit/8bcf2834afdf6a1f7937390903a41518715ef6f5 From 8eaa94b32262df4ef9b600b7c07ab7469e6437ac Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 10 Sep 2024 16:50:33 +0800 Subject: [PATCH 2/9] delete protocol pseudo-header --- modules/caddyhttp/reverseproxy/reverseproxy.go | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index cdd6c22757e..060c68cd391 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -403,6 +403,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends if r.ProtoMajor == 2 && r.Method == http.MethodConnect && r.Header.Get(":protocol") != "" { + r.Header.Del(":protocol") r.Method = http.MethodGet r.Header.Set("Upgrade", r.Header.Get(":protocol")) r.Header.Set("Connection", "Upgrade") From 67778fc1719f19aa734a2e556a2547ff3f8b03cc Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 10 Sep 2024 17:01:39 +0800 Subject: [PATCH 3/9] modify cloned requests --- modules/caddyhttp/reverseproxy/reverseproxy.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 060c68cd391..6e533a9bc13 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -403,16 +403,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends if r.ProtoMajor == 2 && r.Method == http.MethodConnect && r.Header.Get(":protocol") != "" { - r.Header.Del(":protocol") - r.Method = http.MethodGet - r.Header.Set("Upgrade", r.Header.Get(":protocol")) - r.Header.Set("Connection", "Upgrade") + clonedReq.Header.Del(":protocol") + clonedReq.Method = http.MethodGet + clonedReq.Header.Set("Upgrade", r.Header.Get(":protocol")) + clonedReq.Header.Set("Connection", "Upgrade") key := make([]byte, 16) _, randErr := rand.Read(key) if randErr != nil { return randErr } - r.Header.Set("Sec-Websocket-Key", base64.StdEncoding.EncodeToString(key)) + clonedReq.Header.Set("Sec-Websocket-Key", base64.StdEncoding.EncodeToString(key)) } // we will need the original headers and Host value if From 460c3e5d2aa359235749273f3d242088eca12d80 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 10 Sep 2024 17:38:43 +0800 Subject: [PATCH 4/9] set request variable to track if it's a h2 websocket --- modules/caddyhttp/reverseproxy/reverseproxy.go | 1 + modules/caddyhttp/reverseproxy/streaming.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 6e533a9bc13..b8253125cc0 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -403,6 +403,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends if r.ProtoMajor == 2 && r.Method == http.MethodConnect && r.Header.Get(":protocol") != "" { + caddyhttp.SetVar(clonedReq.Context(), "h2_websocket", true) clonedReq.Header.Del(":protocol") clonedReq.Method = http.MethodGet clonedReq.Header.Set("Upgrade", r.Header.Get(":protocol")) diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index cfe3eea0382..0c000268e07 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -23,6 +23,7 @@ import ( "context" "errors" "fmt" + "github.com/caddyserver/caddy/v2/modules/caddyhttp" "io" weakrand "math/rand" "mime" @@ -87,7 +88,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, ) // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends - if req.ProtoMajor == 2 && req.Method == http.MethodConnect && req.Header.Get(":protocol") != "" { + if b, ok := caddyhttp.GetVar(req.Context(), "h2_websocket").(bool); ok && b { rw.Header().Del("Upgrade") rw.Header().Del("Connection") rw.Header().Del("Sec-Websocket-Accept") From feac4b0befc5684687c041226891f5b369883613 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 10 Sep 2024 17:49:55 +0800 Subject: [PATCH 5/9] use request bodu --- modules/caddyhttp/reverseproxy/streaming.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 0c000268e07..71bc90942f6 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -99,7 +99,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, h.logger.Error("failed to flush http2 websocket response", zap.Error(flushErr)) return } - conn = h2ReadWriteCloser{res.Body, rw} + conn = h2ReadWriteCloser{req.Body, rw} // bufio is not needed, use minimal buffer brw = bufio.NewReadWriter(bufio.NewReaderSize(conn, 1), bufio.NewWriterSize(conn, 1)) } else { From ebd589ce9c637ac7715390baeb13e509eb49f649 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 10 Sep 2024 22:11:37 +0800 Subject: [PATCH 6/9] rewrite request body --- modules/caddyhttp/reverseproxy/reverseproxy.go | 4 +++- modules/caddyhttp/reverseproxy/streaming.go | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index b8253125cc0..5424da4d0d0 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -403,8 +403,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends if r.ProtoMajor == 2 && r.Method == http.MethodConnect && r.Header.Get(":protocol") != "" { - caddyhttp.SetVar(clonedReq.Context(), "h2_websocket", true) clonedReq.Header.Del(":protocol") + // keep the body for later use. http1.1 upgrade uses http.NoBody + caddyhttp.SetVar(clonedReq.Context(), "h2_websocket_body", clonedReq.Body) + clonedReq.Body = http.NoBody clonedReq.Method = http.MethodGet clonedReq.Header.Set("Upgrade", r.Header.Get(":protocol")) clonedReq.Header.Set("Connection", "Upgrade") diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 71bc90942f6..6f2c3390074 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -88,7 +88,8 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, ) // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends - if b, ok := caddyhttp.GetVar(req.Context(), "h2_websocket").(bool); ok && b { + if body, ok := caddyhttp.GetVar(req.Context(), "h2_websocket_body").(io.ReadCloser); ok { + req.Body = body rw.Header().Del("Upgrade") rw.Header().Del("Connection") rw.Header().Del("Sec-Websocket-Accept") From 0097468a347514fd647f1cef6ec828a1f5c62316 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 26 Nov 2024 09:05:30 +0800 Subject: [PATCH 7/9] use WebSocket instead of Websocket in the headers --- modules/caddyhttp/reverseproxy/reverseproxy.go | 4 ++-- modules/caddyhttp/reverseproxy/streaming.go | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 5424da4d0d0..f247afe31e5 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -400,7 +400,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht return caddyhttp.Error(http.StatusInternalServerError, fmt.Errorf("preparing request for upstream round-trip: %v", err)) } - // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade + // websocket over http2, assuming backend doesn't support this, the request will be modified to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends if r.ProtoMajor == 2 && r.Method == http.MethodConnect && r.Header.Get(":protocol") != "" { clonedReq.Header.Del(":protocol") @@ -415,7 +415,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht if randErr != nil { return randErr } - clonedReq.Header.Set("Sec-Websocket-Key", base64.StdEncoding.EncodeToString(key)) + clonedReq.Header["Sec-WebSocket-Key"] = []string{base64.StdEncoding.EncodeToString(key)} } // we will need the original headers and Host value if diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 6f2c3390074..793e5af349d 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -23,7 +23,6 @@ import ( "context" "errors" "fmt" - "github.com/caddyserver/caddy/v2/modules/caddyhttp" "io" weakrand "math/rand" "mime" @@ -34,6 +33,8 @@ import ( "go.uber.org/zap" "golang.org/x/net/http/httpguts" + + "github.com/caddyserver/caddy/v2/modules/caddyhttp" ) type h2ReadWriteCloser struct { @@ -86,13 +87,13 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, conn io.ReadWriteCloser brw *bufio.ReadWriter ) - // websocket over http2, assuming backend doesn't support this, the request will be modifided to http1.1 upgrade + // websocket over http2, assuming backend doesn't support this, the request will be modified to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends if body, ok := caddyhttp.GetVar(req.Context(), "h2_websocket_body").(io.ReadCloser); ok { req.Body = body rw.Header().Del("Upgrade") rw.Header().Del("Connection") - rw.Header().Del("Sec-Websocket-Accept") + delete(rw.Header(), "Sec-WebSocket-Accept") rw.WriteHeader(http.StatusOK) flushErr := http.NewResponseController(rw).Flush() From 12e5c5d019f27465e3e5880cd2fd199a197fcf76 Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 26 Nov 2024 09:43:27 +0800 Subject: [PATCH 8/9] use logger check for zap loggers --- modules/caddyhttp/reverseproxy/streaming.go | 45 +++++++++++++-------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 05623bc3056..37d6b36f0d6 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -102,9 +102,15 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, delete(rw.Header(), "Sec-WebSocket-Accept") rw.WriteHeader(http.StatusOK) + if c := logger.Check(zap.DebugLevel, "upgrading connection"); c != nil { + c.Write(zap.Int("http_version", 2)) + } + flushErr := http.NewResponseController(rw).Flush() if flushErr != nil { - h.logger.Error("failed to flush http2 websocket response", zap.Error(flushErr)) + if c := h.logger.Check(zap.ErrorLevel, "failed to flush http2 websocket response"); c != nil { + c.Write(zap.Error(flushErr)) + } return } conn = h2ReadWriteCloser{req.Body, rw} @@ -113,22 +119,27 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, } else { rw.WriteHeader(res.StatusCode) - //nolint:bodyclose - conn, brw, hijackErr := http.NewResponseController(rw).Hijack() - if errors.Is(hijackErr, http.ErrNotSupported) { - if c := logger.Check(zapcore.ErrorLevel, "can't switch protocols using non-Hijacker ResponseWriter"); c != nil { - c.Write(zap.String("type", fmt.Sprintf("%T", rw))) - } - return - } - - if hijackErr != nil { - if c := logger.Check(zapcore.ErrorLevel, "hijack failed on protocol switch"); c != nil { - c.Write(zap.Error(hijackErr)) - } - return - } - } + if c := logger.Check(zap.DebugLevel, "upgrading connection"); c != nil { + c.Write(zap.Int("http_version", req.ProtoMajor)) + } + + var hijackErr error + //nolint:bodyclose + conn, brw, hijackErr = http.NewResponseController(rw).Hijack() + if errors.Is(hijackErr, http.ErrNotSupported) { + if c := h.logger.Check(zap.ErrorLevel, "can't switch protocols using non-Hijacker ResponseWriter"); c != nil { + c.Write(zap.String("type", fmt.Sprintf("%T", rw))) + } + return + } + + if hijackErr != nil { + if c := h.logger.Check(zap.ErrorLevel, "hijack failed on protocol switch"); c != nil { + c.Write(zap.Error(hijackErr)) + } + return + } + } // adopted from https://github.com/golang/go/commit/8bcf2834afdf6a1f7937390903a41518715ef6f5 backConnCloseCh := make(chan struct{}) From 182153e827fb36cdc80280c1118fbaed8084366d Mon Sep 17 00:00:00 2001 From: WeidiDeng Date: Tue, 26 Nov 2024 09:47:28 +0800 Subject: [PATCH 9/9] fix lint --- modules/caddyhttp/reverseproxy/streaming.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 37d6b36f0d6..d697eb402d1 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -49,6 +49,7 @@ func (rwc h2ReadWriteCloser) Write(p []byte) (n int, err error) { return 0, err } + //nolint:bodyclose err = http.NewResponseController(rwc.ResponseWriter).Flush() if err != nil { return 0, err @@ -106,6 +107,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, c.Write(zap.Int("http_version", 2)) } + //nolint:bodyclose flushErr := http.NewResponseController(rw).Flush() if flushErr != nil { if c := h.logger.Check(zap.ErrorLevel, "failed to flush http2 websocket response"); c != nil {