diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 2f514f776fb..4997a334a2d 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -81,12 +81,13 @@ func NewYurtReverseProxyHandler( return yurtProxy.buildHandlerChain(yurtProxy), nil } -func (p *yurtReverseProxy) buildHandlerChain(apiHandler http.Handler) http.Handler { - handler := util.WithRequestContentType(apiHandler) +func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler { + handler = util.WithRequestTrace(handler) + handler = util.WithRequestContentType(handler) handler = util.WithCacheHeaderCheck(handler) - handler = util.WithRequestTrace(handler, p.maxRequestsInFlight) handler = util.WithRequestClientComponent(handler) handler = filters.WithRequestInfo(handler, p.resolver) + handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight) return handler } diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index adefba53d4d..80552eef6d3 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -17,7 +17,6 @@ limitations under the License. package util import ( - "context" "net/http" "strings" "time" @@ -109,25 +108,27 @@ func WithRequestClientComponent(handler http.Handler) http.Handler { type wrapperResponseWriter struct { http.ResponseWriter - statusCode int - closeNotifyCh chan bool - ctx context.Context + http.Flusher + http.CloseNotifier + statusCode int } -func newWrapperResponseWriter(ctx context.Context, rw http.ResponseWriter) *wrapperResponseWriter { - return &wrapperResponseWriter{ - ResponseWriter: rw, - closeNotifyCh: make(chan bool, 1), - ctx: ctx, +func newWrapperResponseWriter(w http.ResponseWriter) *wrapperResponseWriter { + cn, ok := w.(http.CloseNotifier) + if !ok { + klog.Error("can not get http.CloseNotifier") } -} -func (wrw *wrapperResponseWriter) Write(b []byte) (int, error) { - return wrw.ResponseWriter.Write(b) -} + flusher, ok := w.(http.Flusher) + if !ok { + klog.Error("can not get http.Flusher") + } -func (wrw *wrapperResponseWriter) Header() http.Header { - return wrw.ResponseWriter.Header() + return &wrapperResponseWriter{ + ResponseWriter: w, + Flusher: flusher, + CloseNotifier: cn, + } } func (wrw *wrapperResponseWriter) WriteHeader(statusCode int) { @@ -135,46 +136,30 @@ func (wrw *wrapperResponseWriter) WriteHeader(statusCode int) { wrw.ResponseWriter.WriteHeader(statusCode) } -func (wrw *wrapperResponseWriter) CloseNotify() <-chan bool { - if cn, ok := wrw.ResponseWriter.(http.CloseNotifier); ok { - return cn.CloseNotify() - } - klog.Infof("can't get http.CloseNotifier from http.ResponseWriter") - go func() { - <-wrw.ctx.Done() - wrw.closeNotifyCh <- true - }() - - return wrw.closeNotifyCh -} - -func (wrw *wrapperResponseWriter) Flush() { - if flusher, ok := wrw.ResponseWriter.(http.Flusher); ok { - flusher.Flush() - } else { - klog.Errorf("can't get http.Flusher from http.ResponseWriter") - } +// WithRequestTrace used to trace status code and handle time for request. +func WithRequestTrace(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + wrapperRW := newWrapperResponseWriter(w) + start := time.Now() + handler.ServeHTTP(wrapperRW, req) + klog.Infof("%s with status code %d, spent %v", util.ReqString(req), wrapperRW.statusCode, time.Since(start)) + }) } -// WithRequestTrace used for tracing in flight requests. and when in flight +// WithMaxInFlightLimit limits the number of in-flight requests. and when in flight // requests exceeds the threshold, the following incoming requests will be rejected. -func WithRequestTrace(handler http.Handler, limit int) http.Handler { +func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler { var reqChan chan bool if limit > 0 { reqChan = make(chan bool, limit) } return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - wrapperRW := newWrapperResponseWriter(req.Context(), w) - start := time.Now() - select { case reqChan <- true: - defer func() { - <-reqChan - klog.Infof("%s with status code %d, spent %v, left %d requests in flight", util.ReqString(req), wrapperRW.statusCode, time.Since(start), len(reqChan)) - }() - handler.ServeHTTP(wrapperRW, req) + handler.ServeHTTP(w, req) + <-reqChan + klog.Infof("%s request completed, left %d requests in flight", util.ReqString(req), len(reqChan)) default: // Return a 429 status indicating "Too Many Requests" w.Header().Set("Retry-After", "1") diff --git a/pkg/yurthub/proxy/util/util_test.go b/pkg/yurthub/proxy/util/util_test.go index 321e57ca4e6..f23ab5b8d29 100644 --- a/pkg/yurthub/proxy/util/util_test.go +++ b/pkg/yurthub/proxy/util/util_test.go @@ -154,7 +154,7 @@ func TestWithRequestClientComponent(t *testing.T) { } } -func TestWithRequestTrace(t *testing.T) { +func TestWithMaxInFlightLimit(t *testing.T) { testcases := map[int]struct { Verb string Path string @@ -187,7 +187,7 @@ func TestWithRequestTrace(t *testing.T) { w.WriteHeader(http.StatusOK) }) - handler = WithRequestTrace(handler, 10) + handler = WithMaxInFlightLimit(handler, 10) handler = filters.WithRequestInfo(handler, resolver) respCodes := make([]int, k)