Skip to content

Commit

Permalink
storage: process intents synchronously
Browse files Browse the repository at this point in the history
Process intents synchronously on the goroutine which generated them.
  • Loading branch information
petermattis committed Nov 23, 2016
1 parent 88eb869 commit 7a6dd5c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
10 changes: 9 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type proposalResult struct {
Reply *roachpb.BatchResponse
Err *roachpb.Error
ProposalRetry proposalRetryReason
Intents []intentsWithArg
}

type replicaChecksum struct {
Expand Down Expand Up @@ -1869,6 +1870,11 @@ func (r *Replica) tryAddWriteCmd(
// Set endCmds to nil because they have already been invoked
// in processRaftCommand.
endCmds = nil
if propResult.Intents != nil {
// Synchronously process any intents that need resolving here in order
// to apply back pressure on the client which generated them.
r.store.intentResolver.processIntents(r, propResult.Intents)
}
return propResult.Reply, propResult.Err, propResult.ProposalRetry
case <-ctxDone:
// If our context was cancelled, return an AmbiguousResultError
Expand Down Expand Up @@ -2077,12 +2083,13 @@ func (r *Replica) propose(
// An error here corresponds to a failfast-proposal: The command resulted
// in an error and did not need to commit a batch (the common error case).
if pErr != nil {
intents := pCmd.Local.detachIntents()
r.handleEvalResult(ctx, repDesc, pCmd.Local, pCmd.Replicated)
if endCmds != nil {
endCmds.done(nil, pErr, proposalNoRetry)
}
ch := make(chan proposalResult, 1)
ch <- proposalResult{Err: pErr}
ch <- proposalResult{Err: pErr, Intents: intents}
close(ch)
return ch, func() bool { return false }, nil
}
Expand Down Expand Up @@ -3287,6 +3294,7 @@ func (r *Replica) processRaftCommand(
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd)
}
response.Intents = cmd.Local.detachIntents()
lResult = cmd.Local
}

Expand Down
25 changes: 9 additions & 16 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ func (lResult *LocalEvalResult) finish(pr proposalResult) {
close(lResult.doneCh)
}

func (lResult *LocalEvalResult) detachIntents() []intentsWithArg {
if lResult.intents == nil {
return nil
}
intents := *lResult.intents
lResult.intents = nil
return intents
}

// EvalResult is the result of evaluating a KV request. That is, the
// proposer (which holds the lease, at least in the case in which the command
// will complete successfully) has evaluated the request and is holding on to:
Expand Down Expand Up @@ -600,22 +609,6 @@ func (r *Replica) handleLocalEvalResult(
// Non-state updates and actions.
// ======================

if originReplica.StoreID == r.store.StoreID() {
// On the replica on which this command originated, resolve skipped
// intents asynchronously - even on failure.
//
// TODO(tschottdorf): EndTransaction will use this pathway to return
// intents which should immediately be resolved. However, there's
// a slight chance that an error between the origin of that intents
// slice and here still results in that intent slice arriving here
// without the EndTransaction having committed. We should clearly
// separate the part of the EvalResult which also applies on errors.
if lResult.intents != nil {
r.store.intentResolver.processIntentsAsync(r, *lResult.intents)
}
}
lResult.intents = nil

// The above are present too often, so we assert only if there are
// "nontrivial" actions below.
shouldAssert = (lResult != LocalEvalResult{})
Expand Down

0 comments on commit 7a6dd5c

Please sign in to comment.