Skip to content

Commit

Permalink
*: cherry pick some keyspace related things (tikv#6840)
Browse files Browse the repository at this point in the history
ref tikv#4399

Signed-off-by: disksing <i@disksing.com>
Signed-off-by: Evan Zhou <coocood@gmail.com>
Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com>
Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: disksing <i@disksing.com>
Co-authored-by: Evan Zhou <coocood@gmail.com>
Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
5 people committed Aug 2, 2023
1 parent 62687f5 commit f0fb9b1
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 16 deletions.
9 changes: 9 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type RegionInfo struct {
readBytes uint64
readKeys uint64
approximateSize int64
approximateKvSize int64
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
Expand Down Expand Up @@ -151,6 +152,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB

region := &RegionInfo{
term: heartbeat.GetTerm(),
Expand All @@ -164,6 +166,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKvSize: int64(regionKvSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
Expand Down Expand Up @@ -230,6 +233,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
readBytes: r.readBytes,
readKeys: r.readKeys,
approximateSize: r.approximateSize,
approximateKvSize: r.approximateKvSize,
approximateKeys: r.approximateKeys,
interval: typeutil.DeepClone(r.interval, TimeIntervalFactory),
replicationStatus: r.replicationStatus,
Expand Down Expand Up @@ -520,6 +524,11 @@ func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
return r.approximateKeys
}

// GetApproximateKvSize returns the approximate kv size of the region.
func (r *RegionInfo) GetApproximateKvSize() int64 {
return r.approximateKvSize
}

// GetApproximateKeys returns the approximate keys of the region.
func (r *RegionInfo) GetApproximateKeys() int64 {
return r.approximateKeys
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ func SetApproximateSize(v int64) RegionCreateOption {
}
}

// SetApproximateKvSize sets the approximate size for the region.
func SetApproximateKvSize(v int64) RegionCreateOption {
return func(region *RegionInfo) {
region.approximateKvSize = v
}
}

// SetApproximateKeys sets the approximate keys for the region.
func SetApproximateKeys(v int64) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/gc/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gc

import "github.com/prometheus/client_golang/prometheus"

var (
gcSafePointGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "gc",
Name: "gc_safepoint",
Help: "The ts of gc safepoint",
}, []string{"type"})
)

func init() {
prometheus.MustRegister(gcSafePointGauge)
}
47 changes: 37 additions & 10 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,6 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
if err != nil {
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
return nil, err
}
userKind := endpoint.StringUserKind(request.Config[UserKindKey])
config, err := manager.kgm.GetKeyspaceConfigByKind(userKind)
if err != nil {
Expand All @@ -215,16 +207,51 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
request.Config[UserKindKey] = config[UserKindKey]
}
}
// Create and save keyspace metadata.
// Create a disabled keyspace meta for tikv-server to get the config on keyspace split.
keyspace := &keyspacepb.KeyspaceMeta{
Id: newID,
Name: request.Name,
State: keyspacepb.KeyspaceState_ENABLED,
State: keyspacepb.KeyspaceState_DISABLED,
CreatedAt: request.CreateTime,
StateChangedAt: request.CreateTime,
Config: request.Config,
}
err = manager.saveNewKeyspace(keyspace)
if err != nil {
log.Warn("[keyspace] failed to save keyspace before split",
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
zap.Error(err),
)
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
idPath := endpoint.KeyspaceIDPath(request.Name)
metaPath := endpoint.KeyspaceMetaPath(newID)
e := txn.Remove(idPath)
if e != nil {
return e
}
return txn.Remove(metaPath)
})
if err2 != nil {
log.Warn("[keyspace] failed to remove pre-created keyspace after split failed",
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
zap.Error(err2),
)
}
return nil, err
}
// enable the keyspace metadata after split.
keyspace.State = keyspacepb.KeyspaceState_ENABLED
_, err = manager.UpdateKeyspaceStateByID(newID, keyspacepb.KeyspaceState_ENABLED, request.CreateTime)
if err != nil {
log.Warn("[keyspace] failed to create keyspace",
zap.Uint32("keyspace-id", keyspace.GetId()),
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type RegionStats struct {
Count int `json:"count"`
EmptyCount int `json:"empty_count"`
StorageSize int64 `json:"storage_size"`
UserStorageSize int64 `json:"user_storage_size"`
StorageKeys int64 `json:"storage_keys"`
StoreLeaderCount map[uint64]int `json:"store_leader_count"`
StorePeerCount map[uint64]int `json:"store_peer_count"`
Expand Down Expand Up @@ -57,10 +58,12 @@ func (s *RegionStats) Observe(r *core.RegionInfo) {
s.Count++
approximateKeys := r.GetApproximateKeys()
approximateSize := r.GetApproximateSize()
approximateKvSize := r.GetApproximateKvSize()
if approximateSize <= core.EmptyRegionApproximateSize {
s.EmptyCount++
}
s.StorageSize += approximateSize
s.UserStorageSize += approximateKvSize
s.StorageKeys += approximateKeys
leader := r.GetLeader()
if leader != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeAddress := store.GetAddress()
id := strconv.FormatUint(store.GetID(), 10)
// Store state.
isDown := false
switch store.GetNodeState() {
case metapb.NodeState_Preparing, metapb.NodeState_Serving:
if store.DownTime() >= s.opt.GetMaxStoreDownTime() {
isDown = true
s.Down++
} else if store.IsUnhealthy() {
s.Unhealthy++
Expand All @@ -104,7 +106,8 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
s.resetStoreStatistics(storeAddress, id)
return
}
if store.IsLowSpace(s.opt.GetLowSpaceRatio()) {

if !isDown && store.IsLowSpace(s.opt.GetLowSpaceRatio()) {
s.LowSpace++
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,13 @@ func (txn *etcdTxn) LoadRange(key, endKey string, limit int) (keys []string, val
return keys, values, err
}

// commit perform the operations on etcd, with pre-condition that values observed by user has not been changed.
// commit perform the operations on etcd, with pre-condition that values observed by user have not been changed.
func (txn *etcdTxn) commit() error {
baseTxn := txn.kv.client.Txn(txn.ctx)
baseTxn.If(txn.conditions...)
baseTxn.Then(txn.operations...)
resp, err := baseTxn.Commit()
// Using slowLogTxn to commit transaction.
slowLogTxn := NewSlowLogTxn(txn.kv.client)
slowLogTxn.If(txn.conditions...)
slowLogTxn.Then(txn.operations...)
resp, err := slowLogTxn.Commit()
if err != nil {
return err
}
Expand Down
40 changes: 40 additions & 0 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"strconv"

"github.com/gorilla/mux"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
"go.uber.org/zap"
)

type adminHandler struct {
Expand Down Expand Up @@ -59,6 +62,43 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.")
}

// @Tags admin
// @Summary Remove target region from region cache and storage.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {string} string "The region is removed from server storage."
// @Failure 400 {string} string "The input is invalid."
// @Router /admin/storage/region/{id} [delete]
func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
vars := mux.Vars(r)
regionIDStr := vars["id"]
regionID, err := strconv.ParseUint(regionIDStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
targetRegion := rc.GetRegion(regionID)
if targetRegion == nil {
h.rd.JSON(w, http.StatusBadRequest, "failed to get target region from cache")
return
}

// Remove region from storage
if err = rc.GetStorage().DeleteRegion(targetRegion.GetMeta()); err != nil {
log.Error("failed to delete region from storage",
zap.Uint64("region-id", targetRegion.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(targetRegion.GetMeta())),
errs.ZapError(err))
h.rd.JSON(w, http.StatusOK, "failed to delete region from storage.")
return
}
// Remove region from cache.
rc.DropCacheRegion(regionID)

h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.")
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

adminHandler := newAdminHandler(svr, rd)
registerFunc(clusterRouter, "/admin/cache/region/{id}", adminHandler.DeleteRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/storage/region/{id}", adminHandler.DeleteRegionStorage, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
Expand Down
6 changes: 6 additions & 0 deletions server/api/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 101, StoreId: 1},
core.SetApproximateSize(100),
core.SetApproximateKvSize(80),
core.SetApproximateKeys(50),
),
core.NewRegionInfo(
Expand All @@ -90,6 +91,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 105, StoreId: 4},
core.SetApproximateSize(200),
core.SetApproximateKvSize(180),
core.SetApproximateKeys(150),
),
core.NewRegionInfo(
Expand All @@ -105,6 +107,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 107, StoreId: 5},
core.SetApproximateSize(1),
core.SetApproximateKvSize(1),
core.SetApproximateKeys(1),
),
core.NewRegionInfo(
Expand All @@ -119,6 +122,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 108, StoreId: 4},
core.SetApproximateSize(50),
core.SetApproximateKvSize(30),
core.SetApproximateKeys(20),
),
}
Expand All @@ -139,6 +143,7 @@ func (suite *statsTestSuite) TestRegionStats() {
Count: 4,
EmptyCount: 1,
StorageSize: 351,
UserStorageSize: 291,
StorageKeys: 221,
StoreLeaderCount: map[uint64]int{1: 1, 4: 2, 5: 1},
StorePeerCount: map[uint64]int{1: 3, 2: 1, 3: 1, 4: 2, 5: 2},
Expand All @@ -152,6 +157,7 @@ func (suite *statsTestSuite) TestRegionStats() {
Count: 2,
EmptyCount: 1,
StorageSize: 201,
UserStorageSize: 181,
StorageKeys: 151,
StoreLeaderCount: map[uint64]int{4: 1, 5: 1},
StorePeerCount: map[uint64]int{1: 2, 4: 1, 5: 2},
Expand Down
3 changes: 3 additions & 0 deletions server/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe
return
}
err = manager.store.SaveGCSafePoint(newSafePoint)
if err == nil {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint))
}
return
}

Expand Down

0 comments on commit f0fb9b1

Please sign in to comment.