From ccb0bba4e10b45a04f9f79a2cf39ed329f2bec3c Mon Sep 17 00:00:00 2001 From: ystaticy Date: Thu, 25 May 2023 14:29:39 +0800 Subject: [PATCH 1/2] Introduce keyspace safepoint interface (#6419) ref tikv/pd#6487 Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> --- client/client.go | 2 + client/errs/errno.go | 29 +-- client/gc_client.go | 137 ++++++++++++ client/go.mod | 2 +- client/go.sum | 4 +- client/metrics.go | 8 + go.mod | 2 +- go.sum | 4 +- {server => pkg}/gc/safepoint.go | 0 {server => pkg}/gc/safepoint_test.go | 0 pkg/gc/safepoint_v2.go | 214 +++++++++++++++++++ pkg/keyspace/keyspace.go | 2 +- pkg/keyspace/util.go | 4 +- pkg/storage/endpoint/gc_key_space.go | 203 ------------------ pkg/storage/endpoint/gc_safe_point.go | 8 +- pkg/storage/endpoint/key_path.go | 84 ++++---- pkg/storage/endpoint/safepoint_v2.go | 220 ++++++++++++++++++++ pkg/storage/storage.go | 2 +- pkg/storage/storage_gc_test.go | 191 ++++++----------- server/cluster/cluster.go | 2 + server/gc_service.go | 199 ++++++++++++++++++ server/grpc_service.go | 29 +-- server/server.go | 10 +- tests/integrations/client/gc_client_test.go | 208 ++++++++++++++++++ tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 +- tests/integrations/mcs/go.mod | 3 +- tests/integrations/mcs/go.sum | 7 +- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 6 +- tools/pd-tso-bench/go.sum | 4 +- 31 files changed, 1154 insertions(+), 438 deletions(-) create mode 100644 client/gc_client.go rename {server => pkg}/gc/safepoint.go (100%) rename {server => pkg}/gc/safepoint_test.go (100%) create mode 100644 pkg/gc/safepoint_v2.go delete mode 100644 pkg/storage/endpoint/gc_key_space.go create mode 100644 pkg/storage/endpoint/safepoint_v2.go create mode 100644 server/gc_service.go create mode 100644 tests/integrations/client/gc_client_test.go diff --git a/client/client.go b/client/client.go index 298896f98ac8..631dfb39b0d0 100644 --- a/client/client.go +++ b/client/client.go @@ -151,6 +151,8 @@ type Client interface { MetaStorageClient // KeyspaceClient manages keyspace metadata. KeyspaceClient + // GCClient manages gcSafePointV2 and serviceSafePointV2 + GCClient // ResourceManagerClient manages resource group metadata and token assignment. ResourceManagerClient // Close closes the client. 
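Note: a minimal sketch of how a caller might use the new GCClient methods, assuming the existing pd.NewClientWithContext constructor and pd.SecurityOption from this client package; the endpoint address, keyspace ID, service ID, TTL, and safe point values are placeholders for illustration and are not part of the patch.

package main

import (
	"context"
	"fmt"

	pd "github.com/tikv/pd/client"
)

func main() {
	ctx := context.Background()
	cli, err := pd.NewClientWithContext(ctx, []string{"127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	const keyspaceID uint32 = 1 // hypothetical keyspace

	// Register a service safe point with a 10-minute TTL; the call returns the
	// keyspace's minimum service safe point after the update.
	minSSP, err := cli.UpdateServiceSafePointV2(ctx, keyspaceID, "my-service", 600, 430000)
	if err != nil {
		panic(err)
	}
	fmt.Println("min service safe point:", minSSP)

	// Try to advance the keyspace GC safe point; the server returns the safe
	// point actually in effect (it never moves backwards).
	newSP, err := cli.UpdateGCSafePointV2(ctx, keyspaceID, 420000)
	if err != nil {
		panic(err)
	}
	fmt.Println("gc safe point now:", newSP)

	// Watch GC safe point changes starting from revision 0. Events arrive in
	// batches; the channel is closed when the stream ends.
	events, err := cli.WatchGCSafePointV2(ctx, 0)
	if err != nil {
		panic(err)
	}
	for batch := range events {
		for _, e := range batch {
			fmt.Printf("safe point event: %+v\n", e)
		}
	}
}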
diff --git a/client/errs/errno.go b/client/errs/errno.go index 73bbd41541ec..a40650047650 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -38,20 +38,21 @@ const ( // client errors var ( - ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient")) - ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) - ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) - ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) - ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) - ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO")) - ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader")) - ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) - ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo")) - ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember")) - ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal")) - ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse")) - ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint")) - ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID")) + ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient")) + ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) + ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) + ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) + ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) + ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO")) + ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader")) + ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) + ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo")) + ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember")) + ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal")) + ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", 
errors.RFCCodeText("PD:client:ErrClientGetMultiResponse")) + ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint")) + ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID")) + ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream")) ) // grpcutil errors diff --git a/client/gc_client.go b/client/gc_client.go new file mode 100644 index 000000000000..c573836d2ba8 --- /dev/null +++ b/client/gc_client.go @@ -0,0 +1,137 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/grpcutil" + "go.uber.org/zap" +) + +// GCClient is a client for doing GC +type GCClient interface { + UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) + UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) + WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) +} + +// UpdateGCSafePointV2 update gc safe point for the given keyspace. +func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }() + + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &pdpb.UpdateGCSafePointV2Request{ + Header: c.requestHeader(), + KeyspaceId: keyspaceID, + SafePoint: safePoint, + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + protoClient := c.getClient() + if protoClient == nil { + cancel() + return 0, errs.ErrClientGetProtoClient + } + resp, err := protoClient.UpdateGCSafePointV2(ctx, req) + cancel() + + if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil { + return 0, err + } + return resp.GetNewSafePoint(), nil +} + +// UpdateServiceSafePointV2 update service safe point for the given keyspace. 
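+// It returns the keyspace's minimum service safe point after the update.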
+func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }() + + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &pdpb.UpdateServiceSafePointV2Request{ + Header: c.requestHeader(), + KeyspaceId: keyspaceID, + ServiceId: []byte(serviceID), + SafePoint: safePoint, + Ttl: ttl, + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + protoClient := c.getClient() + if protoClient == nil { + cancel() + return 0, errs.ErrClientGetProtoClient + } + resp, err := protoClient.UpdateServiceSafePointV2(ctx, req) + cancel() + if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil { + return 0, err + } + return resp.GetMinSafePoint(), nil +} + +// WatchGCSafePointV2 watch gc safe point change. +func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) { + SafePointEventsChan := make(chan []*pdpb.SafePointEvent) + req := &pdpb.WatchGCSafePointV2Request{ + Header: c.requestHeader(), + Revision: revision, + } + + protoClient := c.getClient() + if protoClient == nil { + return nil, errs.ErrClientGetProtoClient + } + stream, err := protoClient.WatchGCSafePointV2(ctx, req) + if err != nil { + close(SafePointEventsChan) + return nil, err + } + go func() { + defer func() { + close(SafePointEventsChan) + if r := recover(); r != nil { + log.Error("[pd] panic in gc client `WatchGCSafePointV2`", zap.Any("error", r)) + return + } + }() + for { + select { + case <-ctx.Done(): + return + default: + resp, err := stream.Recv() + if err != nil { + log.Error("watch gc safe point v2 error", errs.ZapError(errs.ErrClientWatchGCSafePointV2Stream, err)) + return + } + SafePointEventsChan <- resp.GetEvents() + } + } + }() + return SafePointEventsChan, err +} diff --git a/client/go.mod b/client/go.mod index 5e7bef4f4a1b..1dd6df7372cb 100644 --- a/client/go.mod +++ b/client/go.mod @@ -8,7 +8,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e + github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.8.2 diff --git a/client/go.sum b/client/go.sum index 6416449889f7..edf4ca1a40b0 100644 --- a/client/go.sum +++ b/client/go.sum @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod 
h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/metrics.go b/client/metrics.go index d965304a5bba..e88711d1da92 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -141,6 +141,8 @@ var ( cmdDurationUpdateKeyspaceState prometheus.Observer cmdDurationGet prometheus.Observer cmdDurationPut prometheus.Observer + cmdDurationUpdateGCSafePointV2 prometheus.Observer + cmdDurationUpdateServiceSafePointV2 prometheus.Observer cmdFailDurationGetRegion prometheus.Observer cmdFailDurationTSO prometheus.Observer @@ -157,6 +159,8 @@ var ( requestDurationTSO prometheus.Observer cmdFailedDurationGet prometheus.Observer cmdFailedDurationPut prometheus.Observer + cmdFailedDurationUpdateGCSafePointV2 prometheus.Observer + cmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer ) func initCmdDurations() { @@ -182,6 +186,8 @@ func initCmdDurations() { cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state") cmdDurationGet = cmdDuration.WithLabelValues("get") cmdDurationPut = cmdDuration.WithLabelValues("put") + cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2") + cmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2") cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region") cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso") @@ -198,6 +204,8 @@ func initCmdDurations() { requestDurationTSO = requestDuration.WithLabelValues("tso") cmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get") cmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put") + cmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2") + cmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2") } func registerMetrics() { diff --git a/go.mod b/go.mod index 1316ff144beb..d03d7e208c7d 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e + github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 diff --git a/go.sum b/go.sum index aafd65d09f28..6da0bd09dedd 100644 --- a/go.sum +++ b/go.sum @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= 
-github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/server/gc/safepoint.go b/pkg/gc/safepoint.go similarity index 100% rename from server/gc/safepoint.go rename to pkg/gc/safepoint.go diff --git a/server/gc/safepoint_test.go b/pkg/gc/safepoint_test.go similarity index 100% rename from server/gc/safepoint_test.go rename to pkg/gc/safepoint_test.go diff --git a/pkg/gc/safepoint_v2.go b/pkg/gc/safepoint_v2.go new file mode 100644 index 000000000000..f936601d2d99 --- /dev/null +++ b/pkg/gc/safepoint_v2.go @@ -0,0 +1,214 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "context" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/pingcap/log" + "github.com/pkg/errors" + "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/syncutil" + "go.uber.org/zap" +) + +var ( + // allowUpdateSafePoint specifies under which states is a keyspace allowed to update it's gc & service safe points. + allowUpdateSafePoint = []keyspacepb.KeyspaceState{ + keyspacepb.KeyspaceState_ENABLED, + keyspacepb.KeyspaceState_DISABLED, + } +) + +// SafePointV2Manager is the manager for GCSafePointV2 and ServiceSafePointV2. +type SafePointV2Manager struct { + *syncutil.LockGroup + ctx context.Context + // keyspaceStorage stores keyspace meta. + keyspaceStorage endpoint.KeyspaceStorage + // v2Storage is the storage GCSafePointV2 and ServiceSafePointV2. + v2Storage endpoint.SafePointV2Storage + // v1Storage is the storage for v1 format GCSafePoint and ServiceGCSafePoint, it's used during pd update. + v1Storage endpoint.GCSafePointStorage +} + +// NewSafePointManagerV2 returns a new SafePointV2Manager. 
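+// Per-keyspace operations are serialized by a lock group hashed with keyspace.MaskKeyspaceID.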
+func NewSafePointManagerV2(
+	ctx context.Context,
+	keyspaceStore endpoint.KeyspaceStorage,
+	v2Storage endpoint.SafePointV2Storage,
+	v1Storage endpoint.GCSafePointStorage,
+) *SafePointV2Manager {
+	return &SafePointV2Manager{
+		ctx:             ctx,
+		LockGroup:       syncutil.NewLockGroup(syncutil.WithHash(keyspace.MaskKeyspaceID)),
+		keyspaceStorage: keyspaceStore,
+		v2Storage:       v2Storage,
+		v1Storage:       v1Storage,
+	}
+}
+
+// LoadGCSafePoint returns the GCSafePointV2 of the given keyspaceID.
+func (manager *SafePointV2Manager) LoadGCSafePoint(keyspaceID uint32) (*endpoint.GCSafePointV2, error) {
+	manager.Lock(keyspaceID)
+	defer manager.Unlock(keyspaceID)
+	// Check if keyspace is valid to load.
+	if err := manager.checkKeyspace(keyspaceID, false); err != nil {
+		return nil, err
+	}
+	gcSafePoint, err := manager.getGCSafePoint(keyspaceID)
+	if err != nil {
+		log.Warn("failed to load gc safe point",
+			zap.Uint32("keyspace-id", keyspaceID),
+			zap.Error(err),
+		)
+		return nil, err
+	}
+	return gcSafePoint, nil
+}
+
+// checkKeyspace checks if the target keyspace exists and, if the request is an update request,
+// also checks whether the keyspace state allows the update.
+func (manager *SafePointV2Manager) checkKeyspace(keyspaceID uint32, updateRequest bool) error {
+	failpoint.Inject("checkKeyspace", func() {
+		failpoint.Return(nil)
+	})
+
+	err := manager.keyspaceStorage.RunInTxn(manager.ctx, func(txn kv.Txn) error {
+		meta, err := manager.keyspaceStorage.LoadKeyspaceMeta(txn, keyspaceID)
+		if err != nil {
+			return err
+		}
+		// If a keyspace does not exist, then loading its gc safe point is prohibited.
+		if meta == nil {
+			return keyspace.ErrKeyspaceNotFound
+		}
+		// If the keyspace's state does not permit updating the safe point, return an error.
+		if updateRequest && !slice.Contains(allowUpdateSafePoint, meta.GetState()) {
+			return errors.Errorf("cannot update keyspace that's %s", meta.GetState().String())
+		}
+		return nil
+	})
+	if err != nil {
+		log.Warn("check keyspace failed",
+			zap.Uint32("keyspace-id", keyspaceID),
+			zap.Error(err),
+		)
+	}
+	return err
+}
+
+// getGCSafePoint first tries to load the gc safe point from v2 storage; if none is found, it falls back to v1 storage.
+func (manager *SafePointV2Manager) getGCSafePoint(keyspaceID uint32) (*endpoint.GCSafePointV2, error) {
+	v2SafePoint, err := manager.v2Storage.LoadGCSafePointV2(keyspaceID)
+	if err != nil {
+		return nil, err
+	}
+	// If no valid safe point was found, check whether a safe point exists in v1 storage, and use it.
+	if v2SafePoint.SafePoint == 0 {
+		v1SafePoint, err := manager.v1Storage.LoadGCSafePoint()
+		if err != nil {
+			return nil, err
+		}
+		log.Info("keyspace does not have a gc safe point, using v1 gc safe point instead",
+			zap.Uint32("keyspace-id", keyspaceID),
+			zap.Uint64("gc-safe-point-v1", v1SafePoint))
+		v2SafePoint.SafePoint = v1SafePoint
+	}
+	return v2SafePoint, nil
+}
+
+// UpdateGCSafePoint updates the gc safe point for the given keyspace.
+func (manager *SafePointV2Manager) UpdateGCSafePoint(gcSafePoint *endpoint.GCSafePointV2) (oldGCSafePoint *endpoint.GCSafePointV2, err error) {
+	manager.Lock(gcSafePoint.KeyspaceID)
+	defer manager.Unlock(gcSafePoint.KeyspaceID)
+	// Check if keyspace is valid to update.
+	if err = manager.checkKeyspace(gcSafePoint.KeyspaceID, true); err != nil {
+		return
+	}
+	oldGCSafePoint, err = manager.getGCSafePoint(gcSafePoint.KeyspaceID)
+	if err != nil {
+		return
+	}
+	if oldGCSafePoint.SafePoint >= gcSafePoint.SafePoint {
+		return
+	}
+	err = manager.v2Storage.SaveGCSafePointV2(gcSafePoint)
+	return
+}
+
+// UpdateServiceSafePoint updates the keyspace's service safe point with the given serviceSafePoint.
+func (manager *SafePointV2Manager) UpdateServiceSafePoint(serviceSafePoint *endpoint.ServiceSafePointV2, now time.Time) (*endpoint.ServiceSafePointV2, error) {
+	manager.Lock(serviceSafePoint.KeyspaceID)
+	defer manager.Unlock(serviceSafePoint.KeyspaceID)
+	// Check if keyspace is valid to update.
+	if err := manager.checkKeyspace(serviceSafePoint.KeyspaceID, true); err != nil {
+		return nil, err
+	}
+	minServiceSafePoint, err := manager.v2Storage.LoadMinServiceSafePointV2(serviceSafePoint.KeyspaceID, now)
+	if err != nil {
+		return nil, err
+	}
+	if serviceSafePoint.SafePoint < minServiceSafePoint.SafePoint {
+		log.Warn("failed to update service safe point, proposed safe point smaller than current min",
+			zap.Error(err),
+			zap.Uint32("keyspace-id", serviceSafePoint.KeyspaceID),
+			zap.Uint64("request-service-safe-point", serviceSafePoint.SafePoint),
+			zap.Uint64("min-service-safe-point", minServiceSafePoint.SafePoint),
+		)
+		return minServiceSafePoint, nil
+	}
+	if err = manager.v2Storage.SaveServiceSafePointV2(serviceSafePoint); err != nil {
+		return nil, err
+	}
+	// If the updated safe point is the original min safe point, reload min safe point.
+	if serviceSafePoint.ServiceID == minServiceSafePoint.ServiceID {
+		minServiceSafePoint, err = manager.v2Storage.LoadMinServiceSafePointV2(serviceSafePoint.KeyspaceID, now)
+	}
+	// Only log the update when the (possibly reloaded) min safe point was obtained without error.
+	if err == nil {
+		log.Info("update service safe point",
+			zap.String("service-id", serviceSafePoint.ServiceID),
+			zap.Int64("expire-at", serviceSafePoint.ExpiredAt),
+			zap.Uint64("safepoint", serviceSafePoint.SafePoint),
+		)
+	}
+	return minServiceSafePoint, err
+}
+
+// RemoveServiceSafePoint removes the keyspace's service safe point with the given keyspaceID and serviceID.
+func (manager *SafePointV2Manager) RemoveServiceSafePoint(keyspaceID uint32, serviceID string, now time.Time) (*endpoint.ServiceSafePointV2, error) {
+	manager.Lock(keyspaceID)
+	defer manager.Unlock(keyspaceID)
+	// Check if keyspace is valid to update.
+	if err := manager.checkKeyspace(keyspaceID, true); err != nil {
+		return nil, err
+	}
+	// Remove target safe point.
+	if err := manager.v2Storage.RemoveServiceSafePointV2(keyspaceID, serviceID); err != nil {
+		return nil, err
+	}
+	// Load min safe point.
+ minServiceSafePoint, err := manager.v2Storage.LoadMinServiceSafePointV2(keyspaceID, now) + if err != nil { + return nil, err + } + return minServiceSafePoint, nil +} diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 4c180736a8fd..21b050839904 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -105,7 +105,7 @@ func NewKeyspaceManager( ) *Manager { return &Manager{ ctx: ctx, - metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)), + metaLock: syncutil.NewLockGroup(syncutil.WithHash(MaskKeyspaceID)), idAllocator: idAllocator, store: store, cluster: cluster, diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 4d5569701f4f..ac27f3aafd47 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -105,12 +105,12 @@ func validateName(name string) error { return nil } -// keyspaceIDHash is used to hash the spaceID inside the lockGroup. +// MaskKeyspaceID is used to hash the spaceID inside the lockGroup. // A simple mask is applied to spaceID to use its last byte as map key, // limiting the maximum map length to 256. // Since keyspaceID is sequentially allocated, this can also reduce the chance // of collision when comparing with random hashes. -func keyspaceIDHash(id uint32) uint32 { +func MaskKeyspaceID(id uint32) uint32 { return id & 0xFF } diff --git a/pkg/storage/endpoint/gc_key_space.go b/pkg/storage/endpoint/gc_key_space.go deleted file mode 100644 index 039c314cc304..000000000000 --- a/pkg/storage/endpoint/gc_key_space.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright 2022 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package endpoint - -import ( - "encoding/json" - "math" - "strconv" - "strings" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/log" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/utils/logutil" - "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" -) - -// KeyspaceGCSafePoint is gcWorker's safepoint for specific key-space -type KeyspaceGCSafePoint struct { - SpaceID string `json:"space_id"` - SafePoint uint64 `json:"safe_point,omitempty"` -} - -// KeyspaceGCSafePointStorage defines the storage operations on Keyspaces' safe points -type KeyspaceGCSafePointStorage interface { - // Service safe point interfaces. - SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error - LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) - LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) - RemoveServiceSafePoint(spaceID, serviceID string) error - // GC safe point interfaces. - SaveKeyspaceGCSafePoint(spaceID string, safePoint uint64) error - LoadKeyspaceGCSafePoint(spaceID string) (uint64, error) - LoadAllKeyspaceGCSafePoints(withGCSafePoint bool) ([]*KeyspaceGCSafePoint, error) -} - -var _ KeyspaceGCSafePointStorage = (*StorageEndpoint)(nil) - -// SaveServiceSafePoint saves service safe point under given key-space. 
-func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error { - if ssp.ServiceID == "" { - return errors.New("service id of service safepoint cannot be empty") - } - key := KeyspaceServiceSafePointPath(spaceID, ssp.ServiceID) - value, err := json.Marshal(ssp) - if err != nil { - return err - } - return se.Save(key, string(value)) -} - -// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name. -// Return nil if no safepoint exist for given service or just expired. -func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) { - key := KeyspaceServiceSafePointPath(spaceID, serviceID) - value, err := se.Load(key) - if err != nil || value == "" { - return nil, err - } - ssp := &ServiceSafePoint{} - if err := json.Unmarshal([]byte(value), ssp); err != nil { - return nil, err - } - if ssp.ExpiredAt < time.Now().Unix() { - go func() { - defer logutil.LogPanic() - - if err = se.Remove(key); err != nil { - log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err)) - } - }() - return nil, nil - } - return ssp, nil -} - -// LoadMinServiceSafePoint returns the minimum safepoint for the given key-space. -// Note that gc worker safe point are store separately. -// If no service safe point exist for the given key-space or all the service safe points just expired, return nil. -func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) { - prefix := KeyspaceServiceSafePointPrefix(spaceID) - prefixEnd := clientv3.GetPrefixRangeEnd(prefix) - keys, values, err := se.LoadRange(prefix, prefixEnd, 0) - if err != nil { - return nil, err - } - min := &ServiceSafePoint{SafePoint: math.MaxUint64} - expiredKeys := make([]string, 0) - for i, key := range keys { - ssp := &ServiceSafePoint{} - if err = json.Unmarshal([]byte(values[i]), ssp); err != nil { - return nil, err - } - - // gather expired keys - if ssp.ExpiredAt < now.Unix() { - expiredKeys = append(expiredKeys, key) - continue - } - if ssp.SafePoint < min.SafePoint { - min = ssp - } - } - // failpoint for immediate removal - failpoint.Inject("removeExpiredKeys", func() { - for _, key := range expiredKeys { - if err = se.Remove(key); err != nil { - log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err)) - } - } - expiredKeys = []string{} - }) - // remove expired keys asynchronously - go func() { - defer logutil.LogPanic() - - for _, key := range expiredKeys { - if err = se.Remove(key); err != nil { - log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err)) - } - } - }() - if min.SafePoint == math.MaxUint64 { - // no service safe point or all of them are expired. - return nil, nil - } - - // successfully found a valid min safe point. - return min, nil -} - -// RemoveServiceSafePoint removes target ServiceSafePoint -func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error { - key := KeyspaceServiceSafePointPath(spaceID, serviceID) - return se.Remove(key) -} - -// SaveKeyspaceGCSafePoint saves GCSafePoint to the given key-space. -func (se *StorageEndpoint) SaveKeyspaceGCSafePoint(spaceID string, safePoint uint64) error { - value := strconv.FormatUint(safePoint, 16) - return se.Save(KeyspaceGCSafePointPath(spaceID), value) -} - -// LoadKeyspaceGCSafePoint reads GCSafePoint for the given key-space. -// Returns 0 if target safepoint not exist. 
-func (se *StorageEndpoint) LoadKeyspaceGCSafePoint(spaceID string) (uint64, error) { - value, err := se.Load(KeyspaceGCSafePointPath(spaceID)) - if err != nil || value == "" { - return 0, err - } - safePoint, err := strconv.ParseUint(value, 16, 64) - if err != nil { - return 0, err - } - return safePoint, nil -} - -// LoadAllKeyspaceGCSafePoints returns slice of KeyspaceGCSafePoint. -// If withGCSafePoint set to false, returned safePoints will be 0. -func (se *StorageEndpoint) LoadAllKeyspaceGCSafePoints(withGCSafePoint bool) ([]*KeyspaceGCSafePoint, error) { - prefix := KeyspaceSafePointPrefix() - prefixEnd := clientv3.GetPrefixRangeEnd(prefix) - suffix := KeyspaceGCSafePointSuffix() - keys, values, err := se.LoadRange(prefix, prefixEnd, 0) - if err != nil { - return nil, err - } - safePoints := make([]*KeyspaceGCSafePoint, 0, len(values)) - for i := range keys { - // skip non gc safe points - if !strings.HasSuffix(keys[i], suffix) { - continue - } - safePoint := &KeyspaceGCSafePoint{} - spaceID := strings.TrimPrefix(keys[i], prefix) - spaceID = strings.TrimSuffix(spaceID, suffix) - safePoint.SpaceID = spaceID - if withGCSafePoint { - value, err := strconv.ParseUint(values[i], 16, 64) - if err != nil { - return nil, err - } - safePoint.SafePoint = value - } - safePoints = append(safePoints, safePoint) - } - return safePoints, nil -} diff --git a/pkg/storage/endpoint/gc_safe_point.go b/pkg/storage/endpoint/gc_safe_point.go index e213eca4ed5d..db5c58205c8a 100644 --- a/pkg/storage/endpoint/gc_safe_point.go +++ b/pkg/storage/endpoint/gc_safe_point.go @@ -87,7 +87,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { return nil, err } - if ssp.ServiceID == gcWorkerServiceSafePointID { + if ssp.ServiceID == GCWorkerServiceSafePointID { hasGCWorker = true // If gc_worker's expire time is incorrectly set, fix it. 
if ssp.ExpiredAt != math.MaxInt64 { @@ -125,7 +125,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf func (se *StorageEndpoint) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) { ssp := &ServiceSafePoint{ - ServiceID: gcWorkerServiceSafePointID, + ServiceID: GCWorkerServiceSafePointID, SafePoint: initialValue, ExpiredAt: math.MaxInt64, } @@ -165,7 +165,7 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { return errors.New("service id of service safepoint cannot be empty") } - if ssp.ServiceID == gcWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 { + if ssp.ServiceID == GCWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 { return errors.New("TTL of gc_worker's service safe point must be infinity") } @@ -180,7 +180,7 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { // RemoveServiceGCSafePoint removes a GC safepoint for the service func (se *StorageEndpoint) RemoveServiceGCSafePoint(serviceID string) error { - if serviceID == gcWorkerServiceSafePointID { + if serviceID == GCWorkerServiceSafePointID { return errors.New("cannot remove service safe point of gc_worker") } key := gcSafePointServicePath(serviceID) diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index f84459fbd7d4..9ae8a2850858 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -25,17 +25,18 @@ import ( ) const ( - clusterPath = "raft" - configPath = "config" - serviceMiddlewarePath = "service_middleware" - schedulePath = "schedule" - gcPath = "gc" - rulesPath = "rules" - ruleGroupPath = "rule_group" - regionLabelPath = "region_label" - replicationPath = "replication_mode" - customScheduleConfigPath = "scheduler_config" - gcWorkerServiceSafePointID = "gc_worker" + clusterPath = "raft" + configPath = "config" + serviceMiddlewarePath = "service_middleware" + schedulePath = "schedule" + gcPath = "gc" + rulesPath = "rules" + ruleGroupPath = "rule_group" + regionLabelPath = "region_label" + replicationPath = "replication_mode" + customScheduleConfigPath = "scheduler_config" + // GCWorkerServiceSafePointID is the service id of GC worker. + GCWorkerServiceSafePointID = "gc_worker" minResolvedTS = "min_resolved_ts" externalTimeStamp = "external_timestamp" keyspaceSafePointPrefix = "keyspaces/gc_safepoint" @@ -44,6 +45,8 @@ const ( keyspaceMetaInfix = "meta" keyspaceIDInfix = "id" keyspaceAllocID = "alloc_id" + gcSafePointInfix = "gc_safe_point" + serviceSafePointInfix = "service_safe_point" regionPathPrefix = "raft/r" // resource group storage endpoint has prefix `resource_group` resourceGroupSettingsPath = "settings" @@ -165,34 +168,29 @@ func ExternalTimestampPath() string { return path.Join(clusterPath, externalTimeStamp) } -// KeyspaceServiceSafePointPrefix returns the prefix of given service's service safe point. -// Prefix: /keyspaces/gc_safepoint/{space_id}/service/ -func KeyspaceServiceSafePointPrefix(spaceID string) string { - return path.Join(keyspaceSafePointPrefix, spaceID, "service") + "/" +// GCSafePointV2Path is the storage path of gc safe point v2. +// Path: keyspaces/gc_safe_point/{keyspaceID} +func GCSafePointV2Path(keyspaceID uint32) string { + return buildPath(false, keyspacePrefix, gcSafePointInfix, EncodeKeyspaceID(keyspaceID)) } -// KeyspaceGCSafePointPath returns the gc safe point's path of the given key-space. 
-// Path: /keyspaces/gc_safepoint/{space_id}/gc -func KeyspaceGCSafePointPath(spaceID string) string { - return path.Join(keyspaceSafePointPrefix, spaceID, keyspaceGCSafePointSuffix) +// GCSafePointV2Prefix is the path prefix to all gc safe point v2. +// Prefix: keyspaces/gc_safe_point/ +func GCSafePointV2Prefix() string { + return buildPath(true, keyspacePrefix, gcSafePointInfix) } -// KeyspaceServiceSafePointPath returns the path of given service's service safe point. -// Path: /keyspaces/gc_safepoint/{space_id}/service/{service_id} -func KeyspaceServiceSafePointPath(spaceID, serviceID string) string { - return path.Join(KeyspaceServiceSafePointPrefix(spaceID), serviceID) +// ServiceSafePointV2Path is the storage path of service safe point v2. +// Path: keyspaces/service_safe_point/{spaceID}/{serviceID} +func ServiceSafePointV2Path(keyspaceID uint32, serviceID string) string { + return buildPath(false, keyspacePrefix, serviceSafePointInfix, EncodeKeyspaceID(keyspaceID), serviceID) } -// KeyspaceSafePointPrefix returns prefix for all key-spaces' safe points. -// Path: /keyspaces/gc_safepoint/ -func KeyspaceSafePointPrefix() string { - return keyspaceSafePointPrefix + "/" -} - -// KeyspaceGCSafePointSuffix returns the suffix for any gc safepoint. -// Postfix: /gc -func KeyspaceGCSafePointSuffix() string { - return "/" + keyspaceGCSafePointSuffix +// ServiceSafePointV2Prefix is the path prefix of all service safe point that belongs to a specific keyspace. +// Can be used to retrieve keyspace's service safe point at once. +// Path: keyspaces/service_safe_point/{spaceID}/ +func ServiceSafePointV2Prefix(keyspaceID uint32) string { + return buildPath(true, keyspacePrefix, serviceSafePointInfix, EncodeKeyspaceID(keyspaceID)) } // KeyspaceMetaPrefix returns the prefix of keyspaces' metadata. @@ -204,7 +202,7 @@ func KeyspaceMetaPrefix() string { // KeyspaceMetaPath returns the path to the given keyspace's metadata. // Path: keyspaces/meta/{space_id} func KeyspaceMetaPath(spaceID uint32) string { - idStr := encodeKeyspaceID(spaceID) + idStr := EncodeKeyspaceID(spaceID) return path.Join(KeyspaceMetaPrefix(), idStr) } @@ -220,11 +218,11 @@ func KeyspaceIDAlloc() string { return path.Join(keyspacePrefix, keyspaceAllocID) } -// encodeKeyspaceID from uint32 to string. +// EncodeKeyspaceID from uint32 to string. // It adds extra padding to make encoded ID ordered. // Encoded ID can be decoded directly with strconv.ParseUint. // Width of the padded keyspaceID is 8 (decimal representation of uint24max is 16777215). -func encodeKeyspaceID(spaceID uint32) string { +func EncodeKeyspaceID(spaceID uint32) string { return fmt.Sprintf("%08d", spaceID) } @@ -260,3 +258,17 @@ func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) { func encodeKeyspaceGroupID(groupID uint32) string { return fmt.Sprintf("%05d", groupID) } + +func buildPath(withSuffix bool, str ...string) string { + var sb strings.Builder + for i := 0; i < len(str); i++ { + if i != 0 { + sb.WriteString("/") + } + sb.WriteString(str[i]) + } + if withSuffix { + sb.WriteString("/") + } + return sb.String() +} diff --git a/pkg/storage/endpoint/safepoint_v2.go b/pkg/storage/endpoint/safepoint_v2.go new file mode 100644 index 000000000000..cac2606a470c --- /dev/null +++ b/pkg/storage/endpoint/safepoint_v2.go @@ -0,0 +1,220 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "encoding/json" + "math" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +// GCSafePointV2 represents the overall safe point for a specific keyspace. +type GCSafePointV2 struct { + KeyspaceID uint32 `json:"keyspace_id"` + SafePoint uint64 `json:"safe_point"` +} + +// ServiceSafePointV2 represents a service's safepoint under a specific keyspace. +// Services can post service safe point to prevent gc safe point from incrementing. +type ServiceSafePointV2 struct { + KeyspaceID uint32 `json:"keyspace_id"` + ServiceID string `json:"service_id"` + ExpiredAt int64 `json:"expired_at"` + SafePoint uint64 `json:"safe_point"` +} + +// SafePointV2Storage defines the storage operations on safe point v2. +type SafePointV2Storage interface { + LoadGCSafePointV2(keyspaceID uint32) (*GCSafePointV2, error) + SaveGCSafePointV2(gcSafePoint *GCSafePointV2) error + LoadAllGCSafePoints() ([]*GCSafePointV2, error) + + LoadMinServiceSafePointV2(keyspaceID uint32, now time.Time) (*ServiceSafePointV2, error) + LoadServiceSafePointV2(keyspaceID uint32, serviceID string) (*ServiceSafePointV2, error) + + SaveServiceSafePointV2(serviceSafePoint *ServiceSafePointV2) error + RemoveServiceSafePointV2(keyspaceID uint32, serviceID string) error +} + +var _ SafePointV2Storage = (*StorageEndpoint)(nil) + +// LoadGCSafePointV2 loads gc safe point for the given keyspace. +func (se *StorageEndpoint) LoadGCSafePointV2(keyspaceID uint32) (*GCSafePointV2, error) { + key := GCSafePointV2Path(keyspaceID) + value, err := se.Load(key) + if err != nil { + return nil, err + } + // GC Safe Point does not exist for the given keyspace + if value == "" { + return &GCSafePointV2{ + KeyspaceID: keyspaceID, + SafePoint: 0, + }, nil + } + gcSafePoint := &GCSafePointV2{} + if err = json.Unmarshal([]byte(value), gcSafePoint); err != nil { + return nil, errs.ErrJSONUnmarshal.Wrap(err).GenWithStackByCause() + } + return gcSafePoint, nil +} + +// SaveGCSafePointV2 saves gc safe point for the given keyspace. 
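+// The value is marshaled to JSON and stored under GCSafePointV2Path(keyspaceID).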
+func (se *StorageEndpoint) SaveGCSafePointV2(gcSafePoint *GCSafePointV2) error { + key := GCSafePointV2Path(gcSafePoint.KeyspaceID) + value, err := json.Marshal(gcSafePoint) + if err != nil { + return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() + } + return se.Save(key, string(value)) +} + +// LoadAllGCSafePoints returns gc safe point for all keyspaces +func (se *StorageEndpoint) LoadAllGCSafePoints() ([]*GCSafePointV2, error) { + prefix := GCSafePointV2Prefix() + prefixEnd := clientv3.GetPrefixRangeEnd(prefix) + _, values, err := se.LoadRange(prefix, prefixEnd, 0) + if err != nil { + return nil, err + } + gcSafePoints := make([]*GCSafePointV2, 0, len(values)) + for _, value := range values { + gcSafePoint := &GCSafePointV2{} + if err = json.Unmarshal([]byte(value), gcSafePoint); err != nil { + return nil, errs.ErrJSONUnmarshal.Wrap(err).GenWithStackByCause() + } + gcSafePoints = append(gcSafePoints, gcSafePoint) + } + return gcSafePoints, nil +} + +// LoadMinServiceSafePointV2 returns the minimum safepoint for the given keyspace. +// If no service safe point exist for the given key space or all the service safe points just expired, return nil. +// This also attempt to remove expired service safe point. +func (se *StorageEndpoint) LoadMinServiceSafePointV2(keyspaceID uint32, now time.Time) (*ServiceSafePointV2, error) { + prefix := ServiceSafePointV2Prefix(keyspaceID) + prefixEnd := clientv3.GetPrefixRangeEnd(prefix) + keys, values, err := se.LoadRange(prefix, prefixEnd, 0) + if err != nil { + return nil, err + } + if len(keys) == 0 { + return se.initServiceSafePointV2ForGCWorker(keyspaceID, 0) + } + + hasGCWorker := false + min := &ServiceSafePointV2{KeyspaceID: keyspaceID, SafePoint: math.MaxUint64} + for i, key := range keys { + serviceSafePoint := &ServiceSafePointV2{} + if err = json.Unmarshal([]byte(values[i]), serviceSafePoint); err != nil { + return nil, err + } + if serviceSafePoint.ServiceID == GCWorkerServiceSafePointID { + hasGCWorker = true + // If gc_worker's expire time is incorrectly set, fix it. + if serviceSafePoint.ExpiredAt != math.MaxInt64 { + serviceSafePoint.ExpiredAt = math.MaxInt64 + err = se.SaveServiceSafePointV2(serviceSafePoint) + if err != nil { + return nil, errors.Trace(err) + } + } + } + if serviceSafePoint.ExpiredAt < now.Unix() { + if err = se.Remove(key); err != nil { + log.Warn("failed to remove expired service safe point", zap.Error(err)) + } + continue + } + if serviceSafePoint.SafePoint < min.SafePoint { + min = serviceSafePoint + } + } + if min.SafePoint == math.MaxUint64 { + // No service safe point or all of them are expired, set min service safe point to 0 to allow any update + log.Info("there are no valid service safepoints. init gc_worker's service safepoint to 0") + return se.initServiceSafePointV2ForGCWorker(keyspaceID, 0) + } + if !hasGCWorker { + // If there exists some service safepoints but gc_worker is missing, init it with the min value among all + // safepoints (including expired ones) + return se.initServiceSafePointV2ForGCWorker(keyspaceID, min.SafePoint) + } + return min, nil +} + +// LoadServiceSafePointV2 returns ServiceSafePointV2 for given keyspaceID and serviceID. 
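+// It returns nil (with a nil error) when no service safe point exists for the given pair.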
+func (se *StorageEndpoint) LoadServiceSafePointV2(keyspaceID uint32, serviceID string) (*ServiceSafePointV2, error) { + key := ServiceSafePointV2Path(keyspaceID, serviceID) + value, err := se.Load(key) + if err != nil { + return nil, err + } + // Service Safe Point does not exist for the given keyspaceID and serviceID + if value == "" { + return nil, nil + } + serviceSafePoint := &ServiceSafePointV2{} + if err = json.Unmarshal([]byte(value), serviceSafePoint); err != nil { + return nil, err + } + return serviceSafePoint, nil +} + +func (se *StorageEndpoint) initServiceSafePointV2ForGCWorker(keyspaceID uint32, initialValue uint64) (*ServiceSafePointV2, error) { + ssp := &ServiceSafePointV2{ + KeyspaceID: keyspaceID, + ServiceID: GCWorkerServiceSafePointID, + SafePoint: initialValue, + ExpiredAt: math.MaxInt64, + } + if err := se.SaveServiceSafePointV2(ssp); err != nil { + return nil, err + } + return ssp, nil +} + +// SaveServiceSafePointV2 stores service safe point to etcd. +func (se *StorageEndpoint) SaveServiceSafePointV2(serviceSafePoint *ServiceSafePointV2) error { + if serviceSafePoint.ServiceID == "" { + return errors.New("service id of service safepoint cannot be empty") + } + + if serviceSafePoint.ServiceID == GCWorkerServiceSafePointID && serviceSafePoint.ExpiredAt != math.MaxInt64 { + return errors.New("TTL of gc_worker's service safe point must be infinity") + } + + key := ServiceSafePointV2Path(serviceSafePoint.KeyspaceID, serviceSafePoint.ServiceID) + value, err := json.Marshal(serviceSafePoint) + if err != nil { + return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() + } + return se.Save(key, string(value)) +} + +// RemoveServiceSafePointV2 removes a service safe point. +func (se *StorageEndpoint) RemoveServiceSafePointV2(keyspaceID uint32, serviceID string) error { + if serviceID == GCWorkerServiceSafePointID { + return errors.New("cannot remove service safe point of gc_worker") + } + key := ServiceSafePointV2Path(keyspaceID, serviceID) + return se.Remove(key) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index da6a06c588a5..aba01dfa8063 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -40,7 +40,7 @@ type Storage interface { endpoint.GCSafePointStorage endpoint.MinResolvedTSStorage endpoint.ExternalTSStorage - endpoint.KeyspaceGCSafePointStorage + endpoint.SafePointV2Storage endpoint.KeyspaceStorage endpoint.ResourceGroupStorage endpoint.TSOStorage diff --git a/pkg/storage/storage_gc_test.go b/pkg/storage/storage_gc_test.go index 0825885e8c19..141777d441ed 100644 --- a/pkg/storage/storage_gc_test.go +++ b/pkg/storage/storage_gc_test.go @@ -24,62 +24,45 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" ) -func testGCSafePoints() ([]string, []uint64) { - spaceIDs := []string{ - "keySpace1", - "keySpace2", - "keySpace3", - "keySpace4", - "keySpace5", +func testGCSafePoints() []*endpoint.GCSafePointV2 { + gcSafePoint := []*endpoint.GCSafePointV2{ + {KeyspaceID: uint32(1), SafePoint: 0}, + {KeyspaceID: uint32(2), SafePoint: 1}, + {KeyspaceID: uint32(3), SafePoint: 4396}, + {KeyspaceID: uint32(4), SafePoint: 23333333333}, + {KeyspaceID: uint32(5), SafePoint: math.MaxUint64}, } - safePoints := []uint64{ - 0, - 1, - 4396, - 23333333333, - math.MaxUint64, - } - return spaceIDs, safePoints + + return gcSafePoint } -func testServiceSafePoints() ([]string, []*endpoint.ServiceSafePoint) { - spaceIDs := []string{ - "keySpace1", - "keySpace1", - "keySpace1", - "keySpace2", - "keySpace2", - "keySpace2", - "keySpace3", - "keySpace3", - 
"keySpace3", - } +func testServiceSafePoints() []*endpoint.ServiceSafePointV2 { expireAt := time.Now().Add(100 * time.Second).Unix() - serviceSafePoints := []*endpoint.ServiceSafePoint{ - {ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, - {ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, - {ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, - {ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, - {ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, - {ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, - {ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, - {ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, - {ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, - } - return spaceIDs, serviceSafePoints + serviceSafePoints := []*endpoint.ServiceSafePointV2{ + {KeyspaceID: uint32(1), ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, + {KeyspaceID: uint32(1), ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, + {KeyspaceID: uint32(1), ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, + {KeyspaceID: uint32(2), ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, + {KeyspaceID: uint32(2), ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, + {KeyspaceID: uint32(2), ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, + {KeyspaceID: uint32(3), ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, + {KeyspaceID: uint32(3), ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, + {KeyspaceID: uint32(3), ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, + } + return serviceSafePoints } func TestSaveLoadServiceSafePoint(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() - testSpaceID, testSafePoints := testServiceSafePoints() - for i := range testSpaceID { - re.NoError(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i])) + testServiceSafepoints := testServiceSafePoints() + for i := range testServiceSafepoints { + re.NoError(storage.SaveServiceSafePointV2(testServiceSafepoints[i])) } - for i := range testSpaceID { - loadedSafePoint, err := storage.LoadServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID) + for i := range testServiceSafepoints { + loadedServiceSafePoint, err := storage.LoadServiceSafePointV2(testServiceSafepoints[i].KeyspaceID, testServiceSafepoints[i].ServiceID) re.NoError(err) - re.Equal(testSafePoints[i], loadedSafePoint) + re.Equal(testServiceSafepoints[i].SafePoint, loadedServiceSafePoint.SafePoint) } } @@ -87,58 +70,48 @@ func TestLoadMinServiceSafePoint(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() currentTime := time.Now() - expireAt1 := currentTime.Add(100 * time.Second).Unix() - expireAt2 := currentTime.Add(200 * time.Second).Unix() - expireAt3 := currentTime.Add(300 * time.Second).Unix() - - serviceSafePoints := []*endpoint.ServiceSafePoint{ - {ServiceID: "0", ExpiredAt: expireAt1, SafePoint: 100}, - {ServiceID: "1", ExpiredAt: expireAt2, SafePoint: 200}, - {ServiceID: "2", ExpiredAt: expireAt3, SafePoint: 300}, + expireAt1 := currentTime.Add(1000 * time.Second).Unix() + expireAt2 := currentTime.Add(2000 * time.Second).Unix() + expireAt3 := currentTime.Add(3000 * time.Second).Unix() + + testKeyspaceID := uint32(1) + serviceSafePoints := []*endpoint.ServiceSafePointV2{ + {KeyspaceID: testKeyspaceID, ServiceID: "0", ExpiredAt: expireAt1, SafePoint: 300}, + {KeyspaceID: testKeyspaceID, ServiceID: "1", ExpiredAt: expireAt2, SafePoint: 400}, + {KeyspaceID: 
testKeyspaceID, ServiceID: "2", ExpiredAt: expireAt3, SafePoint: 500}, } - testKeyspace := "test" for _, serviceSafePoint := range serviceSafePoints { - re.NoError(storage.SaveServiceSafePoint(testKeyspace, serviceSafePoint)) + re.NoError(storage.SaveServiceSafePointV2(serviceSafePoint)) } // enabling failpoint to make expired key removal immediately observable re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys", "return(true)")) - minSafePoint, err := storage.LoadMinServiceSafePoint(testKeyspace, currentTime) - re.NoError(err) - re.Equal(serviceSafePoints[0], minSafePoint) - - // the safePoint with ServiceID 0 should be removed due to expiration - minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeyspace, currentTime.Add(150*time.Second)) + minSafePoint, err := storage.LoadMinServiceSafePointV2(testKeyspaceID, currentTime) re.NoError(err) - re.Equal(serviceSafePoints[1], minSafePoint2) + re.Equal(serviceSafePoints[0].SafePoint, minSafePoint.SafePoint) - // verify that service safe point with ServiceID 0 has been removed - ssp, err := storage.LoadServiceSafePoint(testKeyspace, "0") + // gc_worker service safepoint will not be removed. + ssp, err := storage.LoadMinServiceSafePointV2(testKeyspaceID, currentTime.Add(5000*time.Second)) re.NoError(err) - re.Nil(ssp) - - // all remaining service safePoints should be removed due to expiration - ssp, err = storage.LoadMinServiceSafePoint(testKeyspace, currentTime.Add(500*time.Second)) - re.NoError(err) - re.Nil(ssp) + re.Equal(ssp.ServiceID, endpoint.GCWorkerServiceSafePointID) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys")) } func TestRemoveServiceSafePoint(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() - testSpaceID, testSafePoints := testServiceSafePoints() + testServiceSafepoint := testServiceSafePoints() // save service safe points - for i := range testSpaceID { - re.NoError(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i])) + for _, serviceSafePoint := range testServiceSafepoint { + re.NoError(storage.SaveServiceSafePointV2(serviceSafePoint)) } // remove saved service safe points - for i := range testSpaceID { - re.NoError(storage.RemoveServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID)) + for _, serviceSafePoint := range testServiceSafepoint { + re.NoError(storage.RemoveServiceSafePointV2(serviceSafePoint.KeyspaceID, serviceSafePoint.ServiceID)) } // check that service safe points are empty - for i := range testSpaceID { - loadedSafePoint, err := storage.LoadServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID) + for i := range testServiceSafepoint { + loadedSafePoint, err := storage.LoadServiceSafePointV2(testServiceSafepoint[i].KeyspaceID, testServiceSafepoint[i].ServiceID) re.NoError(err) re.Nil(loadedSafePoint) } @@ -147,54 +120,19 @@ func TestRemoveServiceSafePoint(t *testing.T) { func TestSaveLoadGCSafePoint(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() - testSpaceIDs, testSafePoints := testGCSafePoints() - for i := range testSpaceIDs { - testSpaceID := testSpaceIDs[i] - testSafePoint := testSafePoints[i] - err := storage.SaveKeyspaceGCSafePoint(testSpaceID, testSafePoint) - re.NoError(err) - loaded, err := storage.LoadKeyspaceGCSafePoint(testSpaceID) + testGCSafePoints := testGCSafePoints() + for _, testGCSafePoint := range testGCSafePoints { + testSpaceID := testGCSafePoint.KeyspaceID + testSafePoint := testGCSafePoint.SafePoint + err := 
storage.SaveGCSafePointV2(testGCSafePoint) re.NoError(err) - re.Equal(testSafePoint, loaded) - } -} - -func TestLoadAllKeyspaceGCSafePoints(t *testing.T) { - re := require.New(t) - storage := NewStorageWithMemoryBackend() - testSpaceIDs, testSafePoints := testGCSafePoints() - for i := range testSpaceIDs { - err := storage.SaveKeyspaceGCSafePoint(testSpaceIDs[i], testSafePoints[i]) + loadGCSafePoint, err := storage.LoadGCSafePointV2(testSpaceID) re.NoError(err) - } - loadedSafePoints, err := storage.LoadAllKeyspaceGCSafePoints(true) - re.NoError(err) - for i := range loadedSafePoints { - re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) - re.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint) + re.Equal(testSafePoint, loadGCSafePoint.SafePoint) } - // saving some service safe points. - spaceIDs, safePoints := testServiceSafePoints() - for i := range spaceIDs { - re.NoError(storage.SaveServiceSafePoint(spaceIDs[i], safePoints[i])) - } - - // verify that service safe points do not interfere with gc safe points. - loadedSafePoints, err = storage.LoadAllKeyspaceGCSafePoints(true) - re.NoError(err) - for i := range loadedSafePoints { - re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) - re.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint) - } - - // verify that when withGCSafePoint set to false, returned safePoints is 0 - loadedSafePoints, err = storage.LoadAllKeyspaceGCSafePoints(false) - re.NoError(err) - for i := range loadedSafePoints { - re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) - re.Equal(uint64(0), loadedSafePoints[i].SafePoint) - } + _, err2 := storage.LoadGCSafePointV2(999) + re.NoError(err2) } func TestLoadEmpty(t *testing.T) { @@ -202,17 +140,12 @@ func TestLoadEmpty(t *testing.T) { storage := NewStorageWithMemoryBackend() // loading non-existing GC safepoint should return 0 - gcSafePoint, err := storage.LoadKeyspaceGCSafePoint("testKeyspace") + gcSafePoint, err := storage.LoadGCSafePointV2(1) re.NoError(err) - re.Equal(uint64(0), gcSafePoint) + re.Equal(uint64(0), gcSafePoint.SafePoint) // loading non-existing service safepoint should return nil - serviceSafePoint, err := storage.LoadServiceSafePoint("testKeyspace", "testService") + serviceSafePoint, err := storage.LoadServiceSafePointV2(1, "testService") re.NoError(err) re.Nil(serviceSafePoint) - - // loading empty key spaces should return empty slices - safePoints, err := storage.LoadAllKeyspaceGCSafePoints(true) - re.NoError(err) - re.Empty(safePoints) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b99eaed74662..5acd6113f87b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/gc" "github.com/tikv/pd/pkg/gctuner" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" @@ -115,6 +116,7 @@ type Server interface { ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error GetKeyspaceGroupManager() *keyspace.GroupManager IsAPIServiceMode() bool + GetSafePointV2Manager() *gc.SafePointV2Manager } // RaftCluster is used for cluster config management. diff --git a/server/gc_service.go b/server/gc_service.go new file mode 100644 index 000000000000..0a4f8f4a6c96 --- /dev/null +++ b/server/gc_service.go @@ -0,0 +1,199 @@ +// Copyright 2023 TiKV Project Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "encoding/json" + "fmt" + "math" + "path" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/tso" + "github.com/tikv/pd/pkg/utils/tsoutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// GetGCSafePointV2 return gc safe point for the given keyspace. +func (s *GrpcServer) GetGCSafePointV2(ctx context.Context, request *pdpb.GetGCSafePointV2Request) (*pdpb.GetGCSafePointV2Response, error) { + fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { + return pdpb.NewPDClient(client).GetGCSafePointV2(ctx, request) + } + if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + return nil, err + } else if rsp != nil { + return rsp.(*pdpb.GetGCSafePointV2Response), err + } + + safePoint, err := s.safePointV2Manager.LoadGCSafePoint(request.GetKeyspaceId()) + + if err != nil { + return &pdpb.GetGCSafePointV2Response{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, err + } + + return &pdpb.GetGCSafePointV2Response{ + Header: s.header(), + SafePoint: safePoint.SafePoint, + }, nil +} + +// UpdateGCSafePointV2 update gc safe point for the given keyspace. +func (s *GrpcServer) UpdateGCSafePointV2(ctx context.Context, request *pdpb.UpdateGCSafePointV2Request) (*pdpb.UpdateGCSafePointV2Response, error) { + fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { + return pdpb.NewPDClient(client).UpdateGCSafePointV2(ctx, request) + } + if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + return nil, err + } else if rsp != nil { + return rsp.(*pdpb.UpdateGCSafePointV2Response), err + } + + newSafePoint := request.GetSafePoint() + oldSafePoint, err := s.safePointV2Manager.UpdateGCSafePoint(&endpoint.GCSafePointV2{ + KeyspaceID: request.KeyspaceId, + SafePoint: request.SafePoint, + }) + if err != nil { + return nil, err + } + if newSafePoint > oldSafePoint.SafePoint { + log.Info("updated gc safe point", + zap.Uint64("safe-point", newSafePoint), + zap.Uint32("keyspace-id", request.GetKeyspaceId())) + } else if newSafePoint < oldSafePoint.SafePoint { + log.Warn("trying to update gc safe point", + zap.Uint64("old-safe-point", oldSafePoint.SafePoint), + zap.Uint64("new-safe-point", newSafePoint), + zap.Uint32("keyspace-id", request.GetKeyspaceId())) + newSafePoint = oldSafePoint.SafePoint + } + + return &pdpb.UpdateGCSafePointV2Response{ + Header: s.header(), + NewSafePoint: newSafePoint, + }, nil +} + +// UpdateServiceSafePointV2 update service safe point for the given keyspace. 
+func (s *GrpcServer) UpdateServiceSafePointV2(ctx context.Context, request *pdpb.UpdateServiceSafePointV2Request) (*pdpb.UpdateServiceSafePointV2Response, error) { + fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { + return pdpb.NewPDClient(client).UpdateServiceSafePointV2(ctx, request) + } + if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + return nil, err + } else if rsp != nil { + return rsp.(*pdpb.UpdateServiceSafePointV2Response), err + } + + var ( + nowTSO pdpb.Timestamp + err error + ) + if s.IsAPIServiceMode() { + nowTSO, err = s.getGlobalTSOFromTSOServer(ctx) + } else { + nowTSO, err = s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1) + } + if err != nil { + return nil, err + } + now, _ := tsoutil.ParseTimestamp(nowTSO) + + var minServiceSafePoint *endpoint.ServiceSafePointV2 + if request.Ttl < 0 { + minServiceSafePoint, err = s.safePointV2Manager.RemoveServiceSafePoint(request.GetKeyspaceId(), string(request.GetServiceId()), now) + } else { + serviceSafePoint := &endpoint.ServiceSafePointV2{ + KeyspaceID: request.GetKeyspaceId(), + ServiceID: string(request.GetServiceId()), + ExpiredAt: now.Unix() + request.GetTtl(), + SafePoint: request.GetSafePoint(), + } + // Fix possible overflow. + if math.MaxInt64-now.Unix() <= request.GetTtl() { + serviceSafePoint.ExpiredAt = math.MaxInt64 + } + minServiceSafePoint, err = s.safePointV2Manager.UpdateServiceSafePoint(serviceSafePoint, now) + } + if err != nil { + return nil, err + } + return &pdpb.UpdateServiceSafePointV2Response{ + Header: s.header(), + ServiceId: []byte(minServiceSafePoint.ServiceID), + Ttl: minServiceSafePoint.ExpiredAt - now.Unix(), + MinSafePoint: minServiceSafePoint.SafePoint, + }, nil +} + +// WatchGCSafePointV2 watch keyspaces gc safe point changes. +func (s *GrpcServer) WatchGCSafePointV2(request *pdpb.WatchGCSafePointV2Request, stream pdpb.PD_WatchGCSafePointV2Server) error { + ctx, cancel := context.WithCancel(s.Context()) + defer cancel() + revision := request.GetRevision() + // If the revision is compacted, will meet required revision has been compacted error. + // - If required revision < CompactRevision, we need to reload all configs to avoid losing data. + // - If required revision >= CompactRevision, just keep watching. + // Use WithPrevKV() to get the previous key-value pair when get Delete Event. + watchChan := s.client.Watch(ctx, path.Join(s.rootPath, endpoint.GCSafePointV2Prefix()), clientv3.WithRev(revision), clientv3.WithPrefix()) + for { + select { + case <-ctx.Done(): + return nil + case res := <-watchChan: + if res.Err() != nil { + var resp pdpb.WatchGCSafePointV2Response + if revision < res.CompactRevision { + resp.Header = s.wrapErrorToHeader(pdpb.ErrorType_DATA_COMPACTED, + fmt.Sprintf("required watch revision: %d is smaller than current compact/min revision %d.", revision, res.CompactRevision)) + } else { + resp.Header = s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + fmt.Sprintf("watch channel meet other error %s.", res.Err().Error())) + } + if err := stream.Send(&resp); err != nil { + return err + } + // Err() indicates that this WatchResponse holds a channel-closing error. 
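+			// (Assumption-level note: returning here ends the gRPC stream; the
+			// watcher is expected to re-establish the watch, reloading the full
+			// GC safe point state first if DATA_COMPACTED was reported above.)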
+ return res.Err() + } + revision = res.Header.GetRevision() + + safePointEvents := make([]*pdpb.SafePointEvent, 0, len(res.Events)) + for _, event := range res.Events { + gcSafePoint := &endpoint.GCSafePointV2{} + if err := json.Unmarshal(event.Kv.Value, gcSafePoint); err != nil { + return err + } + safePointEvents = append(safePointEvents, &pdpb.SafePointEvent{ + KeyspaceId: gcSafePoint.KeyspaceID, + SafePoint: gcSafePoint.SafePoint, + Type: pdpb.EventType(event.Type), + }) + } + if len(safePointEvents) > 0 { + if err := stream.Send(&pdpb.WatchGCSafePointV2Response{Header: s.header(), Events: safePointEvents, Revision: res.Header.GetRevision()}); err != nil { + return err + } + } + } + } +} diff --git a/server/grpc_service.go b/server/grpc_service.go index bd57c28db9c0..9b349f99335f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1500,34 +1500,6 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb }, nil } -// GetGCSafePointV2 implements gRPC PDServer. -// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 -// whose server side implementation hasn't been merged, so we add this method to avoid compile error. -func (s *GrpcServer) GetGCSafePointV2(_ context.Context, _ *pdpb.GetGCSafePointV2Request) (*pdpb.GetGCSafePointV2Response, error) { - return nil, errors.New("not implemented") -} - -// WatchGCSafePointV2 implements gRPC PDServer. -// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 -// whose server side implementation hasn't been merged, so we add this method to avoid compile error. -func (s *GrpcServer) WatchGCSafePointV2(_ *pdpb.WatchGCSafePointV2Request, server pdpb.PD_WatchGCSafePointV2Server) error { - return errors.New("not implemented") -} - -// UpdateGCSafePointV2 implements gRPC PDServer. -// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 -// whose server side implementation hasn't been merged, so we add this method to avoid compile error. -func (s *GrpcServer) UpdateGCSafePointV2(_ context.Context, _ *pdpb.UpdateGCSafePointV2Request) (*pdpb.UpdateGCSafePointV2Response, error) { - return nil, errors.New("not implemented") -} - -// UpdateServiceSafePointV2 implements gRPC PDServer. -// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 -// whose server side implementation hasn't been merged, so we add this method to avoid compile error. -func (s *GrpcServer) UpdateServiceSafePointV2(_ context.Context, _ *pdpb.UpdateServiceSafePointV2Request) (*pdpb.UpdateServiceSafePointV2Response, error) { - return nil, errors.New("not implemented") -} - // GetOperator gets information about the operator belonging to the specify region. 
func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { @@ -2151,6 +2123,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve } } } + if len(cfgs) > 0 { if err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs, Revision: res.Header.GetRevision()}); err != nil { return err diff --git a/server/server.go b/server/server.go index 18c9759e2943..47cbca3e2a83 100644 --- a/server/server.go +++ b/server/server.go @@ -46,6 +46,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/gc" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" ms_server "github.com/tikv/pd/pkg/mcs/metastorage/server" @@ -74,7 +75,6 @@ import ( "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" - "github.com/tikv/pd/server/gc" syncer "github.com/tikv/pd/server/regionsyncer" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" @@ -169,6 +169,8 @@ type Server struct { gcSafePointManager *gc.SafePointManager // keyspace manager keyspaceManager *keyspace.Manager + // safe point V2 manager + safePointV2Manager *gc.SafePointV2Manager // keyspace group manager keyspaceGroupManager *keyspace.GroupManager // for basicCluster operation. @@ -465,6 +467,7 @@ func (s *Server) startServer(ctx context.Context) error { s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client, s.clusterID) } s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) + s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster) // initial hot_region_storage in here. s.hotRegionStorage, err = storage.NewHotRegionsStorage( @@ -847,6 +850,11 @@ func (s *Server) GetKeyspaceManager() *keyspace.Manager { return s.keyspaceManager } +// GetSafePointV2Manager returns the safe point v2 manager of server. +func (s *Server) GetSafePointV2Manager() *gc.SafePointV2Manager { + return s.safePointV2Manager +} + // GetKeyspaceGroupManager returns the keyspace group manager of server. func (s *Server) GetKeyspaceGroupManager() *keyspace.GroupManager { return s.keyspaceGroupManager diff --git a/tests/integrations/client/gc_client_test.go b/tests/integrations/client/gc_client_test.go new file mode 100644 index 000000000000..acb1c4588124 --- /dev/null +++ b/tests/integrations/client/gc_client_test.go @@ -0,0 +1,208 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package client_test + +import ( + "path" + "strconv" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + pd "github.com/tikv/pd/client" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/assertutil" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// gcClientTestReceiver is the pdpb.PD_WatchGCSafePointV2Server mock for testing. +type gcClientTestReceiver struct { + re *require.Assertions + grpc.ServerStream +} + +// Send is the mock implementation for pdpb.PD_WatchGCSafePointV2Server's Send. +// Instead of sending the response to the client, it will check the response. +// In testing, we will set all keyspace's safe point to be equal to its id, +// and this mock verifies that the response is correct. +func (s gcClientTestReceiver) Send(m *pdpb.WatchGCSafePointV2Response) error { + log.Info("received", zap.Any("received", m.GetEvents())) + for _, change := range m.GetEvents() { + s.re.Equal(change.SafePoint, uint64(change.KeyspaceId)) + } + return nil +} + +type gcClientTestSuite struct { + suite.Suite + server *server.GrpcServer + client pd.Client + cleanup testutil.CleanupFunc + gcSafePointV2Prefix string +} + +func TestGcClientTestSuite(t *testing.T) { + suite.Run(t, new(gcClientTestSuite)) +} + +func (suite *gcClientTestSuite) SetupSuite() { + var err error + var gsi *server.Server + checker := assertutil.NewChecker() + checker.FailNow = func() {} + gsi, suite.cleanup, err = server.NewTestServer(suite.Require(), checker) + suite.server = &server.GrpcServer{Server: gsi} + suite.NoError(err) + addr := suite.server.GetAddr() + suite.client, err = pd.NewClientWithContext(suite.server.Context(), []string{addr}, pd.SecurityOption{}) + suite.NoError(err) + rootPath := path.Join("/pd", strconv.FormatUint(suite.server.ClusterID(), 10)) + suite.gcSafePointV2Prefix = path.Join(rootPath, endpoint.GCSafePointV2Prefix()) + // Enable the fail-point to skip checking keyspace validity. + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/gc/checkKeyspace", "return(true)")) +} + +func (suite *gcClientTestSuite) TearDownSuite() { + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/gc/checkKeyspace")) + suite.cleanup() +} + +func (suite *gcClientTestSuite) TearDownTest() { + suite.CleanupEtcdGCPath() +} + +func (suite *gcClientTestSuite) CleanupEtcdGCPath() { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.gcSafePointV2Prefix, clientv3.WithPrefix()) + suite.NoError(err) +} + +func (suite *gcClientTestSuite) TestWatch1() { + receiver := gcClientTestReceiver{re: suite.Require()} + go suite.server.WatchGCSafePointV2(&pdpb.WatchGCSafePointV2Request{ + Revision: 0, + }, receiver) + + // Init gc safe points as index value of keyspace 0 ~ 5. + for i := 0; i < 6; i++ { + suite.mustUpdateSafePoint(uint32(i), uint64(i)) + } + + // delete gc safe points of keyspace 3 ~ 5. + for i := 3; i < 6; i++ { + suite.mustDeleteSafePoint(uint32(i)) + } + + // check gc safe point equal to keyspace id for keyspace 0 ~ 2 . + for i := 0; i < 3; i++ { + suite.Equal(uint64(i), suite.mustLoadSafePoint(uint32(i))) + } + + // check gc safe point is 0 for keyspace 3 ~ 5 after delete. 
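+	// (A deleted keyspace entry is read back as the zero value: the test relies
+	// on LoadGCSafePointV2 returning a zero safe point for keyspaces that have
+	// no entry, as the storage-level TestLoadEmpty also asserts.)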
+ for i := 3; i < 6; i++ { + suite.Equal(uint64(0), suite.mustLoadSafePoint(uint32(i))) + } +} + +func (suite *gcClientTestSuite) TestClientWatchWithRevision() { + suite.testClientWatchWithRevision(false) + suite.testClientWatchWithRevision(true) +} + +func (suite *gcClientTestSuite) testClientWatchWithRevision(fromNewRevision bool) { + testKeyspaceID := uint32(100) + initGCSafePoint := uint64(50) + updatedGCSafePoint := uint64(100) + + // Init gc safe point. + suite.mustUpdateSafePoint(testKeyspaceID, initGCSafePoint) + + // Get the initial revision. + initRevision := suite.mustGetRevision(testKeyspaceID) + + // Update the gc safe point. + suite.mustUpdateSafePoint(testKeyspaceID, updatedGCSafePoint) + + // Get the revision of the updated gc safe point. + updatedRevision := suite.mustGetRevision(testKeyspaceID) + + // Set the start revision of the watch request based on fromNewRevision. + startRevision := initRevision + if fromNewRevision { + startRevision = updatedRevision + } + watchChan, err := suite.client.WatchGCSafePointV2(suite.server.Context(), startRevision) + suite.NoError(err) + + timeout := time.After(time.Second) + + isFirstUpdate := true + for { + select { + case <-timeout: + return + case res := <-watchChan: + for _, r := range res { + suite.Equal(r.GetKeyspaceId(), testKeyspaceID) + if fromNewRevision { + // If fromNewRevision, first response should be the updated gc safe point. + suite.Equal(r.GetSafePoint(), updatedGCSafePoint) + } else if isFirstUpdate { + isFirstUpdate = false + suite.Equal(r.GetSafePoint(), initGCSafePoint) + } else { + suite.Equal(r.GetSafePoint(), updatedGCSafePoint) + continue + } + } + } + } +} + +// mustUpdateSafePoint updates the gc safe point of the given keyspace id. +func (suite *gcClientTestSuite) mustUpdateSafePoint(keyspaceID uint32, safePoint uint64) { + _, err := suite.client.UpdateGCSafePointV2(suite.server.Context(), keyspaceID, safePoint) + suite.NoError(err) +} + +// mustLoadSafePoint loads the gc safe point of the given keyspace id. +func (suite *gcClientTestSuite) mustLoadSafePoint(keyspaceID uint32) uint64 { + res, err := suite.server.GetSafePointV2Manager().LoadGCSafePoint(keyspaceID) + suite.NoError(err) + return res.SafePoint +} + +// mustDeleteSafePoint deletes the gc safe point of the given keyspace id. +func (suite *gcClientTestSuite) mustDeleteSafePoint(keyspaceID uint32) { + safePointPath := path.Join(suite.gcSafePointV2Prefix, endpoint.EncodeKeyspaceID(keyspaceID)) + log.Info("test etcd path", zap.Any("path", safePointPath)) // TODO: Delete + _, err := suite.server.GetClient().Delete(suite.server.Context(), safePointPath) + suite.NoError(err) +} + +// mustGetRevision gets the revision of the given keyspace's gc safe point. 
+func (suite *gcClientTestSuite) mustGetRevision(keyspaceID uint32) int64 { + safePointPath := path.Join(suite.gcSafePointV2Prefix, endpoint.EncodeKeyspaceID(keyspaceID)) + res, err := suite.server.GetClient().Get(suite.server.Context(), safePointPath) + suite.NoError(err) + return res.Header.GetRevision() +} diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index ca60c3f16d62..cd42cfec0e7f 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e + github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index a46978cbbd97..3a83bd7017d9 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 42efe03381a8..f4d10c6c0ca4 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -12,7 +12,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e + github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 @@ -79,6 +79,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.1+incompatible // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect + github.com/golang/glog v1.0.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 60402b113dcc..0fb2712e600c 100644 --- 
a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -186,8 +186,9 @@ github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0kt github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -385,8 +386,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index aa608bfa8489..ba74ff38054f 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e + github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 10dc4f1d7c78..ab77ffb48aca 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -184,8 +184,8 @@ github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0kt github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 
h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -383,8 +383,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index 87ccafd7314b..f62f93da7ac7 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -851,8 +851,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= -github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA= +github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= From 99e241955eee9461ebe79452f035bb8cfd8b649c Mon Sep 17 00:00:00 2001 From: 
JmPotato Date: Thu, 25 May 2023 16:35:40 +0800 Subject: [PATCH 2/2] config, cluster: add an option to halt the cluster scheduling (#6498) ref tikv/pd#6493 Add an option to halt the cluster scheduling. Signed-off-by: JmPotato --- errors.toml | 5 + metrics/grafana/pd.json | 109 +++++++++++++++++++- pkg/errs/errno.go | 7 +- pkg/mock/mockcluster/mockcluster.go | 8 +- pkg/schedule/config/config.go | 2 +- pkg/schedule/coordinator.go | 14 ++- pkg/schedule/core/cluster_informer.go | 2 +- pkg/schedule/diagnostic_manager.go | 2 + pkg/schedule/placement/rule_manager_test.go | 2 +- server/cluster/cluster.go | 27 ++++- server/cluster/cluster_worker.go | 8 +- server/cluster/metrics.go | 9 ++ server/config/config.go | 9 ++ server/config/persist_options.go | 19 ++-- 14 files changed, 191 insertions(+), 32 deletions(-) diff --git a/errors.toml b/errors.toml index b5d4dafdf0d8..c19b67d8b2fc 100644 --- a/errors.toml +++ b/errors.toml @@ -96,6 +96,11 @@ error = ''' TiKV cluster not bootstrapped, please start TiKV first ''' +["PD:cluster:ErrSchedulingIsHalted"] +error = ''' +scheduling is halted +''' + ["PD:cluster:ErrStoreIsUp"] error = ''' store is still up, please remove store gracefully diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 45e25ef63959..3909dc670c6b 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -2340,6 +2340,113 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": true, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The allowance status of the scheduling.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 41 + }, + "hiddenSeries": false, + "id": 1464, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": false, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 1, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "pd_scheduling_allowance_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{kind}}", + "metric": "pd_scheduling_allowance_status", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Scheduling Allowance Status", + "tooltip": { + "shared": true, + "sort": 1, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:533", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "$$hashKey": "object:534", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "cacheTimeout": null, "colorBackground": false, @@ -2967,7 +3074,7 @@ "format": "time_series", "intervalFactor": 2, "legendFormat": "{{event}}", - "metric": "pd_scheduler_status", + 
"metric": "pd_schedule_operators_count", "refId": "A", "step": 4 } diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index d98b5e9dfd03..2db8638a12af 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -140,9 +140,10 @@ var ( // cluster errors var ( - ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped")) - ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp")) - ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID")) + ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped")) + ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp")) + ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID")) + ErrSchedulingIsHalted = errors.Normalize("scheduling is halted", errors.RFCCodeText("PD:cluster:ErrSchedulingIsHalted")) ) // versioninfo errors diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index bf32d9476b23..5ff91567f66b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -104,11 +104,6 @@ func (mc *Cluster) GetAllocator() id.Allocator { return mc.IDAllocator } -// IsUnsafeRecovering returns if the cluster is in unsafe recovering. -func (mc *Cluster) IsUnsafeRecovering() bool { - return false -} - // GetPersistOptions returns the persist options. func (mc *Cluster) GetPersistOptions() *config.PersistOptions { return mc.PersistOptions @@ -123,6 +118,9 @@ func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false, // IsSchedulerDisabled checks if the scheduler with name is disabled or not. func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil } +// CheckSchedulingAllowance checks if the cluster allows scheduling currently. +func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil } + // ScanRegions scans region with start key, until number greater than limit. func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo { return mc.ScanRange(startKey, endKey, limit) diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 2c0842914c7d..f63aae8d2dda 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -87,7 +87,7 @@ type Config interface { SetSplitMergeInterval(time.Duration) SetMaxReplicas(int) SetPlacementRulesCacheEnabled(bool) - SetWitnessEnabled(bool) + SetEnableWitness(bool) // only for store configuration UseRaftV2() } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index e979bd9e324e..c1f36bdf7d89 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -142,8 +142,7 @@ func (c *Coordinator) PatrolRegions() { log.Info("patrol regions has been stopped") return } - if c.cluster.IsUnsafeRecovering() { - // Skip patrolling regions during unsafe recovery. + if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed { continue } @@ -559,7 +558,7 @@ func (c *Coordinator) CollectSchedulerMetrics() { var allowScheduler float64 // If the scheduler is not allowed to schedule, it will disappear in Grafana panel. // See issue #1341. 
- if !s.IsPaused() && !s.cluster.IsUnsafeRecovering() { + if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed { allowScheduler = 1 } schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler) @@ -1029,7 +1028,14 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool { } return false } - if s.IsPaused() || s.cluster.IsUnsafeRecovering() { + allowed, _ := s.cluster.CheckSchedulingAllowance() + if !allowed { + if diagnosable { + s.diagnosticRecorder.setResultFromStatus(halted) + } + return false + } + if s.IsPaused() { if diagnosable { s.diagnosticRecorder.setResultFromStatus(paused) } diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 345cdeb74a91..1af8b28046ad 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -44,8 +44,8 @@ type ClusterInformer interface { UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) IsSchedulerExisted(name string) (bool, error) IsSchedulerDisabled(name string) (bool, error) + CheckSchedulingAllowance() (bool, error) GetPersistOptions() *config.PersistOptions - IsUnsafeRecovering() bool } // RegionHealthCluster is an aggregate interface that wraps multiple interfaces diff --git a/pkg/schedule/diagnostic_manager.go b/pkg/schedule/diagnostic_manager.go index c68999f6cdda..9560110fea04 100644 --- a/pkg/schedule/diagnostic_manager.go +++ b/pkg/schedule/diagnostic_manager.go @@ -34,6 +34,8 @@ const ( disabled = "disabled" // paused means the current scheduler is paused paused = "paused" + // halted means the current scheduler is halted + halted = "halted" // scheduling means the current scheduler is generating. scheduling = "scheduling" // pending means the current scheduler cannot generate scheduling operator diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index 894f78f1fefa..e5be8d74cd21 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -33,7 +33,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) var err error manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) - manager.conf.SetWitnessEnabled(enableWitness) + manager.conf.SetEnableWitness(enableWitness) err = manager.Initialize(3, []string{"zone", "rack", "host"}) re.NoError(err) return store, manager diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5acd6113f87b..1a890cdfc54a 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -826,11 +826,6 @@ func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller { return c.unsafeRecoveryController } -// IsUnsafeRecovering returns if the cluster is in unsafe recovering. 
-func (c *RaftCluster) IsUnsafeRecovering() bool { - return c.unsafeRecoveryController.IsRunning() -} - // AddSuspectKeyRange adds the key range with the its ruleID as the key // The instance of each keyRange is like following format: // [2][]byte: start key/end key @@ -2711,3 +2706,25 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) { func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) { return c.coordinator.GetPausedSchedulerDelayUntil(name) } + +var ( + onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery") + haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling") +) + +// CheckSchedulingAllowance checks if the cluster allows scheduling currently. +func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) { + // If the cluster is in the process of online unsafe recovery, it should not allow scheduling. + if c.GetUnsafeRecoveryController().IsRunning() { + onlineUnsafeRecoveryStatus.Set(1) + return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() + } + onlineUnsafeRecoveryStatus.Set(0) + // If the halt-scheduling is set, it should not allow scheduling. + if c.opt.IsSchedulingHalted() { + haltSchedulingStatus.Set(1) + return false, errs.ErrSchedulingIsHalted.FastGenByArgs() + } + haltSchedulingStatus.Set(0) + return true, nil +} diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 06f5c1725aa8..9f87b4501ad4 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { // HandleAskSplit handles the split request. func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { - if c.GetUnsafeRecoveryController().IsRunning() { - return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() + if allowed, err := c.CheckSchedulingAllowance(); !allowed { + return nil, err } if !c.opt.IsTikvRegionSplitEnabled() { return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() @@ -105,8 +105,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { // HandleAskBatchSplit handles the batch split request. 
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { - if c.GetUnsafeRecoveryController().IsRunning() { - return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs() + if allowed, err := c.CheckSchedulingAllowance(); !allowed { + return nil, err } if !c.opt.IsTikvRegionSplitEnabled() { return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() diff --git a/server/cluster/metrics.go b/server/cluster/metrics.go index e43fe595f708..4306779d6813 100644 --- a/server/cluster/metrics.go +++ b/server/cluster/metrics.go @@ -95,11 +95,20 @@ var ( Name: "store_sync", Help: "The state of store sync config", }, []string{"address", "state"}) + + schedulingAllowanceStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "scheduling", + Name: "allowance_status", + Help: "Status of the scheduling allowance.", + }, []string{"kind"}) ) func init() { prometheus.MustRegister(regionEventCounter) prometheus.MustRegister(healthStatusGauge) + prometheus.MustRegister(schedulingAllowanceStatusGauge) prometheus.MustRegister(clusterStateCPUGauge) prometheus.MustRegister(clusterStateCurrent) prometheus.MustRegister(bucketEventCounter) diff --git a/server/config/config.go b/server/config/config.go index 32310d39ca19..bfe2cc370fa9 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -213,6 +213,7 @@ const ( defaultEnableGRPCGateway = true defaultDisableErrorVerbose = true defaultEnableWitness = false + defaultHaltScheduling = false defaultDashboardAddress = "auto" @@ -684,6 +685,10 @@ type ScheduleConfig struct { // v1: which is based on the region count by rate limit. // v2: which is based on region size by window size. StoreLimitVersion string `toml:"store-limit-version" json:"store-limit-version,omitempty"` + + // HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling, + // and any other scheduling configs will be ignored. + HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"` } // Clone returns a cloned scheduling configuration. @@ -820,6 +825,10 @@ func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool) configutil.AdjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion) } + if !meta.IsDefined("halt-scheduling") { + c.HaltScheduling = defaultHaltScheduling + } + adjustSchedulers(&c.Schedulers, DefaultSchedulers) for k, b := range c.migrateConfigurationMap() { diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 9b5fbb415e9b..1b18f9fad541 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -184,13 +184,6 @@ func (o *PersistOptions) SetPlacementRulesCacheEnabled(enabled bool) { o.SetReplicationConfig(v) } -// SetWitnessEnabled set EanbleWitness -func (o *PersistOptions) SetWitnessEnabled(enabled bool) { - v := o.GetScheduleConfig().Clone() - v.EnableWitness = enabled - o.SetScheduleConfig(v) -} - // GetStrictlyMatchLabel returns whether check label strict. func (o *PersistOptions) GetStrictlyMatchLabel() bool { return o.GetReplicationConfig().StrictlyMatchLabel @@ -926,3 +919,15 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien } return err } + +// SetHaltScheduling set HaltScheduling. 
+func (o *PersistOptions) SetHaltScheduling(halt bool) { + v := o.GetScheduleConfig().Clone() + v.HaltScheduling = halt + o.SetScheduleConfig(v) +} + +// IsSchedulingHalted returns if PD scheduling is halted. +func (o *PersistOptions) IsSchedulingHalted() bool { + return o.GetScheduleConfig().HaltScheduling +}
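A minimal usage sketch (not part of the patch) for the halt-scheduling switch added above. It assumes the pre-existing config.NewConfig and config.NewPersistOptions constructors from server/config; only SetHaltScheduling and IsSchedulingHalted are introduced by this commit, and CheckSchedulingAllowance is the server-side consumer shown earlier in this diff.

package main

import (
	"fmt"

	"github.com/tikv/pd/server/config"
)

func main() {
	cfg := config.NewConfig()            // assumed existing constructor
	opt := config.NewPersistOptions(cfg) // assumed existing constructor

	// While halted, RaftCluster.CheckSchedulingAllowance reports
	// (false, ErrSchedulingIsHalted): region patrol skips its round,
	// schedulers are reported as not allowed, and split requests are rejected.
	opt.SetHaltScheduling(true)
	fmt.Println(opt.IsSchedulingHalted()) // true

	// Clearing the flag allows scheduling again.
	opt.SetHaltScheduling(false)
	fmt.Println(opt.IsSchedulingHalted()) // false
}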