Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add REST for tso microservice #6084

Merged
merged 25 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ func (s *Server) startServer() error {
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
s.participant = member.NewParticipant(s.etcdClient, uniqueID)
s.participant.InitInfo(uniqueName, resourceManagerKeyspaceGroupPrimaryElectionPrefix+fmt.Sprintf("%05d", 0), "primary", "keyspace group primary election")
s.participant.InitInfo(uniqueName, resourceManagerKeyspaceGroupPrimaryElectionPrefix+fmt.Sprintf("%05d", 0),
"primary", "keyspace group primary election", s.cfg.ListenAddr)
s.participant.SetMemberDeployPath(s.participant.ID())
s.participant.SetMemberBinaryVersion(s.participant.ID(), versioninfo.PDReleaseVersion)
s.participant.SetMemberGitHash(s.participant.ID(), versioninfo.PDGitHash)
Expand Down
96 changes: 96 additions & 0 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apis

import (
"net/http"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/tso/api/v1"

var (
apiServiceGroup = apiutil.APIServiceGroup{
hnes marked this conversation as resolved.
Show resolved Hide resolved
Name: "tso",
Version: "v1",
IsCore: false,
PathPrefix: APIPathPrefix,
}
)

func init() {
tsoserver.SetUpRestHandler = func(srv *tsoserver.Service) (http.Handler, apiutil.APIServiceGroup) {
s := NewService(srv)
return s.handler(), apiServiceGroup
}
}

// Service is the tso service.
type Service struct {
apiHandlerEngine *gin.Engine
baseEndpoint *gin.RouterGroup

srv *tsoserver.Service
rd *render.Render
}

func createIndentRender() *render.Render {
return render.New(render.Options{
IndentJSON: true,
})
}

// NewService returns a new Service.
func NewService(srv *tsoserver.Service) *Service {
apiHandlerEngine := gin.New()
apiHandlerEngine.Use(gin.Recovery())
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
hnes marked this conversation as resolved.
Show resolved Hide resolved
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set("service", srv)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
endpoint := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
srv: srv,
apiHandlerEngine: apiHandlerEngine,
baseEndpoint: endpoint,
rd: createIndentRender(),
}
s.RegisterRouter()
return s
}

// RegisterRouter registers the router of the service.
func (s *Service) RegisterRouter() {
tsoAdminHandler := tso.NewAdminHandler(s.srv.GetHandler(), s.rd)
s.baseEndpoint.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
}

func (s *Service) handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.apiHandlerEngine.ServeHTTP(w, r)
})
}
47 changes: 47 additions & 0 deletions pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)

// Handler is a helper to export methods to handle API/RPC requests.
type Handler struct {
s *Server
}

func newHandler(s *Server) *Handler {
return &Handler{s: s}
}

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return err
}
if tsoAllocator == nil {
return errs.ErrServerNotStarted
}
return tsoAllocator.SetTSO(ts, ignoreSmaller, skipUpperBoundCheck)
}
16 changes: 15 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

handler *Handler

cfg *Config
clusterID uint64
rootPath string
Expand Down Expand Up @@ -135,6 +137,16 @@ func (s *Server) Context() context.Context {
return s.ctx
}

// GetHandler returns the handler.
func (s *Server) GetHandler() *Handler {
return s.handler
}

// GetBasicServer returns the basic server.
func (s *Server) GetBasicServer() bs.Server {
return s
}

