Skip to content

Commit

Permalink
refactor: proxy handler of yurthub
Browse files Browse the repository at this point in the history
1. separate the max in-flight limit request handler and trace request handler
2. refactor newWrapperResponseWriter by including http.Flusher and http.CloseNotifier interface
3. adjust max in-flight request handler at the entry point of http request
  • Loading branch information
rambohe-ch committed Oct 29, 2020
1 parent 4b1f71b commit accdca4
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 49 deletions.
7 changes: 4 additions & 3 deletions pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ func NewYurtReverseProxyHandler(
return yurtProxy.buildHandlerChain(yurtProxy), nil
}

func (p *yurtReverseProxy) buildHandlerChain(apiHandler http.Handler) http.Handler {
handler := util.WithRequestContentType(apiHandler)
func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler {
handler = util.WithRequestTrace(handler)
handler = util.WithRequestContentType(handler)
handler = util.WithCacheHeaderCheck(handler)
handler = util.WithRequestTrace(handler, p.maxRequestsInFlight)
handler = util.WithRequestClientComponent(handler)
handler = filters.WithRequestInfo(handler, p.resolver)
handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight)
return handler
}

Expand Down
73 changes: 29 additions & 44 deletions pkg/yurthub/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package util

import (
"context"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -109,72 +108,58 @@ func WithRequestClientComponent(handler http.Handler) http.Handler {

type wrapperResponseWriter struct {
http.ResponseWriter
statusCode int
closeNotifyCh chan bool
ctx context.Context
http.Flusher
http.CloseNotifier
statusCode int
}

func newWrapperResponseWriter(ctx context.Context, rw http.ResponseWriter) *wrapperResponseWriter {
return &wrapperResponseWriter{
ResponseWriter: rw,
closeNotifyCh: make(chan bool, 1),
ctx: ctx,
func newWrapperResponseWriter(w http.ResponseWriter) *wrapperResponseWriter {
cn, ok := w.(http.CloseNotifier)
if !ok {
klog.Error("can not get http.CloseNotifier")
}
}

func (wrw *wrapperResponseWriter) Write(b []byte) (int, error) {
return wrw.ResponseWriter.Write(b)
}
flusher, ok := w.(http.Flusher)
if !ok {
klog.Error("can not get http.Flusher")
}

func (wrw *wrapperResponseWriter) Header() http.Header {
return wrw.ResponseWriter.Header()
return &wrapperResponseWriter{
ResponseWriter: w,
Flusher: flusher,
CloseNotifier: cn,
}
}

func (wrw *wrapperResponseWriter) WriteHeader(statusCode int) {
wrw.statusCode = statusCode
wrw.ResponseWriter.WriteHeader(statusCode)
}

func (wrw *wrapperResponseWriter) CloseNotify() <-chan bool {
if cn, ok := wrw.ResponseWriter.(http.CloseNotifier); ok {
return cn.CloseNotify()
}
klog.Infof("can't get http.CloseNotifier from http.ResponseWriter")
go func() {
<-wrw.ctx.Done()
wrw.closeNotifyCh <- true
}()

return wrw.closeNotifyCh
}

func (wrw *wrapperResponseWriter) Flush() {
if flusher, ok := wrw.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
} else {
klog.Errorf("can't get http.Flusher from http.ResponseWriter")
}
// WithRequestTrace used to trace status code and handle time for request.
func WithRequestTrace(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wrapperRW := newWrapperResponseWriter(w)
start := time.Now()
handler.ServeHTTP(wrapperRW, req)
klog.Infof("%s with status code %d, spent %v", util.ReqString(req), wrapperRW.statusCode, time.Since(start))
})
}

// WithRequestTrace used for tracing in flight requests. and when in flight
// WithMaxInFlightLimit limits the number of in-flight requests. and when in flight
// requests exceeds the threshold, the following incoming requests will be rejected.
func WithRequestTrace(handler http.Handler, limit int) http.Handler {
func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler {
var reqChan chan bool
if limit > 0 {
reqChan = make(chan bool, limit)
}

return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wrapperRW := newWrapperResponseWriter(req.Context(), w)
start := time.Now()

select {
case reqChan <- true:
defer func() {
<-reqChan
klog.Infof("%s with status code %d, spent %v, left %d requests in flight", util.ReqString(req), wrapperRW.statusCode, time.Since(start), len(reqChan))
}()
handler.ServeHTTP(wrapperRW, req)
handler.ServeHTTP(w, req)
<-reqChan
klog.Infof("%s request completed, left %d requests in flight", util.ReqString(req), len(reqChan))
default:
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", "1")
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/proxy/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestWithRequestClientComponent(t *testing.T) {
}
}

func TestWithRequestTrace(t *testing.T) {
func TestWithMaxInFlightLimit(t *testing.T) {
testcases := map[int]struct {
Verb string
Path string
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestWithRequestTrace(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

handler = WithRequestTrace(handler, 10)
handler = WithMaxInFlightLimit(handler, 10)
handler = filters.WithRequestInfo(handler, resolver)

respCodes := make([]int, k)
Expand Down

0 comments on commit accdca4

Please sign in to comment.