diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 70cfee255651..1247d5f44a9d 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -183,6 +183,7 @@ type proposalResult struct { Reply *roachpb.BatchResponse Err *roachpb.Error ProposalRetry proposalRetryReason + Intents []intentsWithArg } type replicaChecksum struct { @@ -1869,6 +1870,11 @@ func (r *Replica) tryAddWriteCmd( // Set endCmds to nil because they have already been invoked // in processRaftCommand. endCmds = nil + if propResult.Intents != nil { + // Synchronously process any intents that need resolving here in order + // to apply back pressure on the client which generated them. + r.store.intentResolver.processIntents(r, propResult.Intents) + } return propResult.Reply, propResult.Err, propResult.ProposalRetry case <-ctxDone: // If our context was cancelled, return an AmbiguousResultError @@ -2077,12 +2083,13 @@ func (r *Replica) propose( // An error here corresponds to a failfast-proposal: The command resulted // in an error and did not need to commit a batch (the common error case). if pErr != nil { + intents := pCmd.Local.detachIntents() r.handleEvalResult(ctx, repDesc, pCmd.Local, pCmd.Replicated) if endCmds != nil { endCmds.done(nil, pErr, proposalNoRetry) } ch := make(chan proposalResult, 1) - ch <- proposalResult{Err: pErr} + ch <- proposalResult{Err: pErr, Intents: intents} close(ch) return ch, func() bool { return false }, nil } @@ -3287,6 +3294,7 @@ func (r *Replica) processRaftCommand( } else { log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd) } + response.Intents = cmd.Local.detachIntents() lResult = cmd.Local } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 231ed3fe3dce..4bc26bbb9426 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -110,6 +110,15 @@ func (lResult *LocalEvalResult) finish(pr proposalResult) { close(lResult.doneCh) } +func (lResult *LocalEvalResult) detachIntents() []intentsWithArg { + if lResult.intents == nil { + return nil + } + intents := *lResult.intents + lResult.intents = nil + return intents +} + // EvalResult is the result of evaluating a KV request. That is, the // proposer (which holds the lease, at least in the case in which the command // will complete successfully) has evaluated the request and is holding on to: @@ -600,22 +609,6 @@ func (r *Replica) handleLocalEvalResult( // Non-state updates and actions. // ====================== - if originReplica.StoreID == r.store.StoreID() { - // On the replica on which this command originated, resolve skipped - // intents asynchronously - even on failure. - // - // TODO(tschottdorf): EndTransaction will use this pathway to return - // intents which should immediately be resolved. However, there's - // a slight chance that an error between the origin of that intents - // slice and here still results in that intent slice arriving here - // without the EndTransaction having committed. We should clearly - // separate the part of the EvalResult which also applies on errors. - if lResult.intents != nil { - r.store.intentResolver.processIntentsAsync(r, *lResult.intents) - } - } - lResult.intents = nil - // The above are present too often, so we assert only if there are // "nontrivial" actions below. shouldAssert = (lResult != LocalEvalResult{})