// Run runs the TSO server.
func (s *Server) Run() error {
go systimemon.StartMonitor(s.ctx, time.Now, func() {
Expand Down Expand Up @@ -565,7 +577,8 @@ func (s *Server) startServer() (err error) {
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))

s.participant = member.NewParticipant(s.etcdClient, uniqueID)
s.participant.InitInfo(uniqueName, tsoKeyspaceGroupPrimaryElectionPrefix+fmt.Sprintf("%05d", 0), "primary", "keyspace group primary election")
s.participant.InitInfo(uniqueName, tsoKeyspaceGroupPrimaryElectionPrefix+fmt.Sprintf("%05d", 0),
"primary", "keyspace group primary election", fmt.Sprintf("http://%s", s.cfg.ListenAddr))
s.participant.SetMemberDeployPath(s.participant.ID())
s.participant.SetMemberBinaryVersion(s.participant.ID(), versioninfo.PDReleaseVersion)
s.participant.SetMemberGitHash(s.participant.ID(), versioninfo.PDGitHash)
Expand Down Expand Up @@ -617,6 +630,7 @@ func CreateServer(ctx context.Context, cfg *Config) *Server {
cfg: cfg,
ctx: ctx,
}
svr.handler = newHandler(svr)
return svr
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ func NewParticipant(client *clientv3.Client, id uint64) *Participant {
}

// InitInfo initializes the member info. The leader key is path.Join(rootPath, leaderName)
func (m *Participant) InitInfo(name string, rootPath string, leaderName string, purpose string) {
func (m *Participant) InitInfo(name string, rootPath string, leaderName string, purpose string, listenURL string) {
leader := &pdpb.Member{
Name: name,
MemberId: m.ID(),
// TODO: need refactor
ClientUrls: []string{listenURL},
}

data, err := leader.Marshal()
Expand Down
105 changes: 105 additions & 0 deletions pkg/tso/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"net/http"
"strconv"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/unrolled/render"
)

// Handler defines the common behaviors of a basic tso handler.
type Handler interface {
ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error
}

// AdminHandler wrap the basic tso handler to provide http service.
type AdminHandler struct {
handler Handler
rd *render.Render
}

// NewAdminHandler returns a new admin handler.
func NewAdminHandler(handler Handler, rd *render.Render) *AdminHandler {
return &AdminHandler{
handler: handler,
rd: rd,
}
}

// ResetTS is the http.HandlerFunc of ResetTS
// FIXME: details of input json body params
// @Tags admin
// @Summary Reset the ts.
// @Accept json
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Reset ts successfully."
// @Failure 400 {string} string "The input is invalid."
// @Failure 403 {string} string "Reset ts is forbidden."
// @Failure 500 {string} string "TSO server failed to proceed the request."
// @Router /admin/reset-ts [post]
// if force-use-larger=true:
//
// reset ts to max(current ts, input ts).
//
// else:
//
// reset ts to input ts if it > current ts and < upper bound, error if not in that range
//
// during EBS based restore, we call this to make sure ts of pd >= resolved_ts in backup.
func (h *AdminHandler) ResetTS(w http.ResponseWriter, r *http.Request) {
handler := h.handler
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}
tsValue, ok := input["tso"].(string)
if !ok || len(tsValue) == 0 {
h.rd.JSON(w, http.StatusBadRequest, "invalid tso value")
return
}
ts, err := strconv.ParseUint(tsValue, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, "invalid tso value")
return
}

forceUseLarger := false
forceUseLargerVal, contains := input["force-use-larger"]
if contains {
if forceUseLarger, ok = forceUseLargerVal.(bool); !ok {
h.rd.JSON(w, http.StatusBadRequest, "invalid force-use-larger value")
return
}
}
var ignoreSmaller, skipUpperBoundCheck bool
if forceUseLarger {
ignoreSmaller, skipUpperBoundCheck = true, true
}

if err = handler.ResetTS(ts, ignoreSmaller, skipUpperBoundCheck); err != nil {
if err == errs.ErrServerNotStarted {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
} else {
h.rd.JSON(w, http.StatusForbidden, err.Error())
}
return
}
h.rd.JSON(w, http.StatusOK, "Reset ts successfully.")
}
62 changes: 0 additions & 62 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,68 +70,6 @@ func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Reque
h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.")
}

// ResetTS
// FIXME: details of input json body params
// @Tags admin
// @Summary Reset the ts.
// @Accept json
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Reset ts successfully."
// @Failure 400 {string} string "The input is invalid."
// @Failure 403 {string} string "Reset ts is forbidden."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /admin/reset-ts [post]
// if force-use-larger=true:
//
// reset ts to max(current ts, input ts).
//
// else:
//
// reset ts to input ts if it > current ts and < upper bound, error if not in that range
//
// during EBS based restore, we call this to make sure ts of pd >= resolved_ts in backup.
func (h *adminHandler) ResetTS(w http.ResponseWriter, r *http.Request) {
handler := h.svr.GetHandler()
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}
tsValue, ok := input["tso"].(string)
if !ok || len(tsValue) == 0 {
h.rd.JSON(w, http.StatusBadRequest, "invalid tso value")
return
}
ts, err := strconv.ParseUint(tsValue, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, "invalid tso value")
return
}

forceUseLarger := false
forceUseLargerVal, contains := input["force-use-larger"]
if contains {
if forceUseLarger, ok = forceUseLargerVal.(bool); !ok {
h.rd.JSON(w, http.StatusBadRequest, "invalid force-use-larger value")
return
}
}
var ignoreSmaller, skipUpperBoundCheck bool
if forceUseLarger {
ignoreSmaller, skipUpperBoundCheck = true, true
}

if err = handler.ResetTS(ts, ignoreSmaller, skipUpperBoundCheck); err != nil {
if err == server.ErrServerNotStarted {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
} else {
h.rd.JSON(w, http.StatusForbidden, err.Error())
}
return
}
h.rd.JSON(w, http.StatusOK, "Reset ts successfully.")
}

// Intentionally no swagger mark as it is supposed to be only used in
// server-to-server. For security reason, it only accepts JSON formatted data.
func (h *adminHandler) SavePersistFile(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading