From 2c5209527e5622a629abd2cf8922cd216ea074e0 Mon Sep 17 00:00:00 2001 From: husharp Date: Wed, 2 Aug 2023 14:17:31 +0800 Subject: [PATCH] expand min-resolved-ts to support stores Signed-off-by: husharp --- server/api/min_resolved_ts.go | 33 ++++++++++++++++++++----- server/api/min_resolved_ts_test.go | 39 ++++++++++++++++++++++++++++++ server/cluster/cluster.go | 14 +++++++++++ 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index 0d30ea3395e5..3a70f673cc29 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -15,6 +15,8 @@ package api import ( + "encoding/json" + "io" "net/http" "strconv" @@ -38,9 +40,10 @@ func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolved // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. type minResolvedTS struct { - IsRealTime bool `json:"is_real_time,omitempty"` - MinResolvedTS uint64 `json:"min_resolved_ts"` - PersistInterval typeutil.Duration `json:"persist_interval,omitempty"` + IsRealTime bool `json:"is_real_time,omitempty"` + MinResolvedTS uint64 `json:"min_resolved_ts"` + PersistInterval typeutil.Duration `json:"persist_interval,omitempty"` + StoreMinResolvedTS map[uint64]uint64 `json:"store_min_resolved_ts"` } // @Tags min_store_resolved_ts @@ -69,6 +72,8 @@ func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *h // @Tags min_resolved_ts // @Summary Get cluster-level min resolved ts. +// @Description Optionally, if a list of store IDs is provided in the request body, +// it also returns the min_resolved_ts for the specified stores in a separate map. // @Produce json // @Success 200 {array} minResolvedTS // @Failure 500 {string} string "PD server failed to proceed the request." @@ -77,9 +82,25 @@ func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.R c := h.svr.GetRaftCluster() value := c.GetMinResolvedTS() persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval + + var storeMinResolvedTS map[uint64]uint64 + if b, err := io.ReadAll(r.Body); err == nil && len(b) != 0 { + // stores ids is an optional parameter. + // if it is not empty, return the min resolved ts of the specified stores into map. + var ids []string + err = json.Unmarshal(b, &ids) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + c := h.svr.GetRaftCluster() + storeMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids) + } + h.rd.JSON(w, http.StatusOK, minResolvedTS{ - MinResolvedTS: value, - PersistInterval: persistInterval, - IsRealTime: persistInterval.Duration != 0, + MinResolvedTS: value, + PersistInterval: persistInterval, + IsRealTime: persistInterval.Duration != 0, + StoreMinResolvedTS: storeMinResolvedTS, }) } diff --git a/server/api/min_resolved_ts_test.go b/server/api/min_resolved_ts_test.go index 79ab71e2be1d..bcf28817d234 100644 --- a/server/api/min_resolved_ts_test.go +++ b/server/api/min_resolved_ts_test.go @@ -15,7 +15,10 @@ package api import ( + "bytes" + "encoding/json" "fmt" + "net/http" "reflect" "testing" "time" @@ -116,6 +119,28 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() { }) } +func (suite *minResolvedTSTestSuite) TestMinResolvedTSByStores() { + // default run job + interval := typeutil.Duration{Duration: suite.defaultInterval} + suite.setMinResolvedTSPersistenceInterval(interval) + suite.Eventually(func() bool { + return interval == suite.svr.GetRaftCluster().GetPDServerConfig().MinResolvedTSPersistenceInterval + }, time.Second*10, time.Millisecond*20) + // set min resolved ts + rc := suite.svr.GetRaftCluster() + ts := uint64(233) + rc.SetMinResolvedTS(1, ts) + storeIDs := []string{"1"} + suite.checkMinResolvedTSByStores(&minResolvedTS{ + MinResolvedTS: 0, + IsRealTime: true, + PersistInterval: interval, + StoreMinResolvedTS: map[uint64]uint64{ + 1: ts, + }, + }, storeIDs) +} + func (suite *minResolvedTSTestSuite) setMinResolvedTSPersistenceInterval(duration typeutil.Duration) { cfg := suite.svr.GetRaftCluster().GetPDServerConfig().Clone() cfg.MinResolvedTSPersistenceInterval = duration @@ -133,3 +158,17 @@ func (suite *minResolvedTSTestSuite) checkMinResolvedTS(expect *minResolvedTS) { return reflect.DeepEqual(expect, listResp) }, time.Second*10, time.Millisecond*20) } + +func (suite *minResolvedTSTestSuite) checkMinResolvedTSByStores(expect *minResolvedTS, storeIDs []string) { + suite.Eventually(func() bool { + data, _ := json.Marshal(storeIDs) + req, _ := http.NewRequest(http.MethodGet, suite.url, bytes.NewBuffer(data)) + res, err := testDialClient.Do(req) + suite.NoError(err) + defer res.Body.Close() + listResp := &minResolvedTS{} + err = apiutil.ReadJSON(res.Body, listResp) + suite.NoError(err) + return reflect.DeepEqual(expect, listResp) + }, time.Second*10, time.Millisecond*20) +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b2ad25cf0cab..a6cbe655c1a3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2487,6 +2487,20 @@ func (c *RaftCluster) GetStoreMinResolvedTS(storeID uint64) uint64 { return c.GetStore(storeID).GetMinResolvedTS() } +// GetMinResolvedTSByStoreIDs returns the min resolved ts of the stores. +func (c *RaftCluster) GetMinResolvedTSByStoreIDs(ids []string) map[uint64]uint64 { + allMinResolvedTS := make(map[uint64]uint64) + for _, idStr := range ids { + storeID, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + log.Error("parse store id failed", errs.ZapError(err)) + continue + } + allMinResolvedTS[storeID] = c.GetStoreMinResolvedTS(storeID) + } + return allMinResolvedTS +} + // GetExternalTS returns the external timestamp. func (c *RaftCluster) GetExternalTS() uint64 { c.RLock()