Skip to content

Commit

Permalink
resource-manager: supports redirect the http request to primary server (
Browse files Browse the repository at this point in the history
#6050)

close #6051

resource-manager: supports redirect the HTTP request to primary server

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
nolouch and ti-chi-bot authored Feb 27, 2023
1 parent 002d6ca commit 10eeab1
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 94 deletions.
8 changes: 8 additions & 0 deletions pkg/basicserver/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"go.etcd.io/etcd/clientv3"
)

// MemberProvider defines the common basic behaviors of a member
type MemberProvider interface {
GetName() string
GetClientUrls() []string
}

// Server defines the common basic behaviors of a server
type Server interface {
// Name returns the unique Name for this server in the cluster.
Expand All @@ -31,6 +37,8 @@ type Server interface {
Run() error
// Close closes the server.
Close()
// GetPrimary returns the primary of the server.
GetPrimary() MemberProvider
// GetClient returns builtin etcd client.
GetClient() *clientv3.Client
// GetHTTPClient returns builtin http client.
Expand Down
10 changes: 8 additions & 2 deletions pkg/mcs/resource_manager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
rmserver "github.com/tikv/pd/pkg/mcs/resource_manager/server"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
)

// APIPathPrefix is the prefix of the API path.
Expand Down Expand Up @@ -59,9 +60,14 @@ func NewService(srv *rmserver.Service) *Service {
apiHandlerEngine.Use(gin.Recovery())
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
endpoint := apiHandlerEngine.Group(APIPathPrefix)
manager := srv.GetManager()

apiHandlerEngine.Use(func(c *gin.Context) {
// manager implements the interface of basicserver.Service.
c.Set("service", manager.GetBasicServer())
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
endpoint := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
manager: manager,
apiHandlerEngine: apiHandlerEngine,
Expand Down
5 changes: 5 additions & 0 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func NewManager(srv bs.Server) *Manager {
return m
}

// GetBasicServer returns the basic server.
func (m *Manager) GetBasicServer() bs.Server {
return m.srv
}

// Init initializes the resource group manager.
func (m *Manager) Init(ctx context.Context) {
// Reset the resource groups first.
Expand Down
7 changes: 7 additions & 0 deletions pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/log"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand Down Expand Up @@ -245,6 +246,12 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
}
}

// GetPrimary returns the primary member.
func (s *Server) GetPrimary() bs.MemberProvider {
// TODO: implement this function with primary.
return nil
}

func (s *Server) startServer() error {
manager := NewManager(s)
s.service = &Service{
Expand Down
6 changes: 6 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ func (s *Server) IsServing() bool {
return true
}

// GetPrimary returns the primary provider of this tso server.
func (s *Server) GetPrimary() bs.MemberProvider {
// TODO: implement this
return nil
}

// AddServiceReadyCallback adds the callback function when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
// the global TSO allocator after the flag 'enable-local-tso' is set to true.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
Expand Down
83 changes: 83 additions & 0 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package apiutil

import (
"bytes"
"compress/gzip"
"encoding/hex"
"encoding/json"
"fmt"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/unrolled/render"
"go.uber.org/zap"
)
Expand All @@ -44,6 +46,15 @@ var (
componentAnonymousValue = "anonymous"
)

const (
// ErrRedirectFailed is the error message for redirect failed.
ErrRedirectFailed = "redirect failed"
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = "redirect to not leader"

chunkSize = 4096
)

// DeferClose captures the error returned from closing (if an error occurs).
// This is designed to be used in a defer statement.
func DeferClose(c io.Closer, err *error) {
Expand Down Expand Up @@ -367,3 +378,75 @@ func RegisterUserDefinedHandlers(registerMap map[string]http.Handler, group *API
log.Info("register REST path", zap.String("path", pathPrefix))
return nil
}

type customReverseProxies struct {
urls []url.URL
client *http.Client
}

// NewCustomReverseProxies returns the custom reverse proxies.
func NewCustomReverseProxies(dialClient *http.Client, urls []url.URL) http.Handler {
p := &customReverseProxies{
client: dialClient,
}

p.urls = append(p.urls, urls...)

return p
}

func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for _, url := range p.urls {
r.RequestURI = ""
r.URL.Host = url.Host
r.URL.Scheme = url.Scheme

resp, err := p.client.Do(r)
if err != nil {
log.Error("request failed", errs.ZapError(errs.ErrSendRequest, err))
continue
}
defer resp.Body.Close()
var reader io.ReadCloser
switch resp.Header.Get("Content-Encoding") {
case "gzip":
reader, err = gzip.NewReader(resp.Body)
if err != nil {
log.Error("failed to parse response with gzip compress", zap.Error(err))
continue
}
defer reader.Close()
default:
reader = resp.Body
}

copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
for {
if _, err = io.CopyN(w, reader, chunkSize); err != nil {
if err == io.EOF {
err = nil
}
break
}
}
if err != nil {
log.Error("write failed", errs.ZapError(errs.ErrWriteHTTPBody, err), zap.String("target-address", url.String()))
// try next url.
continue
}
return
}
http.Error(w, ErrRedirectFailed, http.StatusInternalServerError)
}

func copyHeader(dst, src http.Header) {
for k, vv := range src {
values := dst[k]
for _, v := range vv {
if !slice.Contains(values, v) {
dst.Add(k, v)
}
}
}
}
76 changes: 76 additions & 0 deletions pkg/utils/apiutil/multiservicesapi/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package multiservicesapi

import (
"net/http"
"net/url"

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/apiutil"
"go.uber.org/zap"
)

// HTTP headers.
const (
ServiceAllowDirectHandle = "service-allow-direct-handle"
ServiceRedirectorHeader = "service-redirector"
)

// ServiceRedirector is a middleware to redirect the request to the right place.
func ServiceRedirector() gin.HandlerFunc {
return func(c *gin.Context) {
svr := c.MustGet("service").(bs.Server)
allowDirectHandle := len(c.Request.Header.Get(ServiceAllowDirectHandle)) > 0
isServing := svr.IsServing()
if allowDirectHandle || isServing {
c.Next()
return
}

// Prevent more than one redirection.
if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
return
}

c.Request.Header.Set(ServiceRedirectorHeader, svr.Name())

primary := svr.GetPrimary()
if primary == nil {
c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error())
return
}
clientUrls := primary.GetClientUrls()
urls := make([]url.URL, 0, len(clientUrls))
for _, item := range clientUrls {
u, err := url.Parse(item)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrURLParse.Wrap(err).GenWithStackByCause().Error())
return
}

urls = append(urls, *u)
}

client := svr.GetHTTPClient()
apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(c.Writer, c.Request)
c.Abort()
}
}
79 changes: 7 additions & 72 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
package serverapi

import (
"io"
"net/http"
"net/url"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
"github.com/urfave/negroni"
Expand All @@ -30,13 +28,8 @@ import (

// HTTP headers.
const (
RedirectorHeader = "PD-Redirector"
AllowFollowerHandle = "PD-Allow-follower-handle"
)

const (
errRedirectFailed = "redirect failed"
errRedirectToNotLeader = "redirect to not leader"
PDRedirectorHeader = "PD-Redirector"
PDAllowFollowerHandle = "PD-Allow-follower-handle"
)

type runtimeServiceValidator struct {
Expand Down Expand Up @@ -88,21 +81,21 @@ func NewRedirector(s *server.Server) negroni.Handler {
}

func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
allowFollowerHandle := len(r.Header.Get(AllowFollowerHandle)) > 0
allowFollowerHandle := len(r.Header.Get(PDAllowFollowerHandle)) > 0
isLeader := h.s.GetMember().IsLeader()
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) {
next(w, r)
return
}

// Prevent more than one redirection.
if name := r.Header.Get(RedirectorHeader); len(name) != 0 {
if name := r.Header.Get(PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
http.Error(w, errRedirectToNotLeader, http.StatusInternalServerError)
http.Error(w, apiutil.ErrRedirectToNotLeader, http.StatusInternalServerError)
return
}

r.Header.Set(RedirectorHeader, h.s.Name())
r.Header.Set(PDRedirectorHeader, h.s.Name())

leader := h.s.GetMember().GetLeader()
if leader == nil {
Expand All @@ -121,63 +114,5 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
urls = append(urls, *u)
}
client := h.s.GetHTTPClient()
NewCustomReverseProxies(client, urls).ServeHTTP(w, r)
}

type customReverseProxies struct {
urls []url.URL
client *http.Client
}

// NewCustomReverseProxies returns the custom reverse proxies.
func NewCustomReverseProxies(dialClient *http.Client, urls []url.URL) http.Handler {
p := &customReverseProxies{
client: dialClient,
}

p.urls = append(p.urls, urls...)

return p
}

func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for _, url := range p.urls {
r.RequestURI = ""
r.URL.Host = url.Host
r.URL.Scheme = url.Scheme

resp, err := p.client.Do(r)
if err != nil {
log.Error("request failed", errs.ZapError(errs.ErrSendRequest, err))
continue
}

b, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Error("read failed", errs.ZapError(errs.ErrIORead, err))
continue
}

copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
if _, err := w.Write(b); err != nil {
log.Error("write failed", errs.ZapError(errs.ErrWriteHTTPBody, err))
continue
}

return
}
http.Error(w, errRedirectFailed, http.StatusInternalServerError)
}

func copyHeader(dst, src http.Header) {
for k, vv := range src {
values := dst[k]
for _, v := range vv {
if !slice.Contains(values, v) {
dst.Add(k, v)
}
}
}
apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(w, r)
}
Loading

0 comments on commit 10eeab1

Please sign in to comment.