Skip to content

Commit

Permalink
Merge branch 'master' into improve-observe
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored May 21, 2024
2 parents 6c3f0ea + 2fabb74 commit b770f44
Show file tree
Hide file tree
Showing 31 changed files with 985 additions and 743 deletions.
29 changes: 21 additions & 8 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,33 @@ jobs:
strategy:
fail-fast: true
matrix:
worker_id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
include:
- worker_id: 1
name: 'Unit Test(1)'
- worker_id: 2
name: 'Unit Test(2)'
- worker_id: 3
name: 'Tools Test'
- worker_id: 4
name: 'Client Integration Test'
- worker_id: 5
name: 'TSO Integration Test'
- worker_id: 6
name: 'MicroService Integration Test'
outputs:
job-total: 13
job-total: 6
steps:
- name: Checkout code
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.21'
- name: Make Test
- name: ${{ matrix.name }}
env:
WORKER_ID: ${{ matrix.worker_id }}
WORKER_COUNT: 13
JOB_COUNT: 9 # 10 is tools test, 11, 12, 13 are for other integrations jobs
run: |
make ci-test-job JOB_COUNT=$(($JOB_COUNT)) JOB_INDEX=$WORKER_ID
make ci-test-job JOB_INDEX=$WORKER_ID
mv covprofile covprofile_$WORKER_ID
sed -i "/failpoint_binding/d" covprofile_$WORKER_ID
- name: Upload coverage result ${{ matrix.worker_id }}
uses: actions/upload-artifact@v4
with:
Expand All @@ -62,7 +71,11 @@ jobs:
- name: Merge
env:
TOTAL_JOBS: ${{needs.chunks.outputs.job-total}}
run: for i in $(seq 1 $TOTAL_JOBS); do cat covprofile_$i >> covprofile; done
run: |
for i in $(seq 1 $TOTAL_JOBS); do cat covprofile_$i >> covprofile; done
sed -i "/failpoint_binding/d" covprofile
# only keep the first line(`mode: aomic`) of the coverage profile
sed -i '2,${/mode: atomic/d;}' covprofile
- name: Send coverage
uses: codecov/codecov-action@v4.2.0
with:
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ regions-dump:
stores-dump:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/stores-dump stores-dump/main.go
pd-ut: pd-xprog
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ut pd-ut/ut.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ut pd-ut/ut.go pd-ut/coverProfile.go
pd-xprog:
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -tags xprog -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/xprog pd-ut/xprog.go

Expand Down Expand Up @@ -227,7 +227,8 @@ failpoint-disable: install-tools

ut: pd-ut
@$(FAILPOINT_ENABLE)
./bin/pd-ut run --race
# only run unit tests
./bin/pd-ut run --ignore tests --race
@$(CLEAN_UT_BINARY)
@$(FAILPOINT_DISABLE)

Expand All @@ -251,7 +252,7 @@ basic-test: install-tools
go test $(BASIC_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

ci-test-job: install-tools dashboard-ui
ci-test-job: install-tools dashboard-ui pd-ut
@$(FAILPOINT_ENABLE)
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)
Expand Down
6 changes: 0 additions & 6 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,3 @@ flag_management:
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.
- type: patch
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.

ignore:
# Ignore the tool tests
- tests/dashboard
- tests/pdbackup
- tests/pdctl
203 changes: 67 additions & 136 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,8 @@ type RegionsInfo struct {
learners map[uint64]*regionTree // storeID -> sub regionTree
witnesses map[uint64]*regionTree // storeID -> sub regionTree
pendingPeers map[uint64]*regionTree // storeID -> sub regionTree
// This tree is used to check the overlaps among all the subtrees.
overlapTree *regionTree
}

// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
Expand All @@ -927,6 +929,7 @@ func NewRegionsInfo() *RegionsInfo {
learners: make(map[uint64]*regionTree),
witnesses: make(map[uint64]*regionTree),
pendingPeers: make(map[uint64]*regionTree),
overlapTree: newRegionTreeWithCountRef(),
}
}

