Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: proxy handler of yurthub #128

Merged
merged 1 commit into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ func NewYurtReverseProxyHandler(
return yurtProxy.buildHandlerChain(yurtProxy), nil
}

func (p *yurtReverseProxy) buildHandlerChain(apiHandler http.Handler) http.Handler {
handler := util.WithRequestContentType(apiHandler)
func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler {
handler = util.WithRequestTrace(handler)
handler = util.WithRequestContentType(handler)
handler = util.WithCacheHeaderCheck(handler)
handler = util.WithRequestTrace(handler, p.maxRequestsInFlight)
handler = util.WithRequestClientComponent(handler)
handler = filters.WithRequestInfo(handler, p.resolver)
handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight)
return handler
}

Expand Down
73 changes: 29 additions & 44 deletions pkg/yurthub/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package util

import (
"context"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -109,72 +108,58 @@ func WithRequestClientComponent(handler http.Handler) http.Handler {

type wrapperResponseWriter struct {
http.ResponseWriter
statusCode int
closeNotifyCh chan bool
ctx context.Context
http.Flusher
http.CloseNotifier
statusCode int
}

func newWrapperResponseWriter(ctx context.Context, rw http.ResponseWriter) *wrapperResponseWriter {
return &wrapperResponseWriter{
ResponseWriter: rw,
closeNotifyCh: make(chan bool, 1),
ctx: ctx,
func newWrapperResponseWriter(w http.ResponseWriter) *wrapperResponseWriter {
cn, ok := w.(http.CloseNotifier)
if !ok {
klog.Error("can not get http.CloseNotifier")
}
}

func (wrw *wrapperResponseWriter) Write(b []byte) (int, error) {
return wrw.ResponseWriter.Write(b)
}
flusher, ok := w.(http.Flusher)
if !ok {
klog.Error("can not get http.Flusher")
}

func (wrw *wrapperResponseWriter) Header() http.Header {
return wrw.ResponseWriter.Header()
return &wrapperResponseWriter{
ResponseWriter: w,
Flusher: flusher,
CloseNotifier: cn,
}
}

func (wrw *wrapperResponseWriter) WriteHeader(statusCode int) {
wrw.statusCode = statusCode
wrw.ResponseWriter.WriteHeader(statusCode)
}

func (wrw *wrapperResponseWriter) CloseNotify() <-chan bool {
if cn, ok := wrw.ResponseWriter.(http.CloseNotifier); ok {
return cn.CloseNotify()
}
klog.Infof("can't get http.CloseNotifier from http.ResponseWriter")
go func() {
<-wrw.ctx.Done()
wrw.closeNotifyCh <- true
}()

return wrw.closeNotifyCh
}

func (wrw *wrapperResponseWriter) Flush() {
if flusher, ok := wrw.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
} else {
klog.Errorf("can't get http.Flusher from http.ResponseWriter")
}
// WithRequestTrace used to trace status code and handle time for request.
func WithRequestTrace(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wrapperRW := newWrapperResponseWriter(w)
start := time.Now()
handler.ServeHTTP(wrapperRW, req)
klog.Infof("%s with status code %d, spent %v", util.ReqString(req), wrapperRW.statusCode, time.Since(start))
})
}

// WithRequestTrace used for tracing in flight requests. and when in flight
// WithMaxInFlightLimit limits the number of in-flight requests. and when in flight
// requests exceeds the threshold, the following incoming requests will be rejected.
func WithRequestTrace(handler http.Handler, limit int) http.Handler {
func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler {
var reqChan chan bool
if limit > 0 {
reqChan = make(chan bool, limit)
}

return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wrapperRW := newWrapperResponseWriter(req.Context(), w)
start := time.Now()

select {
case reqChan <- true:
defer func() {
<-reqChan
klog.Infof("%s with status code %d, spent %v, left %d requests in flight", util.ReqString(req), wrapperRW.statusCode, time.Since(start), len(reqChan))
}()
handler.ServeHTTP(wrapperRW, req)
handler.ServeHTTP(w, req)
<-reqChan
klog.Infof("%s request completed, left %d requests in flight", util.ReqString(req), len(reqChan))
default:
// Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", "1")
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/proxy/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestWithRequestClientComponent(t *testing.T) {
}
}

func TestWithRequestTrace(t *testing.T) {
func TestWithMaxInFlightLimit(t *testing.T) {
testcases := map[int]struct {
Verb string
Path string
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestWithRequestTrace(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

handler = WithRequestTrace(handler, 10)
handler = WithMaxInFlightLimit(handler, 10)
handler = filters.WithRequestInfo(handler, resolver)

respCodes := make([]int, k)
Expand Down