Skip to content

Commit

Permalink
keyspace: Refactor keyspace manager using RunInTxn (#5843)
Browse files Browse the repository at this point in the history
* added RunInTxn style interface to base kv

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* added unit tests

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* update comments

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* sort import

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* pin create revision when key not exist

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* init commit

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* fix test

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* fix test

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* update kvproto

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* update pd-tso-bench go sum

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* log error when fail to bootstrap keyspace manager

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* added comments

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

* go mod tidy msc

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
AmoebaProtozoa and ti-chi-bot authored Jan 11, 2023
1 parent 59a6203 commit bac5bf4
Show file tree
Hide file tree
Showing 19 changed files with 568 additions and 293 deletions.
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1
github.com/pingcap/kvproto v0.0.0-20230110033234-055843a0a07d
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1 h1:jw4NjEiCleRJPPpHM7K6l8OKzOjnZAj62eKteCAY6ro=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20230110033234-055843a0a07d h1:BgioocFKd7i9WW3SkLrcqy+3lZrcBCo7ekTcB0GuuMA=
github.com/pingcap/kvproto v0.0.0-20230110033234-055843a0a07d/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
45 changes: 44 additions & 1 deletion client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package pd

import (
"context"
"go.uber.org/zap"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand All @@ -33,6 +33,8 @@ type KeyspaceClient interface {
LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error)
// WatchKeyspaces watches keyspace meta changes.
WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error)
// UpdateKeyspaceState updates target keyspace's state.
UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error)
}

// keyspaceClient returns the KeyspaceClient from current PD leader.
Expand Down Expand Up @@ -111,3 +113,44 @@ func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.Keyspac
}()
return keyspaceWatcherChan, err
}

// UpdateKeyspaceState attempts to update the keyspace specified by ID to the target state,
// it will also record StateChangedAt for the given keyspace if a state change took place.
// Currently, legal operations includes:
//
// ENABLED -> {ENABLED, DISABLED}
// DISABLED -> {ENABLED, DISABLED, ARCHIVED}
// ARCHIVED -> {ARCHIVED, TOMBSTONE}
// TOMBSTONE -> {TOMBSTONE}
//
// Updated keyspace meta will be returned.
func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.UpdateKeyspaceState", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &keyspacepb.UpdateKeyspaceStateRequest{
Header: c.requestHeader(),
Id: id,
State: state,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.keyspaceClient().UpdateKeyspaceState(ctx, req)
cancel()

if err != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, err
}

if resp.Header.GetError() != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Update state for keyspace id %d failed: %s", id, resp.Header.GetError().String())
}

