diff --git a/pkg/basicserver/basic_server.go b/pkg/basicserver/basic_server.go index 4fcaad96741..d0c1bd17b98 100644 --- a/pkg/basicserver/basic_server.go +++ b/pkg/basicserver/basic_server.go @@ -21,6 +21,12 @@ import ( "go.etcd.io/etcd/clientv3" ) +// MemberProvider defines the common basic behaviors of a member +type MemberProvider interface { + GetName() string + GetClientUrls() []string +} + // Server defines the common basic behaviors of a server type Server interface { // Name returns the unique Name for this server in the cluster. @@ -31,6 +37,8 @@ type Server interface { Run() error // Close closes the server. Close() + // GetPrimary returns the primary of the server. + GetPrimary() MemberProvider // GetClient returns builtin etcd client. GetClient() *clientv3.Client // GetHTTPClient returns builtin http client. diff --git a/pkg/mcs/resource_manager/server/apis/v1/api.go b/pkg/mcs/resource_manager/server/apis/v1/api.go index ec0af1941ed..7ae040cba96 100644 --- a/pkg/mcs/resource_manager/server/apis/v1/api.go +++ b/pkg/mcs/resource_manager/server/apis/v1/api.go @@ -24,6 +24,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" rmserver "github.com/tikv/pd/pkg/mcs/resource_manager/server" "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" ) // APIPathPrefix is the prefix of the API path. @@ -59,9 +60,14 @@ func NewService(srv *rmserver.Service) *Service { apiHandlerEngine.Use(gin.Recovery()) apiHandlerEngine.Use(cors.Default()) apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) - endpoint := apiHandlerEngine.Group(APIPathPrefix) manager := srv.GetManager() - + apiHandlerEngine.Use(func(c *gin.Context) { + // manager implements the interface of basicserver.Service. + c.Set("service", manager.GetBasicServer()) + c.Next() + }) + apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) + endpoint := apiHandlerEngine.Group(APIPathPrefix) s := &Service{ manager: manager, apiHandlerEngine: apiHandlerEngine, diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index 13bd5b7b0e1..424699d5d46 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -79,6 +79,11 @@ func NewManager(srv bs.Server) *Manager { return m } +// GetBasicServer returns the basic server. +func (m *Manager) GetBasicServer() bs.Server { + return m.srv +} + // Init initializes the resource group manager. func (m *Manager) Init(ctx context.Context) { // Reset the resource groups first. diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index b5c648f0585..e854685d5f4 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/log" "github.com/soheilhy/cmux" "github.com/spf13/cobra" + bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -245,6 +246,12 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) { } } +// GetPrimary returns the primary member. +func (s *Server) GetPrimary() bs.MemberProvider { + // TODO: implement this function with primary. + return nil +} + func (s *Server) startServer() error { manager := NewManager(s) s.service = &Service{ diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c9e65786dd3..39b29979b89 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -126,6 +126,12 @@ func (s *Server) IsServing() bool { return true } +// GetPrimary returns the primary provider of this tso server. +func (s *Server) GetPrimary() bs.MemberProvider { + // TODO: implement this + return nil +} + // AddServiceReadyCallback adds the callback function when the server becomes the leader, if there is embedded etcd, or the primary otherwise. // the global TSO allocator after the flag 'enable-local-tso' is set to true. func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { diff --git a/pkg/utils/apiutil/apiutil.go b/pkg/utils/apiutil/apiutil.go index 4b393eccf37..0af40f6293b 100644 --- a/pkg/utils/apiutil/apiutil.go +++ b/pkg/utils/apiutil/apiutil.go @@ -16,6 +16,7 @@ package apiutil import ( "bytes" + "compress/gzip" "encoding/hex" "encoding/json" "fmt" @@ -32,6 +33,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/slice" "github.com/unrolled/render" "go.uber.org/zap" ) @@ -44,6 +46,15 @@ var ( componentAnonymousValue = "anonymous" ) +const ( + // ErrRedirectFailed is the error message for redirect failed. + ErrRedirectFailed = "redirect failed" + // ErrRedirectToNotLeader is the error message for redirect to not leader. + ErrRedirectToNotLeader = "redirect to not leader" + + chunkSize = 4096 +) + // DeferClose captures the error returned from closing (if an error occurs). // This is designed to be used in a defer statement. func DeferClose(c io.Closer, err *error) { @@ -367,3 +378,75 @@ func RegisterUserDefinedHandlers(registerMap map[string]http.Handler, group *API log.Info("register REST path", zap.String("path", pathPrefix)) return nil } + +type customReverseProxies struct { + urls []url.URL + client *http.Client +} + +// NewCustomReverseProxies returns the custom reverse proxies. +func NewCustomReverseProxies(dialClient *http.Client, urls []url.URL) http.Handler { + p := &customReverseProxies{ + client: dialClient, + } + + p.urls = append(p.urls, urls...) + + return p +} + +func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request) { + for _, url := range p.urls { + r.RequestURI = "" + r.URL.Host = url.Host + r.URL.Scheme = url.Scheme + + resp, err := p.client.Do(r) + if err != nil { + log.Error("request failed", errs.ZapError(errs.ErrSendRequest, err)) + continue + } + defer resp.Body.Close() + var reader io.ReadCloser + switch resp.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(resp.Body) + if err != nil { + log.Error("failed to parse response with gzip compress", zap.Error(err)) + continue + } + defer reader.Close() + default: + reader = resp.Body + } + + copyHeader(w.Header(), resp.Header) + w.WriteHeader(resp.StatusCode) + for { + if _, err = io.CopyN(w, reader, chunkSize); err != nil { + if err == io.EOF { + err = nil + } + break + } + } + if err != nil { + log.Error("write failed", errs.ZapError(errs.ErrWriteHTTPBody, err), zap.String("target-address", url.String())) + // try next url. + continue + } + return + } + http.Error(w, ErrRedirectFailed, http.StatusInternalServerError) +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + values := dst[k] + for _, v := range vv { + if !slice.Contains(values, v) { + dst.Add(k, v) + } + } + } +} diff --git a/pkg/utils/apiutil/multiservicesapi/middleware.go b/pkg/utils/apiutil/multiservicesapi/middleware.go new file mode 100644 index 00000000000..146f4c172cb --- /dev/null +++ b/pkg/utils/apiutil/multiservicesapi/middleware.go @@ -0,0 +1,76 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multiservicesapi + +import ( + "net/http" + "net/url" + + "github.com/gin-gonic/gin" + "github.com/pingcap/log" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/apiutil" + "go.uber.org/zap" +) + +// HTTP headers. +const ( + ServiceAllowDirectHandle = "service-allow-direct-handle" + ServiceRedirectorHeader = "service-redirector" +) + +// ServiceRedirector is a middleware to redirect the request to the right place. +func ServiceRedirector() gin.HandlerFunc { + return func(c *gin.Context) { + svr := c.MustGet("service").(bs.Server) + allowDirectHandle := len(c.Request.Header.Get(ServiceAllowDirectHandle)) > 0 + isServing := svr.IsServing() + if allowDirectHandle || isServing { + c.Next() + return + } + + // Prevent more than one redirection. + if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 { + log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect)) + c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error()) + return + } + + c.Request.Header.Set(ServiceRedirectorHeader, svr.Name()) + + primary := svr.GetPrimary() + if primary == nil { + c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error()) + return + } + clientUrls := primary.GetClientUrls() + urls := make([]url.URL, 0, len(clientUrls)) + for _, item := range clientUrls { + u, err := url.Parse(item) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrURLParse.Wrap(err).GenWithStackByCause().Error()) + return + } + + urls = append(urls, *u) + } + + client := svr.GetHTTPClient() + apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(c.Writer, c.Request) + c.Abort() + } +} diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 0e6f1f38ccb..c650177de9e 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -15,13 +15,11 @@ package serverapi import ( - "io" "net/http" "net/url" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" "github.com/urfave/negroni" @@ -30,13 +28,8 @@ import ( // HTTP headers. const ( - RedirectorHeader = "PD-Redirector" - AllowFollowerHandle = "PD-Allow-follower-handle" -) - -const ( - errRedirectFailed = "redirect failed" - errRedirectToNotLeader = "redirect to not leader" + PDRedirectorHeader = "PD-Redirector" + PDAllowFollowerHandle = "PD-Allow-follower-handle" ) type runtimeServiceValidator struct { @@ -88,7 +81,7 @@ func NewRedirector(s *server.Server) negroni.Handler { } func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { - allowFollowerHandle := len(r.Header.Get(AllowFollowerHandle)) > 0 + allowFollowerHandle := len(r.Header.Get(PDAllowFollowerHandle)) > 0 isLeader := h.s.GetMember().IsLeader() if !h.s.IsClosed() && (allowFollowerHandle || isLeader) { next(w, r) @@ -96,13 +89,13 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http } // Prevent more than one redirection. - if name := r.Header.Get(RedirectorHeader); len(name) != 0 { + if name := r.Header.Get(PDRedirectorHeader); len(name) != 0 { log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) - http.Error(w, errRedirectToNotLeader, http.StatusInternalServerError) + http.Error(w, apiutil.ErrRedirectToNotLeader, http.StatusInternalServerError) return } - r.Header.Set(RedirectorHeader, h.s.Name()) + r.Header.Set(PDRedirectorHeader, h.s.Name()) leader := h.s.GetMember().GetLeader() if leader == nil { @@ -121,63 +114,5 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http urls = append(urls, *u) } client := h.s.GetHTTPClient() - NewCustomReverseProxies(client, urls).ServeHTTP(w, r) -} - -type customReverseProxies struct { - urls []url.URL - client *http.Client -} - -// NewCustomReverseProxies returns the custom reverse proxies. -func NewCustomReverseProxies(dialClient *http.Client, urls []url.URL) http.Handler { - p := &customReverseProxies{ - client: dialClient, - } - - p.urls = append(p.urls, urls...) - - return p -} - -func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request) { - for _, url := range p.urls { - r.RequestURI = "" - r.URL.Host = url.Host - r.URL.Scheme = url.Scheme - - resp, err := p.client.Do(r) - if err != nil { - log.Error("request failed", errs.ZapError(errs.ErrSendRequest, err)) - continue - } - - b, err := io.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - log.Error("read failed", errs.ZapError(errs.ErrIORead, err)) - continue - } - - copyHeader(w.Header(), resp.Header) - w.WriteHeader(resp.StatusCode) - if _, err := w.Write(b); err != nil { - log.Error("write failed", errs.ZapError(errs.ErrWriteHTTPBody, err)) - continue - } - - return - } - http.Error(w, errRedirectFailed, http.StatusInternalServerError) -} - -func copyHeader(dst, src http.Header) { - for k, vv := range src { - values := dst[k] - for _, v := range vv { - if !slice.Contains(values, v) { - dst.Add(k, v) - } - } - } + apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(w, r) } diff --git a/server/api/metric.go b/server/api/metric.go index 434f6cc728e..6a9dc4a7cde 100644 --- a/server/api/metric.go +++ b/server/api/metric.go @@ -20,7 +20,7 @@ import ( "net/url" "strings" - "github.com/tikv/pd/pkg/utils/apiutil/serverapi" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" ) @@ -48,7 +48,7 @@ func (h *queryMetric) QueryMetric(w http.ResponseWriter, r *http.Request) { case "http", "https": // Replace the pd path with the prometheus http API path. r.URL.Path = strings.Replace(r.URL.Path, "pd/api/v1/metric", "api/v1", 1) - serverapi.NewCustomReverseProxies(h.s.GetHTTPClient(), []url.URL{*u}).ServeHTTP(w, r) + apiutil.NewCustomReverseProxies(h.s.GetHTTPClient(), []url.URL{*u}).ServeHTTP(w, r) default: // TODO: Support read data by self after support store metric data in PD/TiKV. http.Error(w, fmt.Sprintf("schema of metric storage address is no supported, address: %v", metricAddr), http.StatusInternalServerError) diff --git a/server/apiv2/middlewares/redirector.go b/server/apiv2/middlewares/redirector.go index e2997de5bf8..6f92c15754e 100644 --- a/server/apiv2/middlewares/redirector.go +++ b/server/apiv2/middlewares/redirector.go @@ -21,6 +21,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/serverapi" "github.com/tikv/pd/server" "go.uber.org/zap" @@ -30,7 +31,7 @@ import ( func Redirector() gin.HandlerFunc { return func(c *gin.Context) { svr := c.MustGet("server").(*server.Server) - allowFollowerHandle := len(c.Request.Header.Get(serverapi.AllowFollowerHandle)) > 0 + allowFollowerHandle := len(c.Request.Header.Get(serverapi.PDAllowFollowerHandle)) > 0 isLeader := svr.GetMember().IsLeader() if !svr.IsClosed() && (allowFollowerHandle || isLeader) { c.Next() @@ -38,13 +39,13 @@ func Redirector() gin.HandlerFunc { } // Prevent more than one redirection. - if name := c.Request.Header.Get(serverapi.RedirectorHeader); len(name) != 0 { + if name := c.Request.Header.Get(serverapi.PDRedirectorHeader); len(name) != 0 { log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect)) c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error()) return } - c.Request.Header.Set(serverapi.RedirectorHeader, svr.Name()) + c.Request.Header.Set(serverapi.PDRedirectorHeader, svr.Name()) leader := svr.GetMember().GetLeader() if leader == nil { @@ -64,7 +65,7 @@ func Redirector() gin.HandlerFunc { } client := svr.GetHTTPClient() - serverapi.NewCustomReverseProxies(client, urls).ServeHTTP(c.Writer, c.Request) + apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(c.Writer, c.Request) c.Abort() } } diff --git a/server/server.go b/server/server.go index adf75493eee..4ea0ef231f0 100644 --- a/server/server.go +++ b/server/server.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/sysutil" "github.com/tikv/pd/pkg/audit" + bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" @@ -706,6 +707,12 @@ func (s *Server) GetLeader() *pdpb.Member { return s.member.GetLeader() } +// GetPrimary returns the primary member provider of the api server. +// api service's leader is equal to the primary member. +func (s *Server) GetPrimary() bs.MemberProvider { + return s.member.GetLeader() +} + // GetMember returns the member of server. func (s *Server) GetMember() *member.Member { return s.member diff --git a/tests/mcs/resource_manager/resource_manager_test.go b/tests/mcs/resource_manager/resource_manager_test.go index 62e439f33d4..b087f6d607c 100644 --- a/tests/mcs/resource_manager/resource_manager_test.go +++ b/tests/mcs/resource_manager/resource_manager_test.go @@ -432,10 +432,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { re := suite.Require() cli := suite.client - - leaderName := suite.cluster.WaitLeader() - leader := suite.cluster.GetServer(leaderName) - testCasesSet1 := []struct { name string mode rmpb.GroupMode @@ -565,7 +561,6 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { // to test the deletion of persistence suite.resignAndWaitLeader() - leader = suite.cluster.GetServer(suite.cluster.GetLeader()) // List Resource Group lresp, err = cli.ListResourceGroups(suite.ctx) re.NoError(err) @@ -575,6 +570,13 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { // Test Resource Group CURD via HTTP finalNum = 0 + getAddr := func(i int) string { + server := suite.cluster.GetServer(suite.cluster.GetLeader()) + if i%2 == 1 { + server = suite.cluster.GetServer(suite.cluster.GetFollower()) + } + return server.GetAddr() + } for i, tcase := range testCasesSet1 { // Create Resource Group group := &rmpb.ResourceGroup{ @@ -583,7 +585,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } createJSON, err := json.Marshal(group) re.NoError(err) - resp, err := http.Post(leader.GetAddr()+"/resource-manager/api/v1/config/group", "application/json", strings.NewReader(string(createJSON))) + resp, err := http.Post(getAddr(i)+"/resource-manager/api/v1/config/group", "application/json", strings.NewReader(string(createJSON))) re.NoError(err) defer resp.Body.Close() if tcase.addSuccess { @@ -597,7 +599,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { tcase.modifySettings(group) modifyJSON, err := json.Marshal(group) re.NoError(err) - req, err := http.NewRequest(http.MethodPut, leader.GetAddr()+"/resource-manager/api/v1/config/group", strings.NewReader(string(modifyJSON))) + req, err := http.NewRequest(http.MethodPut, getAddr(i+1)+"/resource-manager/api/v1/config/group", strings.NewReader(string(modifyJSON))) re.NoError(err) req.Header.Set("Content-Type", "application/json") resp, err = http.DefaultClient.Do(req) @@ -610,7 +612,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } // Get Resource Group - resp, err = http.Get(leader.GetAddr() + "/resource-manager/api/v1/config/group/" + tcase.name) + resp, err = http.Get(getAddr(i) + "/resource-manager/api/v1/config/group/" + tcase.name) re.NoError(err) defer resp.Body.Close() re.Equal(http.StatusOK, resp.StatusCode) @@ -623,7 +625,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { // Last one, Check list and delete all resource groups if i == len(testCasesSet1)-1 { - resp, err := http.Get(leader.GetAddr() + "/resource-manager/api/v1/config/groups") + resp, err := http.Get(getAddr(i) + "/resource-manager/api/v1/config/groups") re.NoError(err) defer resp.Body.Close() re.Equal(http.StatusOK, resp.StatusCode) @@ -635,7 +637,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { // Delete all resource groups for _, g := range groups { - req, err := http.NewRequest(http.MethodDelete, leader.GetAddr()+"/resource-manager/api/v1/config/group/"+g.Name, nil) + req, err := http.NewRequest(http.MethodDelete, getAddr(i+1)+"/resource-manager/api/v1/config/group/"+g.Name, nil) re.NoError(err) resp, err := http.DefaultClient.Do(req) re.NoError(err) @@ -647,7 +649,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } // verify again - resp1, err := http.Get(leader.GetAddr() + "/resource-manager/api/v1/config/groups") + resp1, err := http.Get(getAddr(i) + "/resource-manager/api/v1/config/groups") re.NoError(err) defer resp1.Body.Close() re.Equal(http.StatusOK, resp1.StatusCode) diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index ca5954f8c6d..7fb7e7d1236 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -613,10 +613,10 @@ func (suite *redirectorTestSuite) TestAllowFollowerHandle() { addr := follower.GetAddr() + "/pd/api/v1/version" request, err := http.NewRequest(http.MethodGet, addr, nil) suite.NoError(err) - request.Header.Add(serverapi.AllowFollowerHandle, "true") + request.Header.Add(serverapi.PDAllowFollowerHandle, "true") resp, err := dialClient.Do(request) suite.NoError(err) - suite.Equal("", resp.Header.Get(serverapi.RedirectorHeader)) + suite.Equal("", resp.Header.Get(serverapi.PDRedirectorHeader)) defer resp.Body.Close() suite.Equal(http.StatusOK, resp.StatusCode) _, err = io.ReadAll(resp.Body) @@ -647,7 +647,7 @@ func (suite *redirectorTestSuite) TestNotLeader() { // Request to follower with redirectorHeader will fail. request.RequestURI = "" - request.Header.Set(serverapi.RedirectorHeader, "pd") + request.Header.Set(serverapi.PDRedirectorHeader, "pd") resp1, err := dialClient.Do(request) suite.NoError(err) defer resp1.Body.Close()