Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: cherry pick some keyspace related things #6840

Merged
merged 8 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type RegionInfo struct {
readBytes uint64
readKeys uint64
approximateSize int64
approximateKvSize int64
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
Expand Down Expand Up @@ -151,6 +152,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB

region := &RegionInfo{
term: heartbeat.GetTerm(),
Expand All @@ -164,6 +166,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKvSize: int64(regionKvSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
Expand Down Expand Up @@ -230,6 +233,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
readBytes: r.readBytes,
readKeys: r.readKeys,
approximateSize: r.approximateSize,
approximateKvSize: r.approximateKvSize,
approximateKeys: r.approximateKeys,
interval: typeutil.DeepClone(r.interval, TimeIntervalFactory),
replicationStatus: r.replicationStatus,
Expand Down Expand Up @@ -520,6 +524,11 @@ func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
return r.approximateKeys
}

// GetApproximateKvSize returns the approximate kv size of the region.
func (r *RegionInfo) GetApproximateKvSize() int64 {
return r.approximateKvSize
}

// GetApproximateKeys returns the approximate keys of the region.
func (r *RegionInfo) GetApproximateKeys() int64 {
return r.approximateKeys
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ func SetApproximateSize(v int64) RegionCreateOption {
}
}

// SetApproximateKvSize sets the approximate size for the region.
func SetApproximateKvSize(v int64) RegionCreateOption {
return func(region *RegionInfo) {
region.approximateKvSize = v
}
}

// SetApproximateKeys sets the approximate keys for the region.
func SetApproximateKeys(v int64) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/gc/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gc

import "github.com/prometheus/client_golang/prometheus"

var (
gcSafePointGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "gc",
Name: "gc_safepoint",
Help: "The ts of gc safepoint",
}, []string{"type"})
)

func init() {
prometheus.MustRegister(gcSafePointGauge)
}
3 changes: 3 additions & 0 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe
return
}
err = manager.store.SaveGCSafePoint(newSafePoint)
if err == nil {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint))
}
return
}