Expand Down Expand Up @@ -1041,10 +1044,10 @@ func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *Regio
// Usually used with CheckAndPutRootTree together.
func (r *RegionsInfo) CheckAndPutSubTree(region *RegionInfo) {
// new region get from root tree again
var newRegion *RegionInfo
newRegion = r.GetRegion(region.GetID())
newRegion := r.GetRegion(region.GetID())
if newRegion == nil {
newRegion = region
// Make sure there is this region in the root tree, so as to ensure the correctness of reference count
return
}
r.UpdateSubTreeOrderInsensitive(newRegion)
}
Expand All @@ -1066,110 +1069,98 @@ func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
origin = originItem.RegionInfo
}
rangeChanged := true

if origin != nil {
rangeChanged = !origin.rangeEqualsTo(region)
if r.preUpdateSubTreeLocked(rangeChanged, !origin.peersEqualTo(region), true, origin, region) {
return
}
}
r.updateSubTreeLocked(rangeChanged, nil, region)
}

func (r *RegionsInfo) preUpdateSubTreeLocked(
rangeChanged, peerChanged, orderInsensitive bool,
origin, region *RegionInfo,
) (done bool) {
if orderInsensitive {
re := region.GetRegionEpoch()
oe := origin.GetRegionEpoch()
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
// Region meta is stale, skip.
return
return true
}
rangeChanged = !origin.rangeEqualsTo(region)

if rangeChanged || !origin.peersEqualTo(region) {
// If the range or peers have changed, the sub regionTree needs to be cleaned up.
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree update is not atomic and the region tree is updated first.
// If there are two thread needs to update region tree,
// t1: thread-A update region tree
// t2: thread-B: update region tree again
// t3: thread-B: update subtree
// t4: thread-A: update region subtree
// to keep region tree consistent with subtree, we need to drop this update.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
if rangeChanged || peerChanged {
// If the range or peers have changed, clean up the subtrees before updating them.
// TODO: improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree update is not atomic and the region tree is updated first.
// If there are two thread needs to update region tree,
// t1: thread-A update region tree
// t2: thread-B: update region tree again
// t3: thread-B: update subtree
// t4: thread-A: update region subtree
// to keep region tree consistent with subtree, we need to drop this update.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return true
}
return false
}

func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionInfo, region *RegionInfo) {
if rangeChanged {
overlaps := r.getOverlapRegionFromSubTreeLocked(region)
for _, re := range overlaps {
r.removeRegionFromSubTreeLocked(re)
// TODO: only perform the remove operation on the overlapped peer.
if len(overlaps) == 0 {
// If the range has changed but the overlapped regions are not provided, collect them by `[]*regionItem`.
for _, item := range r.getOverlapRegionFromOverlapTreeLocked(region) {
r.removeRegionFromSubTreeLocked(item.RegionInfo)
}
} else {
// Remove all provided overlapped regions from the subtrees.
for _, overlap := range overlaps {
r.removeRegionFromSubTreeLocked(overlap)
}
}
}

// Reinsert the region into all subtrees.
item := &regionItem{region}
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, countRef bool) {
r.overlapTree.update(item, false)
// Add leaders and followers.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64) {
store, ok := peersMap[storeID]
if !ok {
if !countRef {
store = newRegionTree()
} else {
store = newRegionTreeWithCountRef()
}
store = newRegionTree()
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item, true)
setPeer(r.leaders, storeID)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item, false)
setPeer(r.followers, storeID)
}
}

// Add other peers.
setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item, false)
setPeer(peersMap, peer.GetStoreId())
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to PendingPeers
setPeers(r.pendingPeers, region.GetPendingPeers())
}

