Skip to content

Commit

Permalink
Merge branch 'master' into tpcc-tune3
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Nov 29, 2022
2 parents 6dc1374 + 2c0c130 commit 83c2880
Show file tree
Hide file tree
Showing 74 changed files with 1,072 additions and 563 deletions.
1 change: 1 addition & 0 deletions autoid_service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//meta",
"//metrics",
"//owner",
"//parser/model",
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
Expand Down
11 changes: 6 additions & 5 deletions autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (alloc *autoIDValue) alloc4Unsigned(ctx context.Context, store kv.Storage,

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
Expand Down Expand Up @@ -137,7 +138,7 @@ func (alloc *autoIDValue) alloc4Signed(ctx context.Context,

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
Expand Down Expand Up @@ -188,7 +189,7 @@ func (alloc *autoIDValue) rebase4Unsigned(ctx context.Context,
startTime := time.Now()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
Expand Down Expand Up @@ -221,7 +222,7 @@ func (alloc *autoIDValue) rebase4Signed(ctx context.Context, store kv.Storage, d
var newBase, newEnd int64
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
Expand Down Expand Up @@ -451,7 +452,7 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbID, tblID, requiredBase int64, isUnsigned bool) error {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ var (
ErrStorageInvalidPermission = errors.Normalize("external storage permission", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidPermission"))

// Snapshot restore
ErrRestoreTotalKVMismatch = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
ErrRestoreInvalidPeer = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
ErrRestoreTotalKVMismatch = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
ErrRestoreInvalidPeer = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
ErrRestoreRegionWithoutPeer = errors.Normalize("restore met a region without any peer", errors.RFCCodeText("BR:EBS:ErrRestoreRegionWithoutPeer"))

// Errors reported from TiKV.
ErrKVStorage = errors.Normalize("tikv storage occur I/O error", errors.RFCCodeText("BR:KV:ErrKVStorage"))
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type panickingAllocator struct {
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
false,
&panickingAllocator{base: sharedBase, ty: autoid.RowIDAllocType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoIncrementType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoRandomType},
Expand Down
23 changes: 14 additions & 9 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ type RecoverRegion struct {
// 2. build a leader list for all region during the tikv startup
// 3. get max allocate id
func (recovery *Recovery) MakeRecoveryPlan() error {
storeBalanceScore := make(map[uint64]int, len(recovery.allStores))
// Group region peer info by region id. find the max allocateId
// region [id] [peer[0-n]]
var regions = make(map[uint64][]*RecoverRegion, 0)
Expand Down Expand Up @@ -410,16 +411,20 @@ func (recovery *Recovery) MakeRecoveryPlan() error {
}
} else {
// Generate normal commands.
log.Debug("detected valid peer", zap.Uint64("region id", regionId))
for i, peer := range peers {
log.Debug("make plan", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
plan := &recovpb.RecoverRegionRequest{RegionId: peer.RegionId, AsLeader: i == 0}
// sorted by log term -> last index -> commit index in a region
if plan.AsLeader {
log.Debug("as leader peer", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
recovery.RecoveryPlan[peer.StoreId] = append(recovery.RecoveryPlan[peer.StoreId], plan)
}
log.Debug("detected valid region", zap.Uint64("region id", regionId))
// calc the leader candidates
leaderCandidates, err := LeaderCandidates(peers)
if err != nil {
log.Warn("region without peer", zap.Uint64("region id", regionId))
return errors.Trace(err)
}

// select the leader base on tikv storeBalanceScore
leader := SelectRegionLeader(storeBalanceScore, leaderCandidates)
log.Debug("as leader peer", zap.Uint64("store id", leader.StoreId), zap.Uint64("region id", leader.RegionId))
plan := &recovpb.RecoverRegionRequest{RegionId: leader.RegionId, AsLeader: true}
recovery.RecoveryPlan[leader.StoreId] = append(recovery.RecoveryPlan[leader.StoreId], plan)
storeBalanceScore[leader.StoreId] += 1
}
}
return nil
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,3 +750,43 @@ func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64]
}
return validPeers, nil
}

// in cloud, since iops and bandwidth limitation, write operator in raft is slow, so raft state (logterm, lastlog, commitlog...) are the same among the peers
// LeaderCandidates select all peers can be select as a leader during the restore
func LeaderCandidates(peers []*RecoverRegion) ([]*RecoverRegion, error) {
if peers == nil {
return nil, errors.Annotatef(berrors.ErrRestoreRegionWithoutPeer,
"invalid region range")
}
candidates := make([]*RecoverRegion, 0, len(peers))
// by default, the peers[0] to be assign as a leader, since peers already sorted by leader selection rule
leader := peers[0]
candidates = append(candidates, leader)
for _, peer := range peers[1:] {
// qualificated candidate is leader.logterm = candidate.logterm && leader.lastindex = candidate.lastindex && && leader.commitindex = candidate.commitindex
if peer.LastLogTerm == leader.LastLogTerm && peer.LastIndex == leader.LastIndex && peer.CommitIndex == leader.CommitIndex {
log.Debug("leader candidate", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
candidates = append(candidates, peer)
}
}
return candidates, nil
}

// for region A, has candidate leader x, y, z
// peer x on store 1 with storeBalanceScore 3
// peer y on store 3 with storeBalanceScore 2
// peer z on store 4 with storeBalanceScore 1
// result: peer z will be select as leader on store 4
func SelectRegionLeader(storeBalanceScore map[uint64]int, peers []*RecoverRegion) *RecoverRegion {
// by default, the peers[0] to be assign as a leader
leader := peers[0]
minLeaderStore := storeBalanceScore[leader.StoreId]
for _, peer := range peers[1:] {
log.Debug("leader candidate", zap.Int("score", storeBalanceScore[peer.StoreId]), zap.Int("min-score", minLeaderStore), zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
if storeBalanceScore[peer.StoreId] < minLeaderStore {
minLeaderStore = storeBalanceScore[peer.StoreId]
leader = peer
}
}
return leader
}
49 changes: 49 additions & 0 deletions br/pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,52 @@ func TestCheckConsistencyAndValidPeer(t *testing.T) {
require.Error(t, err)
require.Regexp(t, ".*invalid restore range.*", err.Error())
}

func TestLeaderCandidates(t *testing.T) {
//key space is continuous
validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)

peers := []*restore.RecoverRegion{
validPeer1,
validPeer2,
validPeer3,
}

candidates, err := restore.LeaderCandidates(peers)
require.NoError(t, err)
require.Equal(t, 3, len(candidates))
}

func TestSelectRegionLeader(t *testing.T) {
validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)

peers := []*restore.RecoverRegion{
validPeer1,
validPeer2,
validPeer3,
}
// init store banlance score all is 0
storeBalanceScore := make(map[uint64]int, len(peers))
leader := restore.SelectRegionLeader(storeBalanceScore, peers)
require.Equal(t, validPeer1, leader)

// change store banlance store
storeBalanceScore[2] = 3
storeBalanceScore[3] = 2
storeBalanceScore[1] = 1
leader = restore.SelectRegionLeader(storeBalanceScore, peers)
require.Equal(t, validPeer3, leader)

// one peer
peer := []*restore.RecoverRegion{
validPeer3,
}
// init store banlance score all is 0
storeScore := make(map[uint64]int, len(peer))
leader = restore.SelectRegionLeader(storeScore, peer)
require.Equal(t, validPeer3, leader)
}
10 changes: 7 additions & 3 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,7 +1543,7 @@ func checkAndApplyAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
return nil
}
idAcc := m.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID)
err := checkNewAutoRandomBits(idAcc, oldCol, newCol, newAutoRandBits, tblInfo.AutoRandomRangeBits, tblInfo.Version)
err := checkNewAutoRandomBits(idAcc, oldCol, newCol, newAutoRandBits, tblInfo.AutoRandomRangeBits, tblInfo.SepAutoInc())
if err != nil {
return err
}
Expand All @@ -1552,13 +1552,17 @@ func checkAndApplyAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,

// checkNewAutoRandomBits checks whether the new auto_random bits number can cause overflow.
func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.ColumnInfo,
newCol *model.ColumnInfo, newShardBits, newRangeBits uint64, tblVer uint16) error {
newCol *model.ColumnInfo, newShardBits, newRangeBits uint64, sepAutoInc bool) error {
shardFmt := autoid.NewShardIDFormat(&newCol.FieldType, newShardBits, newRangeBits)

idAcc := idAccessors.RandomID()
convertedFromAutoInc := mysql.HasAutoIncrementFlag(oldCol.GetFlag())
if convertedFromAutoInc {
idAcc = idAccessors.IncrementID(tblVer)
if sepAutoInc {
idAcc = idAccessors.IncrementID(model.TableInfoVersion5)
} else {
idAcc = idAccessors.RowID()
}
}
// Generate a new auto ID first to prevent concurrent update in DML.
_, err := idAcc.Inc(1)
Expand Down
Loading

0 comments on commit 83c2880

Please sign in to comment.