diff --git a/pkg/core/region.go b/pkg/core/region.go index ba547ca6c4d..450fab499e6 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -64,6 +64,7 @@ type RegionInfo struct { readBytes uint64 readKeys uint64 approximateSize int64 + approximateKvSize int64 approximateKeys int64 interval *pdpb.TimeInterval replicationStatus *replication_modepb.RegionReplicationStatus @@ -151,6 +152,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize { regionSize = EmptyRegionApproximateSize } + regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB region := &RegionInfo{ term: heartbeat.GetTerm(), @@ -164,6 +166,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC readBytes: heartbeat.GetBytesRead(), readKeys: heartbeat.GetKeysRead(), approximateSize: int64(regionSize), + approximateKvSize: int64(regionKvSize), approximateKeys: int64(heartbeat.GetApproximateKeys()), interval: heartbeat.GetInterval(), replicationStatus: heartbeat.GetReplicationStatus(), @@ -230,6 +233,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo { readBytes: r.readBytes, readKeys: r.readKeys, approximateSize: r.approximateSize, + approximateKvSize: r.approximateKvSize, approximateKeys: r.approximateKeys, interval: typeutil.DeepClone(r.interval, TimeIntervalFactory), replicationStatus: r.replicationStatus, @@ -520,6 +524,11 @@ func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 { return r.approximateKeys } +// GetApproximateKvSize returns the approximate kv size of the region. +func (r *RegionInfo) GetApproximateKvSize() int64 { + return r.approximateKvSize +} + // GetApproximateKeys returns the approximate keys of the region. func (r *RegionInfo) GetApproximateKeys() int64 { return r.approximateKeys diff --git a/pkg/core/region_option.go b/pkg/core/region_option.go index 92820e3f549..ba46fab9420 100644 --- a/pkg/core/region_option.go +++ b/pkg/core/region_option.go @@ -274,6 +274,13 @@ func SetApproximateSize(v int64) RegionCreateOption { } } +// SetApproximateKvSize sets the approximate size for the region. +func SetApproximateKvSize(v int64) RegionCreateOption { + return func(region *RegionInfo) { + region.approximateKvSize = v + } +} + // SetApproximateKeys sets the approximate keys for the region. func SetApproximateKeys(v int64) RegionCreateOption { return func(region *RegionInfo) { diff --git a/pkg/gc/metrics.go b/pkg/gc/metrics.go new file mode 100644 index 00000000000..e70ecbf2209 --- /dev/null +++ b/pkg/gc/metrics.go @@ -0,0 +1,31 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import "github.com/prometheus/client_golang/prometheus" + +var ( + gcSafePointGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "gc", + Name: "gc_safepoint", + Help: "The ts of gc safepoint", + }, []string{"type"}) +) + +func init() { + prometheus.MustRegister(gcSafePointGauge) +} diff --git a/pkg/gc/safepoint.go b/pkg/gc/safepoint.go index 3a8f14ff787..be12cfb78a5 100644 --- a/pkg/gc/safepoint.go +++ b/pkg/gc/safepoint.go @@ -64,6 +64,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe return } err = manager.store.SaveGCSafePoint(newSafePoint) + if err == nil { + gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint)) + } return } diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 3d208aef2a2..1ce599b5f81 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -194,14 +194,6 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac if err != nil { return nil, err } - // If the request to create a keyspace is pre-allocated when the PD starts, - // there is no need to wait for the region split, because TiKV has not started. - waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit() - // Split keyspace region. - err = manager.splitKeyspaceRegion(newID, waitRegionSplit) - if err != nil { - return nil, err - } userKind := endpoint.StringUserKind(request.Config[UserKindKey]) config, err := manager.kgm.GetKeyspaceConfigByKind(userKind) if err != nil { @@ -215,16 +207,51 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac request.Config[UserKindKey] = config[UserKindKey] } } - // Create and save keyspace metadata. + // Create a disabled keyspace meta for tikv-server to get the config on keyspace split. keyspace := &keyspacepb.KeyspaceMeta{ Id: newID, Name: request.Name, - State: keyspacepb.KeyspaceState_ENABLED, + State: keyspacepb.KeyspaceState_DISABLED, CreatedAt: request.CreateTime, StateChangedAt: request.CreateTime, Config: request.Config, } err = manager.saveNewKeyspace(keyspace) + if err != nil { + log.Warn("[keyspace] failed to save keyspace before split", + zap.Uint32("keyspace-id", keyspace.GetId()), + zap.String("name", keyspace.GetName()), + zap.Error(err), + ) + return nil, err + } + // If the request to create a keyspace is pre-allocated when the PD starts, + // there is no need to wait for the region split, because TiKV has not started. + waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit() + // Split keyspace region. + err = manager.splitKeyspaceRegion(newID, waitRegionSplit) + if err != nil { + err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { + idPath := endpoint.KeyspaceIDPath(request.Name) + metaPath := endpoint.KeyspaceMetaPath(newID) + e := txn.Remove(idPath) + if e != nil { + return e + } + return txn.Remove(metaPath) + }) + if err2 != nil { + log.Warn("[keyspace] failed to remove pre-created keyspace after split failed", + zap.Uint32("keyspace-id", keyspace.GetId()), + zap.String("name", keyspace.GetName()), + zap.Error(err2), + ) + } + return nil, err + } + // enable the keyspace metadata after split. + keyspace.State = keyspacepb.KeyspaceState_ENABLED + _, err = manager.UpdateKeyspaceStateByID(newID, keyspacepb.KeyspaceState_ENABLED, request.CreateTime) if err != nil { log.Warn("[keyspace] failed to create keyspace", zap.Uint32("keyspace-id", keyspace.GetId()), diff --git a/pkg/statistics/region.go b/pkg/statistics/region.go index 51030b676d2..f39c58ed81c 100644 --- a/pkg/statistics/region.go +++ b/pkg/statistics/region.go @@ -23,6 +23,7 @@ type RegionStats struct { Count int `json:"count"` EmptyCount int `json:"empty_count"` StorageSize int64 `json:"storage_size"` + UserStorageSize int64 `json:"user_storage_size"` StorageKeys int64 `json:"storage_keys"` StoreLeaderCount map[uint64]int `json:"store_leader_count"` StorePeerCount map[uint64]int `json:"store_peer_count"` @@ -57,10 +58,12 @@ func (s *RegionStats) Observe(r *core.RegionInfo) { s.Count++ approximateKeys := r.GetApproximateKeys() approximateSize := r.GetApproximateSize() + approximateKvSize := r.GetApproximateKvSize() if approximateSize <= core.EmptyRegionApproximateSize { s.EmptyCount++ } s.StorageSize += approximateSize + s.UserStorageSize += approximateKvSize s.StorageKeys += approximateKeys leader := r.GetLeader() if leader != nil { diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 359d9106b23..c362041ca00 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -77,9 +77,11 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) { storeAddress := store.GetAddress() id := strconv.FormatUint(store.GetID(), 10) // Store state. + isDown := false switch store.GetNodeState() { case metapb.NodeState_Preparing, metapb.NodeState_Serving: if store.DownTime() >= s.opt.GetMaxStoreDownTime() { + isDown = true s.Down++ } else if store.IsUnhealthy() { s.Unhealthy++ @@ -104,7 +106,8 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) { s.resetStoreStatistics(storeAddress, id) return } - if store.IsLowSpace(s.opt.GetLowSpaceRatio()) { + + if !isDown && store.IsLowSpace(s.opt.GetLowSpaceRatio()) { s.LowSpace++ } diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index 845a5d7ab16..767aeff77a6 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -275,12 +275,13 @@ func (txn *etcdTxn) LoadRange(key, endKey string, limit int) (keys []string, val return keys, values, err } -// commit perform the operations on etcd, with pre-condition that values observed by user has not been changed. +// commit perform the operations on etcd, with pre-condition that values observed by user have not been changed. func (txn *etcdTxn) commit() error { - baseTxn := txn.kv.client.Txn(txn.ctx) - baseTxn.If(txn.conditions...) - baseTxn.Then(txn.operations...) - resp, err := baseTxn.Commit() + // Using slowLogTxn to commit transaction. + slowLogTxn := NewSlowLogTxn(txn.kv.client) + slowLogTxn.If(txn.conditions...) + slowLogTxn.Then(txn.operations...) + resp, err := slowLogTxn.Commit() if err != nil { return err } diff --git a/server/api/admin.go b/server/api/admin.go index 334d1882a66..c81193f1468 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -21,10 +21,13 @@ import ( "strconv" "github.com/gorilla/mux" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" "github.com/unrolled/render" + "go.uber.org/zap" ) type adminHandler struct { @@ -59,6 +62,43 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.") } +// @Tags admin +// @Summary Remove target region from region cache and storage. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {string} string "The region is removed from server storage." +// @Failure 400 {string} string "The input is invalid." +// @Router /admin/storage/region/{id} [delete] +func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + vars := mux.Vars(r) + regionIDStr := vars["id"] + regionID, err := strconv.ParseUint(regionIDStr, 10, 64) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + targetRegion := rc.GetRegion(regionID) + if targetRegion == nil { + h.rd.JSON(w, http.StatusBadRequest, "failed to get target region from cache") + return + } + + // Remove region from storage + if err = rc.GetStorage().DeleteRegion(targetRegion.GetMeta()); err != nil { + log.Error("failed to delete region from storage", + zap.Uint64("region-id", targetRegion.GetID()), + zap.Stringer("region-meta", core.RegionToHexMeta(targetRegion.GetMeta())), + errs.ZapError(err)) + h.rd.JSON(w, http.StatusOK, "failed to delete region from storage.") + return + } + // Remove region from cache. + rc.DropCacheRegion(regionID) + + h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.") +} + // @Tags admin // @Summary Drop all regions from cache. // @Produce json diff --git a/server/api/router.go b/server/api/router.go index 02f0621f8da..1e0d12d53b6 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -297,6 +297,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { adminHandler := newAdminHandler(svr, rd) registerFunc(clusterRouter, "/admin/cache/region/{id}", adminHandler.DeleteRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) + registerFunc(clusterRouter, "/admin/storage/region/{id}", adminHandler.DeleteRegionStorage, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.IsSnapshotRecovering, setMethods(http.MethodGet), setAuditBackend(localLog, prometheus)) diff --git a/server/api/stats_test.go b/server/api/stats_test.go index 60391b68de0..4003f53ed9e 100644 --- a/server/api/stats_test.go +++ b/server/api/stats_test.go @@ -74,6 +74,7 @@ func (suite *statsTestSuite) TestRegionStats() { }, &metapb.Peer{Id: 101, StoreId: 1}, core.SetApproximateSize(100), + core.SetApproximateKvSize(80), core.SetApproximateKeys(50), ), core.NewRegionInfo( @@ -90,6 +91,7 @@ func (suite *statsTestSuite) TestRegionStats() { }, &metapb.Peer{Id: 105, StoreId: 4}, core.SetApproximateSize(200), + core.SetApproximateKvSize(180), core.SetApproximateKeys(150), ), core.NewRegionInfo( @@ -105,6 +107,7 @@ func (suite *statsTestSuite) TestRegionStats() { }, &metapb.Peer{Id: 107, StoreId: 5}, core.SetApproximateSize(1), + core.SetApproximateKvSize(1), core.SetApproximateKeys(1), ), core.NewRegionInfo( @@ -119,6 +122,7 @@ func (suite *statsTestSuite) TestRegionStats() { }, &metapb.Peer{Id: 108, StoreId: 4}, core.SetApproximateSize(50), + core.SetApproximateKvSize(30), core.SetApproximateKeys(20), ), } @@ -139,6 +143,7 @@ func (suite *statsTestSuite) TestRegionStats() { Count: 4, EmptyCount: 1, StorageSize: 351, + UserStorageSize: 291, StorageKeys: 221, StoreLeaderCount: map[uint64]int{1: 1, 4: 2, 5: 1}, StorePeerCount: map[uint64]int{1: 3, 2: 1, 3: 1, 4: 2, 5: 2}, @@ -152,6 +157,7 @@ func (suite *statsTestSuite) TestRegionStats() { Count: 2, EmptyCount: 1, StorageSize: 201, + UserStorageSize: 181, StorageKeys: 151, StoreLeaderCount: map[uint64]int{4: 1, 5: 1}, StorePeerCount: map[uint64]int{1: 2, 4: 1, 5: 2},