func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
it := &regionItem{RegionInfo: region}
overlaps := make([]*RegionInfo, 0)
overlapsMap := make(map[uint64]struct{})
collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
items := tree.overlaps(it)
for _, item := range items {
if _, ok := overlapsMap[item.GetID()]; !ok {
overlapsMap[item.GetID()] = struct{}{}
overlaps = append(overlaps, item.RegionInfo)
}
}
}
}
for _, peer := range region.GetMeta().GetPeers() {
storeID := peer.GetStoreId()
collectFromItemSlice(r.leaders, storeID)
collectFromItemSlice(r.followers, storeID)
collectFromItemSlice(r.learners, storeID)
collectFromItemSlice(r.witnesses, storeID)
}
return overlaps
func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*regionItem {
return r.overlapTree.overlaps(&regionItem{RegionInfo: region})
}

// GetRelevantRegions returns the relevant regions for a given region.
Expand Down Expand Up @@ -1275,72 +1266,11 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
r.st.Lock()
defer r.st.Unlock()
if origin != nil {
if rangeChanged || !origin.peersEqualTo(region) {
// If the range or peers have changed, the sub regionTree needs to be cleaned up.
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree update is not atomic and the region tree is updated first.
// If there are two thread needs to update region tree,
// t1: thread-A update region tree
// t2: thread-B: update region tree again
// t3: thread-B: update subtree
// t4: thread-A: update region subtree
// to keep region tree consistent with subtree, we need to drop this update.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
if r.preUpdateSubTreeLocked(rangeChanged, !origin.peersEqualTo(region), false, origin, region) {
return
}
}
if rangeChanged {
for _, re := range overlaps {
r.removeRegionFromSubTreeLocked(re)
}
}

item := &regionItem{region}
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, countRef bool) {
store, ok := peersMap[storeID]
if !ok {
if !countRef {
store = newRegionTree()
} else {
store = newRegionTreeWithCountRef()
}
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item, true)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item, false)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item, false)
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to PendingPeers
setPeers(r.pendingPeers, region.GetPendingPeers())
r.updateSubTreeLocked(rangeChanged, overlaps, region)
}

func (r *RegionsInfo) updateSubTreeStat(origin *RegionInfo, region *RegionInfo) {
Expand Down Expand Up @@ -1394,7 +1324,7 @@ func (r *RegionsInfo) RemoveRegion(region *RegionInfo) {
// ResetRegionCache resets the regions info.
func (r *RegionsInfo) ResetRegionCache() {
r.t.Lock()
r.tree = newRegionTree()
r.tree = newRegionTreeWithCountRef()
r.regions = make(map[uint64]*regionItem)
r.t.Unlock()
r.st.Lock()
Expand All @@ -1404,6 +1334,7 @@ func (r *RegionsInfo) ResetRegionCache() {
r.learners = make(map[uint64]*regionTree)
r.witnesses = make(map[uint64]*regionTree)
r.pendingPeers = make(map[uint64]*regionTree)
r.overlapTree = newRegionTreeWithCountRef()
}

// RemoveRegionFromSubTree removes RegionInfo from regionSubTrees
Expand All @@ -1416,7 +1347,6 @@ func (r *RegionsInfo) RemoveRegionFromSubTree(region *RegionInfo) {

// removeRegionFromSubTreeLocked removes RegionInfo from regionSubTrees
func (r *RegionsInfo) removeRegionFromSubTreeLocked(region *RegionInfo) {
// Remove from leaders and followers.
for _, peer := range region.GetMeta().GetPeers() {
storeID := peer.GetStoreId()
r.leaders[storeID].remove(region)
Expand All @@ -1425,6 +1355,7 @@ func (r *RegionsInfo) removeRegionFromSubTreeLocked(region *RegionInfo) {
r.witnesses[storeID].remove(region)
r.pendingPeers[storeID].remove(region)
}
r.overlapTree.remove(region)
delete(r.subRegions, region.GetMeta().GetId())
}

Expand Down
Loading

0 comments on commit b770f44

Please sign in to comment.