Skip to content

Commit

Permalink
storage: test lease renewal during merge transaction
Browse files Browse the repository at this point in the history
A lease renewal during a merge transaction can cause GetSnapshotForMerge
to be executed on a replica that has already noticed that a merge is in
progress. This is a safe and expected case, but the previous
implementation did not handle it correctly. See the comments within the
patch for a more detailed explanation of the subtleties involved.

The TestStoreRangeMergeWithData test is adjusted to force these lease
renewals for reliable test coverage. Otherwise, they happened only rarely
under stress.

Release note: None
  • Loading branch information
benesch committed Jul 12, 2018
1 parent 8dac5f9 commit 2ed1515
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange
func (*RefreshRequest) flags() int { return isRead | isTxn | updatesReadTSCache }
func (*RefreshRangeRequest) flags() int { return isRead | isTxn | isRange | updatesReadTSCache }

func (*GetSnapshotForMergeRequest) flags() int { return isRead | updatesReadTSCache }
func (*GetSnapshotForMergeRequest) flags() int { return isRead | isAlone | updatesReadTSCache }

// Keys returns credentials in an aws.Config.
func (b *ExportStorage_S3) Keys() *aws.Config {
Expand Down
4 changes: 4 additions & 0 deletions pkg/roachpb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,10 @@ message RefreshRangeResponse {
// that ensure there is no moment in time where the ranges involved in the merge
// could both process commands for the same keys. See the comment on
// GetSnapshotForMerge for details.
//
// GetSnapshotForMerge may return stale data when used outside of a merge
// transaction. As a rule of thumb, it is incorrect to call GetSnapshotForMerge,
// except from its carefully-chosen location within a merge transaction.
message GetSnapshotForMergeRequest {
option (gogoproto.equal) = true;

Expand Down
10 changes: 10 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ func (ba *BatchRequest) IsSingleEndTransactionRequest() bool {
return false
}

// IsSingleGetSnapshotForMergeRequest reports whether the batch consists of a
// single request and that request is a GetSnapshotForMergeRequest.
func (ba *BatchRequest) IsSingleGetSnapshotForMergeRequest() bool {
	// Anything other than a single-request batch can never qualify.
	if !ba.IsSingleRequest() {
		return false
	}
	// The batch holds one request; check its concrete type.
	_, isSnapshotReq := ba.Requests[0].GetInner().(*GetSnapshotForMergeRequest)
	return isSnapshotReq
}

// GetPrevLeaseForLeaseRequest returns the previous lease, at the time
// of proposal, for a request lease or transfer lease request. If the
// batch does not contain a single lease request, this method will panic.
Expand Down
12 changes: 9 additions & 3 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
func TestStoreRangeMergeWithData(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("#27401")

for _, colocate := range []bool{false, true} {
for _, retries := range []int64{0, 3} {
t.Run(fmt.Sprintf("colocate=%v/retries=%d", colocate, retries), func(t *testing.T) {
Expand All @@ -203,18 +201,26 @@ func mergeWithData(t *testing.T, colocate bool, retries int64) {
sc.TestingKnobs.DisableReplicateQueue = true

// Maybe inject some retryable errors when the merge transaction commits.
var mtc *multiTestContext
sc.TestingKnobs.TestingRequestFilter = func(ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
if et := req.GetEndTransaction(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil {
if atomic.AddInt64(&retries, -1) >= 0 {
return roachpb.NewError(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE))
}
}
if req.GetGetSnapshotForMerge() != nil {
// Introduce targeted chaos by forcing a lease acquisition before
// GetSnapshotForMerge can execute. This triggers an unusual code path
// where the lease acquisition, not GetSnapshotForMerge, notices the
// merge and installs a mergeComplete channel on the replica.
mtc.advanceClock(ctx)
}
}
return nil
}

mtc := &multiTestContext{storeConfig: &sc}
mtc = &multiTestContext{storeConfig: &sc}

var store1, store2 *storage.Store
if colocate {
Expand Down
39 changes: 34 additions & 5 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2360,7 +2360,7 @@ func (r *Replica) beginCmds(
r.mu.RLock()
mergeCompleteCh := r.mu.mergeComplete
r.mu.RUnlock()
if mergeCompleteCh != nil {
if mergeCompleteCh != nil && !ba.IsSingleGetSnapshotForMergeRequest() {
// The replica is being merged into its left-hand neighbor. This request
// cannot proceed until the merge completes, signaled by the closing of
// the channel.
Expand All @@ -2370,6 +2370,34 @@ func (r *Replica) beginCmds(
// guaranteed that we're not racing with a GetSnapshotForMerge command.
// (GetSnapshotForMerge commands declare a conflict with all other
// commands.)
//
// Note that GetSnapshotForMerge commands are exempt from waiting on the
// mergeComplete channel. This is necessary to avoid deadlock. While
// normally a GetSnapshotForMerge request will trigger the installation of
// a mergeComplete channel after it is executed, it may sometimes execute
// after the mergeComplete channel has been installed. Consider the case
// where the RHS replica acquires a new lease after the merge transaction
// deletes its local range descriptor but before the GetSnapshotForMerge
// command is sent. The lease acquisition request will notice the intent
// on the local range descriptor and install a mergeComplete channel. If
// the forthcoming GetSnapshotForMerge blocked on that channel, the merge
// transaction would deadlock.
//
// This exclusion admits a small race condition. If a GetSnapshotForMerge
// request is sent to the right-hand side of a merge, outside of a merge
// transaction, after the merge has committed but before the RHS has
// noticed that the merge has committed, the request may return stale
// data. Since the merge has committed, the LHS may have processed writes
// to the keyspace previously owned by the RHS that the RHS is unaware of.
// This window closes quickly, as the RHS will soon notice the merge
// transaction has committed and mark itself as destroyed, which prevents
// it from serving all traffic, including GetSnapshotForMerge requests.
//
// In our current, careful usage of GetSnapshotForMerge, this race
// condition is irrelevant. GetSnapshotForMerge is only sent from within a
// merge transaction, and merge transactions read the RHS descriptor at
// the beginning of the transaction to verify that it has not already been
// merged away.
select {
case <-mergeCompleteCh:
// Merge complete. Carry on.
Expand Down Expand Up @@ -2696,10 +2724,11 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
mergeCompleteCh := make(chan struct{})
r.mu.Lock()
if r.mu.mergeComplete != nil {
// TODO(benesch): Is there a way this could legitimately happen? If so,
// what to do? Return and assume there's another goroutine that's doing
// the watching?
log.Fatalf(ctx, "maybeWatchForMerge called twice for the same merge")
// Another request already noticed the merge, installed a mergeComplete
// channel, and launched a goroutine to watch for the merge's completion.
// Nothing more to do.
r.mu.Unlock()
return nil
}
r.mu.mergeComplete = mergeCompleteCh
r.mu.Unlock()
Expand Down

0 comments on commit 2ed1515

Please sign in to comment.