support batch allocating ids
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Feb 21, 2025
1 parent 2bbeb9c commit 6ec9efc
Showing 33 changed files with 5,242 additions and 121 deletions.
2 changes: 2 additions & 0 deletions client/go.mod
@@ -44,3 +44,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

+ replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20250221050439-78e733209da9
4 changes: 2 additions & 2 deletions client/go.sum
@@ -49,8 +49,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
- github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E=
- github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -66,6 +64,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
+ github.com/rleungx/kvproto v0.0.0-20250221050439-78e733209da9 h1:U6e9bF6hBkN5HuIljK2+9S7ocNyhz28yvlYb6e5bHvY=
+ github.com/rleungx/kvproto v0.0.0-20250221050439-78e733209da9/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2 changes: 2 additions & 0 deletions go.mod
@@ -219,3 +219,5 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

+ replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20250221050439-78e733209da9
1,717 changes: 1,708 additions & 9 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/core/region_test.go
@@ -839,15 +839,15 @@ func newRegionInfoIDRandom(idAllocator id.Allocator) *RegionInfo {
// Randomly select a peer as the leader.
leaderIdx := mrand.Intn(peerNum)
for i := range peerNum {
- id, _ := idAllocator.Alloc()
+ id, _, _ := idAllocator.Alloc(1)
// Randomly distribute the peers to different stores.
p := &metapb.Peer{Id: id, StoreId: uint64(mrand.Intn(storeNum) + 1)}
if i == leaderIdx {
leader = p
}
peers = append(peers, p)
}
- regionID, _ := idAllocator.Alloc()
+ regionID, _, _ := idAllocator.Alloc(1)
return NewRegionInfo(
&metapb.Region{
Id: regionID,
18 changes: 10 additions & 8 deletions pkg/id/id.go
@@ -43,7 +43,7 @@ type Allocator interface {
// SetBase sets the base ID.
SetBase(newBase uint64) error
// Alloc allocates a batch of count unique IDs, returning the last allocated ID and the count.
- Alloc() (uint64, error)
+ Alloc(count uint32) (uint64, uint32, error)
// Rebase resets the base for the allocator from the persistent window boundary,
// which also resets the end of the allocator. (base, end) is the range that can
// be allocated in memory.
@@ -94,19 +94,21 @@ }
}

// Alloc allocates a batch of count IDs, returning the last allocated ID and how many were allocated; the batch occupies (last-count, last].
- func (alloc *allocatorImpl) Alloc() (uint64, error) {
+ func (alloc *allocatorImpl) Alloc(count uint32) (uint64, uint32, error) {
alloc.mu.Lock()
defer alloc.mu.Unlock()

- if alloc.base == alloc.end {
- if err := alloc.rebaseLocked(true); err != nil {
- return 0, err
+ for range count {
+ if alloc.base == alloc.end {
+ if err := alloc.rebaseLocked(true); err != nil {
+ return 0, 0, err
+ }
}
- }

- alloc.base++
+ alloc.base++
+ }

- return alloc.base, nil
+ return alloc.base, count, nil
}

// SetBase sets the base.
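The new contract returns the last ID of the batch together with the number of IDs actually allocated, so the usable range is (last-count, last]. A minimal, self-contained Go sketch of how a caller expands that result — the helper name is illustrative, not part of this commit:

package main

import "fmt"

// idsFromBatch expands a batch allocation result into the individual IDs.
// Alloc(count) returns the last allocated ID and the count, so the batch
// occupies the half-open range (last-count, last].
func idsFromBatch(last uint64, count uint32) []uint64 {
	ids := make([]uint64, 0, count)
	for id := last - uint64(count) + 1; id <= last; id++ {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	// If Alloc(3) returns last=103 and count=3, the batch is 101, 102, 103.
	fmt.Println(idsFromBatch(103, 3)) // [101 102 103]
}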
4 changes: 2 additions & 2 deletions pkg/id/id_test.go
@@ -65,10 +65,10 @@ func TestMultipleAllocator(t *testing.T) {

// testAllocator sequentially updates the given allocator and checks that the values are as expected.
func testAllocator(re *require.Assertions, allocator Allocator) {
- startID, err := allocator.Alloc()
+ startID, _, err := allocator.Alloc(1)
re.NoError(err)
for i := startID + 1; i < startID+step*20; i++ {
- id, err := allocator.Alloc()
+ id, _, err := allocator.Alloc(1)
re.NoError(err)
re.Equal(i, id)
}
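A batch-mode companion to the sequential check above could look like the following sketch, reusing the same helper shape — a hypothetical test, not part of this commit:

// testBatchAllocator sketches how batch allocation could be verified: the
// returned count matches the request, and the next single allocation
// continues immediately after the batch.
func testBatchAllocator(re *require.Assertions, allocator Allocator) {
	last, got, err := allocator.Alloc(10)
	re.NoError(err)
	re.Equal(uint32(10), got)
	next, _, err := allocator.Alloc(1)
	re.NoError(err)
	re.Equal(last+1, next)
}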
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace.go
@@ -683,7 +683,7 @@ func (manager *Manager) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspac

// allocID allocates a new keyspace id.
func (manager *Manager) allocID() (uint32, error) {
- id64, err := manager.idAllocator.Alloc()
+ id64, _, err := manager.idAllocator.Alloc(1)
if err != nil {
return 0, err
}
12 changes: 6 additions & 6 deletions pkg/mcs/scheduling/server/cluster.go
@@ -239,20 +239,20 @@ func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.per
// GetStoreConfig returns the store config.
func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig }

- // AllocID allocates a new ID.
- func (c *Cluster) AllocID() (uint64, error) {
+ // AllocID allocates a batch of new IDs.
+ func (c *Cluster) AllocID(count uint32) (uint64, uint32, error) {
client, err := c.getPDLeaderClient()
if err != nil {
- return 0, err
+ return 0, 0, err
}
ctx, cancel := context.WithTimeout(c.ctx, requestTimeout)
defer cancel()
- resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}})
+ resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, Count: count})
if err != nil {
c.triggerMembershipCheck()
- return 0, err
+ return 0, 0, err
}
- return resp.GetId(), nil
+ return resp.GetId(), resp.GetCount(), nil
}

func (c *Cluster) getPDLeaderClient() (pdpb.PDClient, error) {
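The Count request field and GetCount() accessor used above come from the kvproto fork pinned by the replace directives in go.mod. A hedged sketch of driving the same RPC from another caller — the function, its timeout, and the exact import set are illustrative assumptions, not part of this commit:

import (
	"context"
	"time"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/tikv/pd/pkg/utils/keypath"
)

// reserveIDs asks the PD leader for a contiguous batch of n IDs and returns
// the first and last IDs of that batch.
func reserveIDs(ctx context.Context, client pdpb.PDClient, n uint32) (first, last uint64, err error) {
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second) // illustrative timeout
	defer cancel()
	resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{
		Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()},
		Count:  n,
	})
	if err != nil {
		return 0, 0, err
	}
	last = resp.GetId()
	first = last - uint64(resp.GetCount()) + 1
	return first, last, nil
}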
20 changes: 11 additions & 9 deletions pkg/mcs/scheduling/server/grpc_service.go
@@ -288,17 +288,19 @@ func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatc
splitIDs := make([]*pdpb.SplitID, 0, splitCount)
recordRegions := make([]uint64, 0, splitCount+1)

- for i := 0; i < int(splitCount); i++ {
- newRegionID, err := c.AllocID()
- if err != nil {
- return nil, errs.ErrSchedulerNotFound.FastGenByArgs()
- }
+ id, count, err := c.AllocID(splitCount * (1 + uint32(len(request.Region.Peers))))
+ if err != nil {
+ return nil, err
+ }
+ curID := id - uint64(count) + 1 // batch is (id-count, id]; start at its first ID
+ for range splitCount {
+ newRegionID := curID
+ curID++

peerIDs := make([]uint64, len(request.Region.Peers))
- for i := 0; i < len(peerIDs); i++ {
- if peerIDs[i], err = c.AllocID(); err != nil {
- return nil, err
- }
+ for j := 0; j < len(peerIDs); j++ {
+ peerIDs[j] = curID
+ curID++
}

recordRegions = append(recordRegions, newRegionID)
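The point of the rewrite above is that a batch split needs splitCount region IDs plus splitCount*len(peers) peer IDs, and all of them now come from a single AllocID call instead of one round trip per ID. A standalone sketch of the same partitioning arithmetic — names are illustrative, not part of this commit:

package main

import "fmt"

// splitIDPlan carves one contiguous ID batch into per-region IDs, mirroring
// the AskBatchSplit layout: for each new region, one region ID followed by
// one ID per peer.
func splitIDPlan(first uint64, splitCount, peerCount int) ([]uint64, [][]uint64) {
	regionIDs := make([]uint64, 0, splitCount)
	peerIDs := make([][]uint64, 0, splitCount)
	cur := first
	for range splitCount {
		regionIDs = append(regionIDs, cur)
		cur++
		peers := make([]uint64, 0, peerCount)
		for range peerCount {
			peers = append(peers, cur)
			cur++
		}
		peerIDs = append(peerIDs, peers)
	}
	return regionIDs, peerIDs
}

func main() {
	// 2 splits with 3 peers each consume 2*(1+3) = 8 IDs, say 101..108.
	regions, peers := splitIDPlan(101, 2, 3)
	fmt.Println(regions) // [101 105]
	fmt.Println(peers)   // [[102 103 104] [106 107 108]]
}

Allocating the whole batch up front trades nothing but a single allocator advance for splitCount*(1+len(peers)) fewer round trips to the PD leader.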
8 changes: 4 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
@@ -121,8 +121,8 @@ func (mc *Cluster) GetStorage() storage.Storage {
}

// AllocID allocates a batch of unique IDs.
- func (mc *Cluster) AllocID() (uint64, error) {
- return mc.IDAllocator.Alloc()
+ func (mc *Cluster) AllocID(count uint32) (uint64, uint32, error) {
+ return mc.IDAllocator.Alloc(count)
}

// UpdateRegionsLabelLevelStats updates the label level stats for the regions.
@@ -190,7 +190,7 @@ func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind utils.RWTy

// AllocPeer allocs a new peer on a store.
func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
- peerID, err := mc.AllocID()
+ peerID, _, err := mc.AllocID(1)
if err != nil {
log.Error("failed to alloc peer", errs.ZapError(err))
return nil, err
@@ -363,7 +363,7 @@ func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, lea
}
mc.AddRegionStore(storeID, regionCount)
for range leaderCount {
- id, _ := mc.AllocID()
+ id, _, _ := mc.AllocID(1)
mc.AddLeaderRegion(id, storeID)
}
}
4 changes: 2 additions & 2 deletions pkg/mock/mockid/mockid.go
@@ -27,8 +27,8 @@ func NewIDAllocator() *IDAllocator {
}

// Alloc allocates count IDs, returning the last allocated ID and the count.
- func (alloc *IDAllocator) Alloc() (uint64, error) {
- return atomic.AddUint64(&alloc.base, 1), nil
+ func (alloc *IDAllocator) Alloc(count uint32) (uint64, uint32, error) {
+ return atomic.AddUint64(&alloc.base, uint64(count)), count, nil
}

// SetBase implements the IDAllocator interface.
8 changes: 4 additions & 4 deletions pkg/replication/replication_mode.go
@@ -238,7 +238,7 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error {
m.Lock()
defer m.Unlock()

- id, err := m.cluster.AllocID()
+ id, _, err := m.cluster.AllocID(1)
if err != nil {
log.Warn("failed to switch to async wait state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -260,7 +260,7 @@ }
}

func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
- id, err := m.cluster.AllocID()
+ id, _, err := m.cluster.AllocID(1)
if err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -292,7 +292,7 @@ func (m *ModeManager) drSwitchToSyncRecover() error {
}

func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
- id, err := m.cluster.AllocID()
+ id, _, err := m.cluster.AllocID(1)
if err != nil {
log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -312,7 +312,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
func (m *ModeManager) drSwitchToSync() error {
m.Lock()
defer m.Unlock()
- id, err := m.cluster.AllocID()
+ id, _, err := m.cluster.AllocID(1)
if err != nil {
log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
2 changes: 1 addition & 1 deletion pkg/schedule/core/cluster_informer.go
@@ -61,7 +61,7 @@ type SharedCluster interface {
GetBasicCluster() *core.BasicCluster
GetSharedConfig() sc.SharedConfigProvider
GetRuleManager() *placement.RuleManager
- AllocID() (uint64, error)
+ AllocID(uint32) (uint64, uint32, error)
IsSchedulingHalted() bool
}

2 changes: 1 addition & 1 deletion pkg/schedule/operator/builder.go
@@ -490,7 +490,7 @@ func (b *Builder) prepareBuild() (string, error) {
if o == nil || (!b.useJointConsensus && !core.IsLearner(o) && core.IsLearner(n)) {
if n.GetId() == 0 {
// Allocate peer ID if need.
- id, err := b.AllocID()
+ id, _, err := b.AllocID(1)
if err != nil {
return "", err
}
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_region_test.go
@@ -565,7 +565,7 @@ func checkBalanceRegionOpInfluence(re *require.Assertions, enablePlacementRules
// ensure store score without operator influence : store 4 > store 3
// and store score with operator influence : store 3 > store 4
for i := 1; i <= 8; i++ {
- id, _ := tc.Alloc()
+ id, _, _ := tc.Alloc(1)
origin := tc.AddLeaderRegion(id, 4)
newPeer := &metapb.Peer{StoreId: 3, Role: metapb.PeerRole_Voter}
op, _ := operator.CreateMovePeerOperator("balance-region", tc, origin, operator.OpKind(0), 4, newPeer)
6 changes: 3 additions & 3 deletions pkg/statistics/hot_peer_cache_test.go
@@ -248,14 +248,14 @@ func getIDAllocator() *mockid.IDAllocator {
func buildRegion(cluster *core.BasicCluster, kind utils.RWType, peerCount int, interval uint64) (region *core.RegionInfo) {
peers := make([]*metapb.Peer, 0, peerCount)
for range peerCount {
- id, _ := getIDAllocator().Alloc()
- storeID, _ := getIDAllocator().Alloc()
+ id, _, _ := getIDAllocator().Alloc(1)
+ storeID, _, _ := getIDAllocator().Alloc(1)
peers = append(peers, &metapb.Peer{
Id: id,
StoreId: storeID,
})
}
- id, _ := getIDAllocator().Alloc()
+ id, _, _ := getIDAllocator().Alloc(1)
meta := &metapb.Region{
Id: id,
Peers: peers,
6 changes: 3 additions & 3 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
@@ -110,7 +110,7 @@ type cluster interface {
core.StoreSetInformer

ResetRegionCache()
- AllocID() (uint64, error)
+ AllocID(uint32) (uint64, uint32, error)
BuryStore(storeID uint64, forceBury bool) error
GetSchedulerConfig() sc.SchedulerConfigProvider
}
@@ -1151,11 +1151,11 @@ func (u *Controller) generateCreateEmptyRegionPlan(newestRegionTree *regionTree,
hasPlan := false

createRegion := func(startKey, endKey []byte, storeID uint64) (*metapb.Region, error) {
- regionID, err := u.cluster.AllocID()
+ regionID, _, err := u.cluster.AllocID(1)
if err != nil {
return nil, err
}
- peerID, err := u.cluster.AllocID()
+ peerID, _, err := u.cluster.AllocID(1)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions server/api/admin_test.go
@@ -320,13 +320,13 @@ func (suite *adminTestSuite) TestRecoverAllocID() {
tu.StatusOK(re)))
re.NoError(tu.CheckPostJSON(testDialClient, url, []byte(`{"id": "1000000"}`),
tu.StatusOK(re)))
- id, err2 := suite.svr.GetAllocator().Alloc()
+ id, _, err2 := suite.svr.GetAllocator().Alloc(1)
re.NoError(err2)
re.Equal(uint64(1000001), id)
// recover alloc id again
re.NoError(tu.CheckPostJSON(testDialClient, url, []byte(`{"id": "99000000"}`),
tu.StatusOK(re)))
- id, err2 = suite.svr.GetAllocator().Alloc()
+ id, _, err2 = suite.svr.GetAllocator().Alloc(1)
re.NoError(err2)
re.Equal(uint64(99000001), id)
// unmark
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
@@ -916,8 +916,8 @@ func (c *RaftCluster) GetHeartbeatStreams() *hbstream.HeartbeatStreams {
}

// AllocID allocates a batch of globally unique IDs, returning the last ID and the count.
- func (c *RaftCluster) AllocID() (uint64, error) {
- return c.id.Alloc()
+ func (c *RaftCluster) AllocID(count uint32) (uint64, uint32, error) {
+ return c.id.Alloc(count)
}

// GetRegionSyncer returns the region syncer.
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
@@ -2336,7 +2336,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error {
}

func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
- id, err := c.AllocID()
+ id, _, err := c.AllocID(1)
if err != nil {
return nil, err
}
10 changes: 5 additions & 5 deletions server/cluster/cluster_worker.go
@@ -89,14 +89,14 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
return nil, errors.New("region split is paused by replication mode")
}

- newRegionID, err := c.id.Alloc()
+ newRegionID, _, err := c.id.Alloc(1)
if err != nil {
return nil, err
}

peerIDs := make([]uint64, len(request.Region.Peers))
for i := 0; i < len(peerIDs); i++ {
- if peerIDs[i], err = c.id.Alloc(); err != nil {
+ if peerIDs[i], _, err = c.id.Alloc(1); err != nil {
return nil, err
}
}
@@ -136,15 +136,15 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*
splitIDs := make([]*pdpb.SplitID, 0, splitCount)
recordRegions := make([]uint64, 0, splitCount+1)

- for i := 0; i < int(splitCount); i++ {
- newRegionID, err := c.id.Alloc()
+ for range splitCount {
+ newRegionID, _, err := c.id.Alloc(1)
if err != nil {
return nil, errs.ErrSchedulerNotFound.FastGenByArgs()
}

peerIDs := make([]uint64, len(request.Region.Peers))
for i := 0; i < len(peerIDs); i++ {
- if peerIDs[i], err = c.id.Alloc(); err != nil {
+ if peerIDs[i], _, err = c.id.Alloc(1); err != nil {
return nil, err
}
}