Skip to content

Commit

Permalink
tikv: set undetermined error when async commit prewrite rpc fails (#2…
Browse files Browse the repository at this point in the history
…0150)

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf authored Sep 23, 2020
1 parent 6271bd4 commit 592b244
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 8 deletions.
6 changes: 6 additions & 0 deletions store/mockstore/unistore/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
r := req.Prewrite()
c.cluster.handleDelay(r.StartVersion, r.Context.RegionId)
resp.Resp, err = c.usSvr.KvPrewrite(ctx, r)

failpoint.Inject("rpcPrewriteTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, undeterminedErr)
}
})
case tikvrpc.CmdPessimisticLock:
r := req.PessimisticLock()
c.cluster.handleDelay(r.StartVersion, r.Context.RegionId)
Expand Down
19 changes: 17 additions & 2 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}
} else {
// The error means the async commit should not succeed.
// TODO: Handle undetermined case here.
if err != nil {
c.cleanup(ctx)
if c.getUndeterminedErr() == nil {
c.cleanup(ctx)
}
tikvAsyncCommitTxnCounterError.Inc()
} else {
tikvAsyncCommitTxnCounterOk.Inc()
Expand All @@ -775,6 +776,20 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
prewriteBo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars)
start := time.Now()
err = c.prewriteMutations(prewriteBo, c.mutations)

if err != nil {
// TODO: Now we return an undetermined error as long as one of the prewrite
// RPCs fails. However, if there are multiple errors and some of the errors
// are not RPC failures, we can return the actual error instead of undetermined.
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
logutil.Logger(ctx).Error("2PC commit result undetermined",
zap.Error(err),
zap.NamedError("rpcErr", undeterminedErr),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(terror.ErrResultUndetermined)
}
}

commitDetail := c.getDetail()
commitDetail.PrewriteTime = time.Since(start)
if prewriteBo.totalSleep > 0 {
Expand Down
83 changes: 83 additions & 0 deletions store/tikv/async_commit_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"bytes"
"context"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore/cluster"
"github.com/pingcap/tidb/store/mockstore/unistore"
)

type testAsyncCommitFailSuite struct {
OneByOneSuite
cluster cluster.Cluster
store *tikvStore
}

var _ = SerialSuites(&testAsyncCommitFailSuite{})

func (s *testAsyncCommitFailSuite) SetUpTest(c *C) {
client, pdClient, cluster, err := unistore.New("")
c.Assert(err, IsNil)
unistore.BootstrapWithSingleStore(cluster)
s.cluster = cluster
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
c.Assert(err, IsNil)

s.store = store.(*tikvStore)
}

// TestFailCommitPrimaryRpcErrors tests rpc errors are handled properly when
// committing primary region task.
func (s *testAsyncCommitFailSuite) TestFailAsyncCommitPrewriteRpcErrors(c *C) {
defer config.RestoreFunc()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableAsyncCommit = true
})
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/noRetryOnRpcError", "return(true)"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcPrewriteTimeout", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcPrewriteTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/noRetryOnRpcError"), IsNil)
}()
// The rpc error will be wrapped to ErrResultUndetermined.
t1, err := s.store.Begin()
c.Assert(err, IsNil)
err = t1.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
ctx := context.WithValue(context.Background(), sessionctx.ConnID, uint64(1))
err = t1.Commit(ctx)
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err)))

// We don't need to call "Rollback" after "Commit" fails.
err = t1.Rollback()
c.Assert(err, Equals, kv.ErrInvalidTxn)

// Create a new transaction to check. The previous transaction should actually commit.
t2, err := s.store.Begin()
c.Assert(err, IsNil)
res, err := t2.Get(context.Background(), []byte("a"))
c.Assert(err, IsNil)
c.Assert(bytes.Equal(res, []byte("a1")), IsTrue)
}
10 changes: 5 additions & 5 deletions store/tikv/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/mockstore/cluster"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

Expand All @@ -39,10 +39,10 @@ type testAsyncCommitSuite struct {
var _ = Suite(&testAsyncCommitSuite{})

func (s *testAsyncCommitSuite) SetUpTest(c *C) {
client, clstr, pdClient, err := mocktikv.NewTiKVAndPDClient("")
client, pdClient, cluster, err := unistore.New("")
c.Assert(err, IsNil)
mocktikv.BootstrapWithSingleStore(clstr)
s.cluster = clstr
unistore.BootstrapWithSingleStore(cluster)
s.cluster = cluster
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
c.Assert(err, IsNil)

Expand Down Expand Up @@ -120,10 +120,10 @@ func (s *testAsyncCommitSuite) mustGetLock(c *C, key []byte) *Lock {
}

func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableAsyncCommit = true
})
defer config.RestoreFunc()()

s.putAlphabets(c)

Expand Down
10 changes: 9 additions & 1 deletion store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,15 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff

req := c.buildPrewriteRequest(batch, txnSize)
for {
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)

// If we fail to receive response for async commit prewrite, it will be undetermined whether this
// transaction has been successfully committed.
if c.isAsyncCommit() && sender.rpcError != nil {
c.setUndeterminedErr(errors.Trace(sender.rpcError))
}

if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext,
}

s.rpcError = err
failpoint.Inject("noRetryOnRpcError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, false, err)
}
})
if e := s.onSendFail(bo, rpcCtx, err); e != nil {
return nil, false, errors.Trace(e)
}
Expand Down

0 comments on commit 592b244

Please sign in to comment.