return resp.Keyspace, nil
}
2 changes: 2 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var (
cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -112,6 +113,7 @@ var (
cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
cmdFailedDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdFailedDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
requestDurationTSO = requestDuration.WithLabelValues("tso")
)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1
github.com/pingcap/kvproto v0.0.0-20230110033234-055843a0a07d
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1 h1:jw4NjEiCleRJPPpHM7K6l8OKzOjnZAj62eKteCAY6ro=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20230110033234-055843a0a07d h1:BgioocFKd7i9WW3SkLrcqy+3lZrcBCo7ekTcB0GuuMA=
github.com/pingcap/kvproto v0.0.0-20230110033234-055843a0a07d/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand Down
108 changes: 59 additions & 49 deletions pkg/storage/endpoint/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,91 @@
package endpoint

import (
"context"
"strconv"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"go.etcd.io/etcd/clientv3"
)

const (
// spaceIDBase is base used to encode/decode spaceID.
// SpaceIDBase is base used to encode/decode spaceID.
// It's set to 10 for better readability.
spaceIDBase = 10
SpaceIDBase = 10
// spaceIDBitSizeMax is the max bitSize of spaceID.
// It's currently set to 24 (3bytes).
spaceIDBitSizeMax = 24
)

// KeyspaceStorage defines storage operations on keyspace related data.
type KeyspaceStorage interface {
// SaveKeyspace saves the given keyspace to the storage.
SaveKeyspace(*keyspacepb.KeyspaceMeta) error
// LoadKeyspace loads keyspace specified by spaceID.
LoadKeyspace(spaceID uint32, keyspace *keyspacepb.KeyspaceMeta) (bool, error)
// RemoveKeyspace removes target keyspace specified by spaceID.
RemoveKeyspace(spaceID uint32) error
SaveKeyspaceMeta(txn kv.Txn, meta *keyspacepb.KeyspaceMeta) error
LoadKeyspaceMeta(txn kv.Txn, id uint32) (*keyspacepb.KeyspaceMeta, error)
SaveKeyspaceID(txn kv.Txn, id uint32, name string) error
LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32, error)
// LoadRangeKeyspace loads no more than limit keyspaces starting at startID.
LoadRangeKeyspace(startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error)
// SaveKeyspaceIDByName saves keyspace name to ID lookup information.
// It saves the ID onto the path encoded with name.
SaveKeyspaceIDByName(spaceID uint32, name string) error
// LoadKeyspaceIDByName loads keyspace ID for the given keyspace specified by name.
// It first constructs path to spaceID with the given name, then attempt to retrieve
// target spaceID. If the target keyspace does not exist, result boolean is set to false.
LoadKeyspaceIDByName(name string) (bool, uint32, error)
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

var _ KeyspaceStorage = (*StorageEndpoint)(nil)

// SaveKeyspace saves the given keyspace to the storage.
func (se *StorageEndpoint) SaveKeyspace(keyspace *keyspacepb.KeyspaceMeta) error {
key := KeyspaceMetaPath(keyspace.GetId())
return se.saveProto(key, keyspace)
// SaveKeyspaceMeta adds a save keyspace meta operation to target transaction.
func (se *StorageEndpoint) SaveKeyspaceMeta(txn kv.Txn, meta *keyspacepb.KeyspaceMeta) error {
metaPath := KeyspaceMetaPath(meta.GetId())
metaVal, err := proto.Marshal(meta)
if err != nil {
return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause()
}
return txn.Save(metaPath, string(metaVal))
}

// LoadKeyspaceMeta load and return keyspace meta specified by id.
// If keyspace does not exist or error occurs, returned meta will be nil.
func (se *StorageEndpoint) LoadKeyspaceMeta(txn kv.Txn, id uint32) (*keyspacepb.KeyspaceMeta, error) {
metaPath := KeyspaceMetaPath(id)
metaVal, err := txn.Load(metaPath)
if err != nil || metaVal == "" {
return nil, err
}
meta := &keyspacepb.KeyspaceMeta{}
err = proto.Unmarshal([]byte(metaVal), meta)
if err != nil {
return nil, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByCause()
}
return meta, nil
}

// SaveKeyspaceID saves keyspace ID to the path specified by keyspace name.
func (se *StorageEndpoint) SaveKeyspaceID(txn kv.Txn, id uint32, name string) error {
idPath := KeyspaceIDPath(name)
idVal := strconv.FormatUint(uint64(id), SpaceIDBase)
return txn.Save(idPath, idVal)
}

// LoadKeyspace loads keyspace specified by spaceID.
func (se *StorageEndpoint) LoadKeyspace(spaceID uint32, keyspace *keyspacepb.KeyspaceMeta) (bool, error) {
key := KeyspaceMetaPath(spaceID)
return se.loadProto(key, keyspace)
// LoadKeyspaceID loads keyspace ID from the path specified by keyspace name.
// An additional boolean is returned to indicate whether target id exists,
// it returns false if target id not found, or if error occurred.
func (se *StorageEndpoint) LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32, error) {
idPath := KeyspaceIDPath(name)
idVal, err := txn.Load(idPath)
// Failed to load the keyspaceID if loading operation errored, or if keyspace does not exist.
if err != nil || idVal == "" {
return false, 0, err
}
id64, err := strconv.ParseUint(idVal, SpaceIDBase, spaceIDBitSizeMax)
if err != nil {
return false, 0, err
}
return true, uint32(id64), nil
}

// RemoveKeyspace removes target keyspace specified by spaceID.
func (se *StorageEndpoint) RemoveKeyspace(spaceID uint32) error {
key := KeyspaceMetaPath(spaceID)
return se.Remove(key)
// RunInTxn runs the given function in a transaction.
func (se *StorageEndpoint) RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error {
return se.Base.RunInTxn(ctx, f)
}

// LoadRangeKeyspace loads keyspaces starting at startID.
Expand All @@ -92,25 +124,3 @@ func (se *StorageEndpoint) LoadRangeKeyspace(startID uint32, limit int) ([]*keys
}
return keyspaces, nil
}

// SaveKeyspaceIDByName saves keyspace name to ID lookup information to storage.
func (se *StorageEndpoint) SaveKeyspaceIDByName(spaceID uint32, name string) error {
key := KeyspaceIDPath(name)
idStr := strconv.FormatUint(uint64(spaceID), spaceIDBase)
return se.Save(key, idStr)
}

// LoadKeyspaceIDByName loads keyspace ID for the given keyspace name
func (se *StorageEndpoint) LoadKeyspaceIDByName(name string) (bool, uint32, error) {
key := KeyspaceIDPath(name)
idStr, err := se.Load(key)
// Failed to load the keyspaceID if loading operation errored, or if keyspace does not exist.
if err != nil || idStr == "" {
return false, 0, err
}
id64, err := strconv.ParseUint(idStr, spaceIDBase, spaceIDBitSizeMax)
if err != nil {
return false, 0, err
}
return true, uint32(id64), nil
}
Loading

0 comments on commit bac5bf4

Please sign in to comment.