gc_worker: resolve locks on offline stores (#18383) #18550

Merged · 3 commits · Jul 15, 2020
33 changes: 18 additions & 15 deletions store/tikv/gcworker/gc_worker.go
@@ -410,7 +410,7 @@ func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) {
return w.loadGCConcurrencyWithDefault()
}

stores, err := w.getUpStoresForGC(ctx)
stores, err := w.getStoresForGC(ctx)
concurrency := len(stores)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. use config.",
@@ -457,8 +457,8 @@ func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) {
return true, nil
}

// validateGCLiftTime checks whether life time is small than min gc life time.
func (w *GCWorker) validateGCLiftTime(lifeTime time.Duration) (time.Duration, error) {
// validateGCLifeTime checks whether life time is small than min gc life time.
func (w *GCWorker) validateGCLifeTime(lifeTime time.Duration) (time.Duration, error) {
if lifeTime >= gcMinLifeTime {
return lifeTime, nil
}
@@ -476,7 +476,7 @@ func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*t
if err != nil {
return nil, 0, errors.Trace(err)
}
*lifeTime, err = w.validateGCLiftTime(*lifeTime)
*lifeTime, err = w.validateGCLifeTime(*lifeTime)
if err != nil {
return nil, 0, err
}
@@ -710,7 +710,7 @@ func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64, concu

func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte, concurrency int) error {
// Get all stores every time deleting a region. So the store list is less probably to be stale.
stores, err := w.getUpStoresForGC(ctx)
stores, err := w.getStoresForGC(ctx)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] delete ranges: got an error while trying to get store list from PD",
zap.String("uuid", w.uuid),
@@ -786,8 +786,14 @@ const (
// needsGCOperationForStore checks if the store-level requests related to GC needs to be sent to the store. The store-level
// requests includes UnsafeDestroyRange, PhysicalScanLock, etc.
func needsGCOperationForStore(store *metapb.Store) (bool, error) {
engineLabel := ""
// TombStone means the store has been removed from the cluster and there isn't any peer on the store, so needn't do GC for it.
// Offline means the store is being removed from the cluster and it becomes tombstone after all peers are removed from it,
// so we need to do GC for it.
if store.State == metapb.StoreState_Tombstone {
return false, nil
}

engineLabel := ""
for _, label := range store.GetLabels() {
if label.GetKey() == engineLabelKey {
engineLabel = label.GetValue()
@@ -816,18 +822,15 @@ func needsGCOperationForStore(store *metapb.Store) (bool, error) {
}
}

// getUpStoresForGC gets the list of stores that needs to be processed during GC.
func (w *GCWorker) getUpStoresForGC(ctx context.Context) ([]*metapb.Store, error) {
// getStoresForGC gets the list of stores that needs to be processed during GC.
func (w *GCWorker) getStoresForGC(ctx context.Context) ([]*metapb.Store, error) {
stores, err := w.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.Trace(err)
}

upStores := make([]*metapb.Store, 0, len(stores))
for _, store := range stores {
if store.State != metapb.StoreState_Up {
continue
}
needsGCOp, err := needsGCOperationForStore(store)
if err != nil {
return nil, errors.Trace(err)
@@ -839,8 +842,8 @@ func (w *GCWorker) getUpStoresForGC(ctx context.Context) ([]*metapb.Store, error
return upStores, nil
}

func (w *GCWorker) getUpStoresMapForGC(ctx context.Context) (map[uint64]*metapb.Store, error) {
stores, err := w.getUpStoresForGC(ctx)
func (w *GCWorker) getStoresMapForGC(ctx context.Context) (map[uint64]*metapb.Store, error) {
stores, err := w.getStoresForGC(ctx)
if err != nil {
return nil, err
}
@@ -1080,7 +1083,7 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
registeredStores := make(map[uint64]*metapb.Store)
defer w.removeLockObservers(ctx, safePoint, registeredStores)

dirtyStores, err := w.getUpStoresMapForGC(ctx)
dirtyStores, err := w.getStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
@@ -1101,7 +1104,7 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e

failpoint.Inject("beforeCheckLockObservers", func() {})

stores, err := w.getUpStoresMapForGC(ctx)
stores, err := w.getStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
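Taken together, the gc_worker.go changes move store-state filtering out of getStoresForGC (formerly getUpStoresForGC, which kept only StoreState_Up stores) and into needsGCOperationForStore, which now skips only tombstone stores. Offline stores therefore keep receiving the store-level GC requests (UnsafeDestroyRange, PhysicalScanLock), so their locks still get resolved. Below is a minimal standalone sketch of that decision logic; the helper name, the engine-label constant values, and the error text are stand-ins for the unexported identifiers in the gcworker package, not the literal implementation.

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

// Hypothetical stand-ins for the unexported label constants in the gcworker package.
const (
	engineLabelKey     = "engine"
	engineLabelTiKV    = "tikv"
	engineLabelTiFlash = "tiflash"
)

// needsGCOperation approximates the post-PR needsGCOperationForStore:
// tombstone stores have no peers left and are skipped, while offline stores
// may still hold locks until all their peers are removed, so they are included.
func needsGCOperation(store *metapb.Store) (bool, error) {
	if store.State == metapb.StoreState_Tombstone {
		return false, nil
	}
	engineLabel := ""
	for _, label := range store.GetLabels() {
		if label.GetKey() == engineLabelKey {
			engineLabel = label.GetValue()
			break
		}
	}
	switch engineLabel {
	case "", engineLabelTiKV:
		// No engine label or an explicit TiKV label: send store-level GC requests.
		return true, nil
	case engineLabelTiFlash:
		// TiFlash does not need these operations.
		return false, nil
	default:
		return false, fmt.Errorf("unsupported store engine %q", engineLabel)
	}
}

func main() {
	stores := []*metapb.Store{
		{Id: 1, State: metapb.StoreState_Up},
		{Id: 2, State: metapb.StoreState_Offline},
		{Id: 3, State: metapb.StoreState_Tombstone},
		{Id: 4, State: metapb.StoreState_Up,
			Labels: []*metapb.StoreLabel{{Key: engineLabelKey, Value: engineLabelTiFlash}}},
	}
	for _, s := range stores {
		needsGC, err := needsGCOperation(s)
		fmt.Printf("store %d (%v): needs GC ops = %v, err = %v\n", s.Id, s.State, needsGC, err)
	}
}
```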
39 changes: 21 additions & 18 deletions store/tikv/gcworker/gc_worker_test.go
@@ -497,32 +497,35 @@ func (s *testGCWorkerSuite) TestCheckScanLockMode(c *C) {
}

func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) {
newStore := func(hasEngineLabel bool, engineLabel string) *metapb.Store {
newStore := func(state metapb.StoreState, hasEngineLabel bool, engineLabel string) *metapb.Store {
store := &metapb.Store{}
store.State = state
if hasEngineLabel {
store.Labels = []*metapb.StoreLabel{{Key: engineLabelKey, Value: engineLabel}}
}
return store
}

// TiKV needs to do the store-level GC operations.
res, err := needsGCOperationForStore(newStore(false, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, engineLabelTiKV))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)

// TiFlash does not need these operations.
res, err = needsGCOperationForStore(newStore(true, engineLabelTiFlash))
c.Assert(err, IsNil)
c.Assert(res, IsFalse)
for _, state := range []metapb.StoreState{metapb.StoreState_Up, metapb.StoreState_Offline, metapb.StoreState_Tombstone} {
needGC := state != metapb.StoreState_Tombstone
res, err := needsGCOperationForStore(newStore(state, false, ""))
c.Assert(err, IsNil)
c.Assert(res, Equals, needGC)
res, err = needsGCOperationForStore(newStore(state, true, ""))
c.Assert(err, IsNil)
c.Assert(res, Equals, needGC)
res, err = needsGCOperationForStore(newStore(state, true, engineLabelTiKV))
c.Assert(err, IsNil)
c.Assert(res, Equals, needGC)

// TiFlash does not need these operations.
res, err = needsGCOperationForStore(newStore(state, true, engineLabelTiFlash))
c.Assert(err, IsNil)
c.Assert(res, IsFalse)
}
// Throw an error for unknown store types.
_, err = needsGCOperationForStore(newStore(true, "invalid"))
_, err := needsGCOperationForStore(newStore(metapb.StoreState_Up, true, "invalid"))
c.Assert(err, NotNil)
}

@@ -569,7 +572,7 @@ func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
c.Assert(err, IsNil)
c.Assert(preparedRanges, DeepEquals, ranges)

stores, err := s.gcWorker.getUpStoresForGC(context.Background())
stores, err := s.gcWorker.getStoresForGC(context.Background())
c.Assert(err, IsNil)
c.Assert(len(stores), Equals, 3)

@@ -977,7 +980,7 @@ func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockSca

const scanLockLimit = 3

storesMap, err := s.gcWorker.getUpStoresMapForGC(context.Background())
storesMap, err := s.gcWorker.getStoresMapForGC(context.Background())
c.Assert(err, IsNil)
scanner := newMergeLockScanner(100000, s.client, storesMap)
scanner.scanLockLimit = scanLockLimit
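The rewritten TestNeedsGCOperationForStore walks every (store state, engine label) combination and expects GC operations for every non-tombstone state. A rough equivalent in plain `testing` style is sketched below, assuming it sits in the gcworker package next to the unexported needsGCOperationForStore and engine-label constants; the actual suite uses gocheck's c.Assert helpers instead.

```go
package gcworker

import (
	"testing"

	"github.com/pingcap/kvproto/pkg/metapb"
)

// TestNeedsGCOperationForStoreSketch mirrors the matrix covered by the updated
// gocheck test: only tombstone stores are skipped, and TiFlash stores never
// need the store-level GC requests.
func TestNeedsGCOperationForStoreSketch(t *testing.T) {
	newStore := func(state metapb.StoreState, labels ...*metapb.StoreLabel) *metapb.Store {
		return &metapb.Store{State: state, Labels: labels}
	}
	states := []metapb.StoreState{
		metapb.StoreState_Up,
		metapb.StoreState_Offline,
		metapb.StoreState_Tombstone,
	}
	for _, state := range states {
		wantGC := state != metapb.StoreState_Tombstone

		got, err := needsGCOperationForStore(newStore(state))
		if err != nil || got != wantGC {
			t.Errorf("state %v, no labels: got (%v, %v), want (%v, <nil>)", state, got, err, wantGC)
		}

		got, err = needsGCOperationForStore(newStore(state,
			&metapb.StoreLabel{Key: engineLabelKey, Value: engineLabelTiKV}))
		if err != nil || got != wantGC {
			t.Errorf("state %v, TiKV label: got (%v, %v), want (%v, <nil>)", state, got, err, wantGC)
		}

		// TiFlash does not need the store-level GC operations in any state.
		got, err = needsGCOperationForStore(newStore(state,
			&metapb.StoreLabel{Key: engineLabelKey, Value: engineLabelTiFlash}))
		if err != nil || got {
			t.Errorf("state %v, TiFlash label: got (%v, %v), want (false, <nil>)", state, got, err)
		}
	}

	// Unknown engine labels should surface an error.
	if _, err := needsGCOperationForStore(newStore(metapb.StoreState_Up,
		&metapb.StoreLabel{Key: engineLabelKey, Value: "invalid"})); err == nil {
		t.Errorf("expected an error for an unknown engine label")
	}
}
```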