Expand Down
47 changes: 37 additions & 10 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,6 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
if err != nil {
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
return nil, err
}
userKind := endpoint.StringUserKind(request.Config[UserKindKey])
config, err := manager.kgm.GetKeyspaceConfigByKind(userKind)
if err != nil {
Expand All @@ -215,16 +207,51 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
request.Config[UserKindKey] = config[UserKindKey]
}
}
// Create and save keyspace metadata.
// Create a disabled keyspace meta for tikv-server to get the config on keyspace split.
keyspace := &keyspacepb.KeyspaceMeta{
Id: newID,
Name: request.Name,
State: keyspacepb.KeyspaceState_ENABLED,
State: keyspacepb.KeyspaceState_DISABLED,
CreatedAt: request.CreateTime,
StateChangedAt: request.CreateTime,
Config: request.Config,
}
err = manager.saveNewKeyspace(keyspace)
if err != nil {
log.Warn("[keyspace] failed to save keyspace before split",
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
zap.Error(err),
)
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
idPath := endpoint.KeyspaceIDPath(request.Name)
metaPath := endpoint.KeyspaceMetaPath(newID)
e := txn.Remove(idPath)
if e != nil {
return e
}
return txn.Remove(metaPath)
})
if err2 != nil {
log.Warn("[keyspace] failed to remove pre-created keyspace after split failed",
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
zap.Error(err2),
)
}
return nil, err
}
// enable the keyspace metadata after split.
keyspace.State = keyspacepb.KeyspaceState_ENABLED
_, err = manager.UpdateKeyspaceStateByID(newID, keyspacepb.KeyspaceState_ENABLED, request.CreateTime)
if err != nil {
log.Warn("[keyspace] failed to create keyspace",
zap.Uint32("keyspace-id", keyspace.GetId()),
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type RegionStats struct {
Count int `json:"count"`
EmptyCount int `json:"empty_count"`
StorageSize int64 `json:"storage_size"`
UserStorageSize int64 `json:"user_storage_size"`
StorageKeys int64 `json:"storage_keys"`
StoreLeaderCount map[uint64]int `json:"store_leader_count"`
StorePeerCount map[uint64]int `json:"store_peer_count"`
Expand Down Expand Up @@ -57,10 +58,12 @@ func (s *RegionStats) Observe(r *core.RegionInfo) {
s.Count++
approximateKeys := r.GetApproximateKeys()
approximateSize := r.GetApproximateSize()
approximateKvSize := r.GetApproximateKvSize()
if approximateSize <= core.EmptyRegionApproximateSize {
s.EmptyCount++
}
s.StorageSize += approximateSize
s.UserStorageSize += approximateKvSize
s.StorageKeys += approximateKeys
leader := r.GetLeader()
if leader != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeAddress := store.GetAddress()
id := strconv.FormatUint(store.GetID(), 10)
// Store state.
isDown := false
switch store.GetNodeState() {
case metapb.NodeState_Preparing, metapb.NodeState_Serving:
if store.DownTime() >= s.opt.GetMaxStoreDownTime() {
isDown = true
s.Down++
} else if store.IsUnhealthy() {
s.Unhealthy++
Expand All @@ -104,7 +106,8 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
s.resetStoreStatistics(storeAddress, id)
return
}
if store.IsLowSpace(s.opt.GetLowSpaceRatio()) {

if !isDown && store.IsLowSpace(s.opt.GetLowSpaceRatio()) {
s.LowSpace++
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,13 @@ func (txn *etcdTxn) LoadRange(key, endKey string, limit int) (keys []string, val
return keys, values, err
}

// commit perform the operations on etcd, with pre-condition that values observed by user has not been changed.
// commit perform the operations on etcd, with pre-condition that values observed by user have not been changed.
func (txn *etcdTxn) commit() error {
baseTxn := txn.kv.client.Txn(txn.ctx)
baseTxn.If(txn.conditions...)
baseTxn.Then(txn.operations...)
resp, err := baseTxn.Commit()
// Using slowLogTxn to commit transaction.
slowLogTxn := NewSlowLogTxn(txn.kv.client)
slowLogTxn.If(txn.conditions...)
slowLogTxn.Then(txn.operations...)
resp, err := slowLogTxn.Commit()
if err != nil {
return err
}
Expand Down
40 changes: 40 additions & 0 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"strconv"

"github.com/gorilla/mux"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
"go.uber.org/zap"
)

type adminHandler struct {
Expand Down Expand Up @@ -59,6 +62,43 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.")
}

// @Tags admin
// @Summary Remove target region from region cache and storage.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {string} string "The region is removed from server storage."
// @Failure 400 {string} string "The input is invalid."
// @Router /admin/storage/region/{id} [delete]
func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
vars := mux.Vars(r)
regionIDStr := vars["id"]
regionID, err := strconv.ParseUint(regionIDStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
targetRegion := rc.GetRegion(regionID)
if targetRegion == nil {
h.rd.JSON(w, http.StatusBadRequest, "failed to get target region from cache")
return
}

// Remove region from storage
if err = rc.GetStorage().DeleteRegion(targetRegion.GetMeta()); err != nil {
log.Error("failed to delete region from storage",
zap.Uint64("region-id", targetRegion.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(targetRegion.GetMeta())),
errs.ZapError(err))
h.rd.JSON(w, http.StatusOK, "failed to delete region from storage.")
return
}
// Remove region from cache.
rc.DropCacheRegion(regionID)

h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.")
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

adminHandler := newAdminHandler(svr, rd)
registerFunc(clusterRouter, "/admin/cache/region/{id}", adminHandler.DeleteRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/storage/region/{id}", adminHandler.DeleteRegionStorage, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.IsSnapshotRecovering, setMethods(http.MethodGet), setAuditBackend(localLog, prometheus))
Expand Down
6 changes: 6 additions & 0 deletions server/api/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 101, StoreId: 1},
core.SetApproximateSize(100),
core.SetApproximateKvSize(80),
core.SetApproximateKeys(50),
),
core.NewRegionInfo(
Expand All @@ -90,6 +91,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 105, StoreId: 4},
core.SetApproximateSize(200),
core.SetApproximateKvSize(180),
core.SetApproximateKeys(150),
),
core.NewRegionInfo(
Expand All @@ -105,6 +107,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 107, StoreId: 5},
core.SetApproximateSize(1),
core.SetApproximateKvSize(1),
core.SetApproximateKeys(1),
),
core.NewRegionInfo(
Expand All @@ -119,6 +122,7 @@ func (suite *statsTestSuite) TestRegionStats() {
},
&metapb.Peer{Id: 108, StoreId: 4},
core.SetApproximateSize(50),
core.SetApproximateKvSize(30),
core.SetApproximateKeys(20),
),
}
Expand All @@ -139,6 +143,7 @@ func (suite *statsTestSuite) TestRegionStats() {
Count: 4,
EmptyCount: 1,
StorageSize: 351,
UserStorageSize: 291,
StorageKeys: 221,
StoreLeaderCount: map[uint64]int{1: 1, 4: 2, 5: 1},
StorePeerCount: map[uint64]int{1: 3, 2: 1, 3: 1, 4: 2, 5: 2},
Expand All @@ -152,6 +157,7 @@ func (suite *statsTestSuite) TestRegionStats() {
Count: 2,
EmptyCount: 1,
StorageSize: 201,
UserStorageSize: 181,
StorageKeys: 151,
StoreLeaderCount: map[uint64]int{4: 1, 5: 1},
StorePeerCount: map[uint64]int{1: 2, 4: 1, 5: 2},
Expand Down
Loading