From a77b2afa79b4479eb8595f4cfe81c2bb6d15159a Mon Sep 17 00:00:00 2001
From: Lei Zhao
Date: Wed, 1 Dec 2021 16:35:10 +0800
Subject: [PATCH] txnkv: read through locks (#380)

Signed-off-by: youjiali1995
Signed-off-by: MyonKeminta
---
 go.mod                                  |   2 +-
 go.sum                                  |   4 +-
 integration_tests/async_commit_test.go  |   6 +-
 integration_tests/go.mod                |  12 +-
 integration_tests/go.sum                |  40 ++--
 integration_tests/lock_test.go          | 113 ++++++++--
 integration_tests/snapshot_fail_test.go |  87 ++++++++
 internal/client/client_batch.go         |   2 +-
 internal/locate/region_request.go       |  48 +++--
 tikv/kv.go                              |   1 +
 tikv/test_probe.go                      |   9 +
 txnkv/transaction/pessimistic.go        |   2 +-
 txnkv/transaction/prewrite.go           |   2 +-
 txnkv/txnlock/lock_resolver.go          | 267 ++++++++++++++----------
 txnkv/txnlock/test_probe.go             |   9 +-
 txnkv/txnsnapshot/client_helper.go      |  41 ++--
 txnkv/txnsnapshot/scan.go               |   2 +-
 txnkv/txnsnapshot/snapshot.go           |  10 +-
 18 files changed, 449 insertions(+), 208 deletions(-)

diff --git a/go.mod b/go.mod
index 2933100ec..f6e6f9061 100644
--- a/go.mod
+++ b/go.mod
@@ -17,7 +17,7 @@ require (
 	github.com/opentracing/opentracing-go v1.1.0
 	github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
 	github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
-	github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad
+	github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f
 	github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.5.1
diff --git a/go.sum b/go.sum
index 7bb8c9a99..c9d31e182 100644
--- a/go.sum
+++ b/go.sum
@@ -283,8 +283,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad h1:suBPTeuY6yVF7xvTGeTQ9+tiGzufnORJpCRwzbdN2sc=
-github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
+github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f h1:hjInxK1Ie6CYx7Jy2pYnBdEnWI8jIfr423l9Yh6LRy8=
+github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go
index 0ef2b4c60..551572e26 100644
--- a/integration_tests/async_commit_test.go
+++ b/integration_tests/async_commit_test.go
@@ -255,7 +255,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
 	status := lockutil.NewLockStatus(nil, true, ts)
 
 	resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
-	err = resolver.ResolveLockAsync(s.bo, lock, status)
+	err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
 	s.Nil(err)
 	currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
 	s.Nil(err)
@@ -334,7 +334,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
 	_ = s.beginAsyncCommit()
 
-	err = resolver.ResolveLockAsync(s.bo, lock, status)
+	err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
 	s.Nil(err)
 	s.Equal(gotCheckA, int64(1))
 	s.Equal(gotCheckB, int64(1))
@@ -371,7 +371,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
 	lock.TxnID = ts
 	lock.MinCommitTS = ts + 5
 
-	err = resolver.ResolveLockAsync(s.bo, lock, status)
+	err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
 	s.Nil(err)
 	s.Equal(gotCheckA, int64(1))
 	s.Equal(gotCheckB, int64(1))
diff --git a/integration_tests/go.mod b/integration_tests/go.mod
index 0a8c09cbd..d8dc67a5f 100644
--- a/integration_tests/go.mod
+++ b/integration_tests/go.mod
@@ -5,14 +5,14 @@ go 1.16
 require (
 	github.com/ninedraft/israce v0.0.3
 	github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
-	github.com/pingcap/kvproto v0.0.0-20211011060348-d957056f1551
-	github.com/pingcap/tidb v1.1.0-beta.0.20211111035905-25d698c79730
-	github.com/pingcap/tidb/parser v0.0.0-20211111035905-25d698c79730 // indirect
+	github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f
+	github.com/pingcap/tidb v1.1.0-beta.0.20211130051352-37e0dac25981
+	github.com/pingcap/tidb/parser v0.0.0-20211130051352-37e0dac25981 // indirect
 	github.com/pkg/errors v0.9.1
 	github.com/stretchr/testify v1.7.0
-	github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211103022933-5ae005dac331
-	github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae
-	go.uber.org/goleak v1.1.11-0.20210813005559-691160354723
+	github.com/tikv/client-go/v2 v2.0.0
+	github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
+	go.uber.org/goleak v1.1.12
 )
 
 replace github.com/tikv/client-go/v2 => ../
diff --git a/integration_tests/go.sum b/integration_tests/go.sum
index 6b9461057..4061bf66b 100644
--- a/integration_tests/go.sum
+++ b/integration_tests/go.sum
@@ -565,9 +565,9 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20211011060348-d957056f1551 h1:aRx2l2TAeYNPPUc+lk5dEFCXfUGxR/C2fbt/YA5nqiQ=
-github.com/pingcap/kvproto v0.0.0-20211011060348-d957056f1551/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
+github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
+github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f h1:hjInxK1Ie6CYx7Jy2pYnBdEnWI8jIfr423l9Yh6LRy8=
+github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
@@ -577,16 +577,17 @@ github.com/pingcap/log v0.0.0-20210906054005-afc726e70354/go.mod h1:DWQW5jICDR7U
 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
 github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 h1:7rvAtZe/ZUzOKzgriNPQoBNvleJXBk4z7L3Z47+tS98=
 github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY=
-github.com/pingcap/tidb v1.1.0-beta.0.20211111035905-25d698c79730 h1:dFbUXxGYk0rx3R2dxhK6HZ93cj+tCAq66JjZ1GX5TgU=
-github.com/pingcap/tidb v1.1.0-beta.0.20211111035905-25d698c79730/go.mod h1:Z5NlUEsyr1yVAtTaKNdOO7WgySTZuwnCP6kuCaUiRyc=
+github.com/pingcap/tidb v1.1.0-beta.0.20211130051352-37e0dac25981 h1:qDBc/5rWnNMVjnrvNd7rHc+EwPryNUQ15uPEGaQgLX0=
+github.com/pingcap/tidb v1.1.0-beta.0.20211130051352-37e0dac25981/go.mod h1:QV0oFMcfKZzSOK6Z5cKJdeKmYePKKZZo+sqzUvYkPOA=
 github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
+github.com/pingcap/tidb-dashboard v0.0.0-20211107164327-80363dfbe884/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
 github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY=
 github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg=
-github.com/pingcap/tidb/parser v0.0.0-20211111035905-25d698c79730 h1:wNgibpxmYT9nhnB6mwOpY+Jn29Pze49f5GXHz+Wv7ks=
-github.com/pingcap/tidb/parser v0.0.0-20211111035905-25d698c79730/go.mod h1:MAa22tagoj7nv5b1NBcxPkc5CiUNhqj1wuSQnw4f9WE=
-github.com/pingcap/tipb v0.0.0-20211105090418-71142a4d40e3 h1:xnp/Qkk5gELlB8TaY6oro0JNXMBXTafNVxU/vbrNU8I=
-github.com/pingcap/tipb v0.0.0-20211105090418-71142a4d40e3/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
+github.com/pingcap/tidb/parser v0.0.0-20211130051352-37e0dac25981 h1:cuJx6z+9q1wtMLRtA4ECaap/+N7az46GzZ5nIiwpa+Y=
+github.com/pingcap/tidb/parser v0.0.0-20211130051352-37e0dac25981/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
+github.com/pingcap/tipb v0.0.0-20211116093845-e9b045a0bdf8 h1:Vu/6oq8EFNWgyXRHiclNzTKIu+YKHPCSI/Ba5oVrLtM=
+github.com/pingcap/tipb v0.0.0-20211116093845-e9b045a0bdf8/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -694,8 +695,9 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK
 github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
 github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
-github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae h1:PmnkhiOopgMZYDQ7Htj1kt/zwW4MEOUL+Dem6WLZISY=
 github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
+github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs=
+github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI=
 github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M=
 github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc=
@@ -782,8 +784,9 @@ go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
 go.uber.org/fx v1.10.0/go.mod h1:vLRicqpG/qQEzno4SYU86iCwfT95EZza+Eba0ItuxqY=
 go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
-go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
 go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
+go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
 go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
@@ -817,6 +820,7 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rB
 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -1255,7 +1259,19 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
 k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
-modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
+modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=
+modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254=
+modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk=
+modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk=
+modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
+modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
+modernc.org/parser v1.0.0/go.mod h1:H20AntYJ2cHHL6MHthJ8LZzXCdDCHMWt1KZXtIMjejA=
+modernc.org/parser v1.0.2/go.mod h1:TXNq3HABP3HMaqLK7brD1fLA/LfN0KS6JxZn71QdDqs=
+modernc.org/scanner v1.0.1/go.mod h1:OIzD2ZtjYk6yTuyqZr57FmifbM9fIH74SumloSsajuE=
+modernc.org/sortutil v1.0.0/go.mod h1:1QO0q8IlIlmjBIwm6t/7sof874+xCfZouyqZMLIAtxM=
+modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
+modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
+modernc.org/y v1.0.1/go.mod h1:Ho86I+LVHEI+LYXoUKlmOMAM1JTXOCfj8qi1T8PsClE=
 moul.io/zapgorm2 v1.1.0/go.mod h1:emRfKjNqSzVj5lcgasBdovIXY1jSOwFz2GQZn1Rddks=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go
index 55d70224c..2317e6cf4 100644
--- a/integration_tests/lock_test.go
+++ b/integration_tests/lock_test.go
@@ -78,7 +78,7 @@ func (s *testLockSuite) TearDownTest() {
 	s.store.Close()
 }
 
-func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
+func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, ttl uint64, commitPrimary bool, asyncCommit bool) (uint64, uint64) {
 	txn, err := s.store.Begin()
 	s.Nil(err)
 	if len(value) > 0 {
@@ -97,6 +97,10 @@ func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, com
 	tpc, err := txn.NewCommitter(0)
 	s.Nil(err)
 	tpc.SetPrimaryKey(primaryKey)
+	tpc.SetLockTTL(ttl)
+	if asyncCommit {
+		tpc.SetUseAsyncCommit()
+	}
 
 	ctx := context.Background()
 	err = tpc.PrewriteAllMutations(ctx)
@@ -130,11 +134,11 @@ func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) {
 
 func (s *testLockSuite) prepareAlphabetLocks() {
 	s.putKV([]byte("c"), []byte("cc"))
-	s.lockKey([]byte("c"), []byte("c"), []byte("z1"), []byte("z1"), true)
-	s.lockKey([]byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), false)
-	s.lockKey([]byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), false)
+	s.lockKey([]byte("c"), []byte("c"), []byte("z1"), []byte("z1"), 3000, true, false)
+	s.lockKey([]byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), 3000, false, false)
+	s.lockKey([]byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), 3000, false, false)
 	s.putKV([]byte("bar"), []byte("bar"))
-	s.lockKey([]byte("bar"), nil, []byte("z4"), []byte("z4"), true)
+	s.lockKey([]byte("bar"), nil, []byte("z4"), []byte("z4"), 3000, true, false)
 }
 
 func (s *testLockSuite) TestScanLockResolveWithGet() {
@@ -205,7 +209,7 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet() {
 func (s *testLockSuite) TestCleanLock() {
 	for ch := byte('a'); ch <= byte('z'); ch++ {
 		k := []byte{ch}
-		s.lockKey(k, k, k, k, false)
+		s.lockKey(k, k, k, k, 3000, false, false)
 	}
 	txn, err := s.store.Begin()
 	s.Nil(err)
@@ -224,13 +228,13 @@ func (s *testLockSuite) TestGetTxnStatus() {
 	s.True(status.IsCommitted())
 	s.Equal(status.CommitTS(), commitTS)
 
-	startTS, commitTS = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), true)
+	startTS, commitTS = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), 3000, true, false)
 	status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
 	s.Nil(err)
 	s.True(status.IsCommitted())
 	s.Equal(status.CommitTS(), commitTS)
 
-	startTS, _ = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), false)
+	startTS, _ = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), 3000, false, false)
 	status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
 	s.Nil(err)
 	s.False(status.IsCommitted())
@@ -257,7 +261,8 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() {
 
 	// Rollback the txn.
 	lock := s.mustGetLock([]byte("key"))
-	err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
+
+	err = s.store.NewLockResolver().ForceResolveLock(context.Background(), lock)
 	s.Nil(err)
 
 	// Check its status is rollbacked.
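A note on the extended helper: lockKey now also takes a TTL and an async-commit flag, so a test can stage a lock in any of the states the read path must classify. For instance, an already-expired async-commit lock, as TestResolveLocksForRead later in this patch does for "k5" (a sketch using this suite's helpers):

    // TTL 0 and asyncCommit=true: the lock's minCommitTS can't be pushed, but the lock is expired.
    s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 0, false, true)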
@@ -290,7 +295,7 @@ func (s *testLockSuite) TestTxnHeartBeat() {
 	s.Equal(newTTL, uint64(6666))
 
 	lock := s.mustGetLock([]byte("key"))
-	err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
+	err = s.store.NewLockResolver().ForceResolveLock(context.Background(), lock)
 	s.Nil(err)
 
 	newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
@@ -322,13 +327,13 @@ func (s *testLockSuite) TestCheckTxnStatus() {
 
 	// Test the ResolveLocks API
 	lock := s.mustGetLock([]byte("second"))
-	timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
+	timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
 	s.Nil(err)
 	s.True(timeBeforeExpire > int64(0))
 
 	// Force rollback the lock using lock.TTL = 0.
 	lock.TTL = uint64(0)
-	timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
+	timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
 	s.Nil(err)
 	s.Equal(timeBeforeExpire, int64(0))
 
@@ -572,19 +577,19 @@ func (s *testLockSuite) TestZeroMinCommitTS() {
 	s.Nil(failpoint.Disable("tikvclient/mockZeroCommitTS"))
 
 	lock := s.mustGetLock([]byte("key"))
-	expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
+	expire, pushed, _, err := s.store.NewLockResolver().ResolveLocksForRead(bo, 0, []*txnkv.Lock{lock}, true)
 	s.Nil(err)
 	s.Len(pushed, 0)
 	s.Greater(expire, int64(0))
 
-	expire, pushed, err = s.store.NewLockResolver().ResolveLocks(bo, math.MaxUint64, []*txnkv.Lock{lock})
+	expire, pushed, _, err = s.store.NewLockResolver().ResolveLocksForRead(bo, math.MaxUint64, []*txnkv.Lock{lock}, true)
 	s.Nil(err)
 	s.Len(pushed, 1)
-	s.Greater(expire, int64(0))
+	s.Equal(expire, int64(0))
 
 	// Clean up this test.
 	lock.TTL = uint64(0)
-	expire, _, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
+	expire, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
 	s.Nil(err)
 	s.Equal(expire, int64(0))
 }
@@ -641,10 +646,9 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
 	lock := s.mustGetLock([]byte("fb1"))
 	s.True(lock.UseAsyncCommit)
 	bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
-	expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
+	expire, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
 	s.Nil(err)
 	s.Equal(expire, int64(0))
-	s.Equal(len(pushed), 0)
 
 	t3, err := s.store.Begin()
 	s.Nil(err)
@@ -893,3 +897,76 @@ func (s *testLockSuite) TestPrewriteEncountersLargerTsLock() {
 	s.Nil(failpoint.Disable("tikvclient/prewritePrimary"))
 	<-ch
 }
+
+func (s *testLockSuite) TestResolveLocksForRead() {
+	ctx := context.Background()
+	var resolvedLocks, committedLocks []uint64
+	var locks []*txnlock.Lock
+
+	// commitTS < readStartTS
+	startTS, _ := s.lockKey([]byte("k1"), []byte("v1"), []byte("k11"), []byte("v11"), 3000, true, false)
+	committedLocks = append(committedLocks, startTS)
+	lock := s.mustGetLock([]byte("k1"))
+	locks = append(locks, lock)
+
+	// rolled back
+	startTS, _ = s.lockKey([]byte("k2"), []byte("v2"), []byte("k22"), []byte("v22"), 3000, false, false)
+	lock = s.mustGetLock([]byte("k22"))
+	err := s.store.NewLockResolver().ForceResolveLock(ctx, lock)
+	s.Nil(err)
+	resolvedLocks = append(resolvedLocks, startTS)
+	lock = s.mustGetLock([]byte("k2"))
+	locks = append(locks, lock)
+
+	// pushed
+	startTS, _ = s.lockKey([]byte("k3"), []byte("v3"), []byte("k33"), []byte("v33"), 3000, false, false)
+	resolvedLocks = append(resolvedLocks, startTS)
+	lock = s.mustGetLock([]byte("k3"))
+	locks = append(locks, lock)
+
+	// can't be pushed and isn't expired
+	_, _ = s.lockKey([]byte("k4"), []byte("v4"), []byte("k44"), []byte("v44"), 3000, false, true)
+	lock = s.mustGetLock([]byte("k4"))
+	locks = append(locks, lock)
+
+	// can't be pushed but is expired
+	startTS, _ = s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 0, false, true)
+	committedLocks = append(committedLocks, startTS)
+	lock = s.mustGetLock([]byte("k5"))
+	locks = append(locks, lock)
+
+	// commitTS > readStartTS
+	var readStartTS uint64
+	{
+		t, err := s.store.Begin()
+		s.Nil(err)
+		resolvedLocks = append(resolvedLocks, t.StartTS())
+		s.Nil(t.Set([]byte("k6"), []byte("v6")))
+		s.Nil(t.Set([]byte("k66"), []byte("v66")))
+		committer, err := t.NewCommitter(1)
+		s.Nil(err)
+		s.Nil(committer.PrewriteAllMutations(ctx))
+		committer.SetPrimaryKey([]byte("k66"))
+
+		readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+		s.Nil(err)
+
+		commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+		s.Nil(err)
+		s.Greater(commitTS, readStartTS)
+		committer.SetCommitTS(commitTS)
+		err = committer.CommitMutations(ctx)
+		s.Nil(err)
+		lock = s.mustGetLock([]byte("k6"))
+		locks = append(locks, lock)
+	}
+
+	bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
+	lr := s.store.NewLockResolver()
+	defer lr.Close()
+	msBeforeExpired, resolved, committed, err := lr.ResolveLocksForRead(bo, readStartTS, locks, false)
+	s.Nil(err)
+	s.Greater(msBeforeExpired, int64(0))
+	s.Equal(resolvedLocks, resolved)
+	s.Equal(committedLocks, committed)
+}
diff --git a/integration_tests/snapshot_fail_test.go b/integration_tests/snapshot_fail_test.go
index 62f8e01e3..fa6745dcb 100644
--- a/integration_tests/snapshot_fail_test.go
+++ b/integration_tests/snapshot_fail_test.go
@@ -41,10 +41,15 @@ import (
 	"time"
 
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/store/mockstore/unistore"
 	"github.com/stretchr/testify/suite"
 	tikverr "github.com/tikv/client-go/v2/error"
+	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikv"
+	"github.com/tikv/client-go/v2/tikvrpc"
+	"github.com/tikv/client-go/v2/txnkv"
+	"github.com/tikv/client-go/v2/txnkv/txnlock"
 )
 
 func TestSnapshotFail(t *testing.T) {
@@ -304,3 +309,85 @@ func (s *testSnapshotFailSuite) TestResetSnapshotTS() {
 	s.Nil(err)
 	s.Equal(val, []byte("y1"))
 }
+
+func (s *testSnapshotFailSuite) getLock(key []byte) *txnkv.Lock {
+	ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
+	s.Nil(err)
+	bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key:     key,
+		Version: ver,
+	})
+	loc, err := s.store.GetRegionCache().LocateKey(bo, key)
+	s.Nil(err)
+	resp, err := s.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutShort)
+	s.Nil(err)
+	s.NotNil(resp.Resp)
+	keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
+	if keyErr == nil {
+		return nil
+	}
+	lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
+	if err != nil {
+		return nil
+	}
+	return lock
+}
+
+func (s *testSnapshotFailSuite) TestSnapshotUseResolveForRead() {
+	s.Nil(failpoint.Enable("tikvclient/resolveLock", "sleep(500)"))
+	s.Nil(failpoint.Enable("tikvclient/resolveAsyncResolveData", "sleep(500)"))
+	defer func() {
+		s.Nil(failpoint.Disable("tikvclient/resolveAsyncResolveData"))
+		s.Nil(failpoint.Disable("tikvclient/resolveLock"))
+	}()
+
+	for _, asyncCommit := range []bool{false, true} {
+		x := []byte("x_key_TestSnapshotUseResolveForRead")
+		y := []byte("y_key_TestSnapshotUseResolveForRead")
+		txn, err := s.store.Begin()
+		s.Nil(err)
+		s.Nil(txn.Set(x, []byte("x")))
+		s.Nil(txn.Set(y, []byte("y")))
+		txn.SetEnableAsyncCommit(asyncCommit)
+		ctx := context.Background()
+		committer, err := txn.NewCommitter(1)
+		s.Nil(err)
+		committer.SetLockTTL(3000)
+		s.Nil(committer.PrewriteAllMutations(ctx))
+		committer.SetCommitTS(committer.GetStartTS() + 1)
+		committer.CommitMutations(ctx)
+		s.Equal(committer.GetPrimaryKey(), x)
+		s.NotNil(s.getLock(y))
+
+		txn, err = s.store.Begin()
+		s.Nil(err)
+		snapshot := txn.GetSnapshot()
+		start := time.Now()
+		val, err := snapshot.Get(ctx, y)
+		s.Nil(err)
+		s.Equal([]byte("y"), val)
+		s.Less(time.Since(start), 200*time.Millisecond)
+		s.NotNil(s.getLock(y))
+
+		txn, err = s.store.Begin()
+		s.Nil(err)
+		snapshot = txn.GetSnapshot()
+		start = time.Now()
+		res, err := snapshot.BatchGet(ctx, [][]byte{y})
+		s.Nil(err)
+		s.Equal([]byte("y"), res[string(y)])
+		s.Less(time.Since(start), 200*time.Millisecond)
+		s.NotNil(s.getLock(y))
+
+		var lock *txnkv.Lock
+		for i := 0; i < 10; i++ {
+			time.Sleep(100 * time.Millisecond)
+			lock = s.getLock(y)
+			if lock == nil {
+				break
+			}
+		}
+		s.Nil(lock, "failed to resolve lock in time")
+	}
+}
diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index dddca14d7..ef16099e8 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -316,7 +316,7 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 		a.pendingRequests.Observe(float64(len(a.batchCommandsCh)))
 		a.batchSize.Observe(float64(a.reqBuilder.len()))
 
-		// curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/github.com/tikv/client-go/v2/mockBlockOnBatchClient
+		// curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/tikvclient/mockBlockOnBatchClient
 		if val, err := util.EvalFailpoint("mockBlockOnBatchClient"); err == nil {
 			if val.(bool) {
 				time.Sleep(1 * time.Hour)
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 25904c4a7..10f259133 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -898,32 +898,34 @@ func (s *RegionRequestSender) SendReqCtx(
 	}
 
 	if val, err := util.EvalFailpoint("tikvStoreSendReqResult"); err == nil {
-		switch val.(string) {
-		case "timeout":
-			return nil, nil, errors.New("timeout")
-		case "GCNotLeader":
-			if req.Type == tikvrpc.CmdGC {
-				return &tikvrpc.Response{
-					Resp: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
-				}, nil, nil
-			}
-		case "GCServerIsBusy":
-			if req.Type == tikvrpc.CmdGC {
+		if s, ok := val.(string); ok {
+			switch s {
+			case "timeout":
+				return nil, nil, errors.New("timeout")
+			case "GCNotLeader":
+				if req.Type == tikvrpc.CmdGC {
+					return &tikvrpc.Response{
+						Resp: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
+					}, nil, nil
+				}
+			case "GCServerIsBusy":
+				if req.Type == tikvrpc.CmdGC {
+					return &tikvrpc.Response{
+						Resp: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
+					}, nil, nil
+				}
+			case "busy":
 				return &tikvrpc.Response{
 					Resp: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
 				}, nil, nil
-			}
-		case "busy":
-			return &tikvrpc.Response{
-				Resp: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
-			}, nil, nil
-		case "requestTiDBStoreError":
-			if et == tikvrpc.TiDB {
-				return nil, nil, errors.WithStack(tikverr.ErrTiKVServerTimeout)
-			}
-		case "requestTiFlashError":
-			if et == tikvrpc.TiFlash {
-				return nil, nil, errors.WithStack(tikverr.ErrTiFlashServerTimeout)
+			case "requestTiDBStoreError":
+				if et == tikvrpc.TiDB {
+					return nil, nil, errors.WithStack(tikverr.ErrTiKVServerTimeout)
+				}
+			case "requestTiFlashError":
+				if et == tikvrpc.TiFlash {
+					return nil, nil, errors.WithStack(tikverr.ErrTiFlashServerTimeout)
+				}
 			}
 		}
 	}
diff --git a/tikv/kv.go b/tikv/kv.go
index aa7eae3aa..7af981a9d 100644
--- a/tikv/kv.go
+++ b/tikv/kv.go
@@ -330,6 +330,7 @@ func (s *KVStore) Close() error {
 
 	s.oracle.Close()
 	s.pdClient.Close()
+	s.lockResolver.Close()
 
 	if err := s.GetTiKVClient().Close(); err != nil {
 		return err
diff --git a/tikv/test_probe.go b/tikv/test_probe.go
index 71e463457..44a06763b 100644
--- a/tikv/test_probe.go
+++ b/tikv/test_probe.go
@@ -120,6 +120,15 @@ func NewLockResolverProb(r *txnlock.LockResolver) *LockResolverProbe {
 	return &LockResolverProbe{&resolver}
 }
 
+// ForceResolveLock forcibly resolves a single lock. It's a helper function only for writing tests.
+func (l LockResolverProbe) ForceResolveLock(ctx context.Context, lock *txnlock.Lock) error {
+	bo := retry.NewBackofferWithVars(ctx, transaction.ConfigProbe{}.GetPessimisticLockMaxBackoff(), nil)
+	// Setting the TTL to 0 forces the lock to be treated as expired and resolved.
+	lock.TTL = 0
+	_, err := l.LockResolverProbe.ResolveLocks(bo, 0, []*txnlock.Lock{lock})
+	return err
+}
+
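A typical call site for this probe, as in TestCheckTxnStatusTTL above (a usage sketch, not part of the diff):

    lock := s.mustGetLock([]byte("key"))
    err := s.store.NewLockResolver().ForceResolveLock(context.Background(), lock)
    s.Nil(err) // the lock is rolled back regardless of its remaining TTL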
 // ResolveLock resolves single lock.
 func (l LockResolverProbe) ResolveLock(ctx context.Context, lock *txnlock.Lock) error {
 	bo := retry.NewBackofferWithVars(ctx, transaction.ConfigProbe{}.GetPessimisticLockMaxBackoff(), nil)
diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go
index 683a1a455..e904c3220 100644
--- a/txnkv/transaction/pessimistic.go
+++ b/txnkv/transaction/pessimistic.go
@@ -212,7 +212,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		// Because we already waited on tikv, no need to Backoff here.
 		// tikv default will wait 3s(also the maximum wait value) when lock error occurs
 		startTime = time.Now()
-		msBeforeTxnExpired, _, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks)
+		msBeforeTxnExpired, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks)
 		if err != nil {
 			return err
 		}
diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go
index f30c58a21..1092c2814 100644
--- a/txnkv/transaction/prewrite.go
+++ b/txnkv/transaction/prewrite.go
@@ -345,7 +345,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
 			locks = append(locks, lock)
 		}
 		start := time.Now()
-		msBeforeExpired, err := c.store.GetLockResolver().ResolveLocksForWrite(bo, c.startTS, c.forUpdateTS, locks)
+		msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks)
 		if err != nil {
 			return err
 		}
diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go
index 1c944d322..166da49bb 100644
--- a/txnkv/txnlock/lock_resolver.go
+++ b/txnkv/txnlock/lock_resolver.go
@@ -42,6 +42,11 @@ import (
 // ResolvedCacheSize is max number of cached txn status.
 const ResolvedCacheSize = 2048
 
+const (
+	getTxnStatusMaxBackoff     = 20000
+	asyncResolveLockMaxBackoff = 40000
+)
+
 type storage interface {
 	// GetRegionCache gets the RegionCache.
 	GetRegionCache() *locate.RegionCache
@@ -64,6 +69,11 @@ type LockResolver struct {
 	testingKnobs struct {
 		meetLock func(locks []*Lock)
 	}
+
+	// LockResolver may have some goroutines resolving locks in the background.
+	// The cancel function stops these goroutines so that goleak tests can pass.
+	asyncResolveCtx    context.Context
+	asyncResolveCancel func()
 }
 
 // NewLockResolver creates a new LockResolver instance.
@@ -74,9 +84,15 @@ func NewLockResolver(store storage) *LockResolver {
 	}
 	r.mu.resolved = make(map[uint64]TxnStatus)
 	r.mu.recentResolved = list.New()
+	r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background())
 	return r
 }
 
+// Close cancels all background goroutines.
+func (lr *LockResolver) Close() {
+	lr.asyncResolveCancel()
+}
+
 // TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
 type TxnStatus struct {
 	ttl uint64
@@ -88,6 +104,9 @@ type TxnStatus struct {
 // IsCommitted returns true if the txn's final status is Commit.
 func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 }
 
+// IsRolledBack returns true if the txn's final status is rolled back.
+func (s TxnStatus) IsRolledBack() bool { return s.ttl == 0 && s.commitTS == 0 }
+
 // CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
 func (s TxnStatus) CommitTS() uint64 { return s.commitTS }
 
@@ -291,120 +310,119 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
 // commit status.
 // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
 // the same transaction.
-func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, []uint64 /*pushed*/, error) {
-	return lr.resolveLocks(bo, callerStartTS, locks, false, false)
+func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
+	ttl, _, _, err := lr.resolveLocks(bo, callerStartTS, locks, false, false)
+	return ttl, err
 }
 
-// ResolveLocksLite resolves locks while preventing scan whole region.
-func (lr *LockResolver) ResolveLocksLite(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, []uint64 /*pushed*/, error) {
-	return lr.resolveLocks(bo, callerStartTS, locks, false, true)
+// ResolveLocksForRead is essentially the same as ResolveLocks, except with some optimizations for read.
+// Read operations don't need to wait for secondary locks to be resolved. They can read through a lock
+// (when the lock's transaction is committed and its commitTS is less than or equal to callerStartTS) or
+// ignore it (when the lock's transaction is rolled back or its minCommitTS has been pushed).
+func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
+	return lr.resolveLocks(bo, callerStartTS, locks, true, lite)
 }
 
-func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forWrite bool, lite bool) (int64, []uint64 /*pushed*/, error) {
+func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
 	if lr.testingKnobs.meetLock != nil {
 		lr.testingKnobs.meetLock(locks)
 	}
 	var msBeforeTxnExpired txnExpireTime
 	if len(locks) == 0 {
-		return msBeforeTxnExpired.value(), nil, nil
-	}
-
-	if forWrite {
-		metrics.LockResolverCountWithResolveForWrite.Inc()
-	} else {
-		metrics.LockResolverCountWithResolve.Inc()
+		return msBeforeTxnExpired.value(), nil, nil, nil
 	}
+	metrics.LockResolverCountWithResolve.Inc()
 
-	var pushFail bool
 	// TxnID -> []Region, record resolved Regions.
 	// TODO: Maybe put it in LockResolver and share by all txns.
 	cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
-	var pushed []uint64
-	// pushed is only used in the read operation.
-	if !forWrite {
-		pushed = make([]uint64, 0, len(locks))
-	}
-
-	var resolve func(*Lock, bool) error
-	resolve = func(l *Lock, forceSyncCommit bool) error {
+	var resolve func(*Lock, bool) (TxnStatus, error)
+	resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) {
 		status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit)
 		if err != nil {
-			return err
+			return TxnStatus{}, err
+		}
+		if status.ttl != 0 {
+			return status, nil
 		}
-		if status.ttl == 0 {
-			metrics.LockResolverCountWithExpired.Inc()
-			// If the lock is committed or rollbacked, resolve lock.
-			cleanRegions, exists := cleanTxns[l.TxnID]
-			if !exists {
-				cleanRegions = make(map[locate.RegionVerID]struct{})
-				cleanTxns[l.TxnID] = cleanRegions
-			}
-
-			if status.primaryLock != nil && !forceSyncCommit && status.primaryLock.UseAsyncCommit && !exists {
-				err = lr.resolveLockAsync(bo, l, status)
-				if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
-					err = resolve(l, true)
-				}
-			} else if l.LockType == kvrpcpb.Op_PessimisticLock {
-				err = lr.resolvePessimisticLock(bo, l, cleanRegions)
-			} else {
-				err = lr.resolveLock(bo, l, status, lite, cleanRegions)
+		// If the lock is committed or rolled back, resolve it.
+		metrics.LockResolverCountWithExpired.Inc()
+		cleanRegions, exists := cleanTxns[l.TxnID]
+		if !exists {
+			cleanRegions = make(map[locate.RegionVerID]struct{})
+			cleanTxns[l.TxnID] = cleanRegions
+		}
+		if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
+			// resolveAsyncCommitLock will resolve all locks of the transaction, so we don't need to
+			// resolve it again if it has already been resolved once.
+			if exists {
+				return status, nil
 			}
-			if err != nil {
-				return err
+			// The status of an async-commit transaction is determined by resolveAsyncCommitLock.
+			status, err = lr.resolveAsyncCommitLock(bo, l, status, forRead)
+			if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
+				status, err = resolve(l, true)
 			}
+			return status, err
+		}
+		if l.LockType == kvrpcpb.Op_PessimisticLock {
+			// Pessimistic locks don't block reads, so resolving them needn't be asynchronous.
+			err = lr.resolvePessimisticLock(bo, l)
 		} else {
-			metrics.LockResolverCountWithNotExpired.Inc()
-			// If the lock is valid, the txn may be a pessimistic transaction.
-			// Update the txn expire time.
-			msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
-			msBeforeTxnExpired.update(msBeforeLockExpired)
-			if forWrite {
-				// Write conflict detected!
-				// If it's a optimistic conflict and current txn is earlier than the lock owner,
-				// abort current transaction.
-				// This could avoids the deadlock scene of two large transaction.
-				if l.LockType != kvrpcpb.Op_PessimisticLock && l.TxnID > callerStartTS {
-					metrics.LockResolverCountWithWriteConflict.Inc()
-					return tikverr.NewErrWriteConfictWithArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
-				}
+			if forRead {
+				asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
+				go func() {
+					// Pass an empty cleanRegions here to avoid data race and
+					// let `reqCollapse` deduplicate identical resolve requests.
+					err := lr.resolveLock(asyncBo, l, status, lite, map[locate.RegionVerID]struct{}{})
+					if err != nil {
+						logutil.BgLogger().Info("failed to resolve lock asynchronously",
+							zap.String("lock", l.String()), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
+					}
+				}()
 			} else {
-				if status.action != kvrpcpb.Action_MinCommitTSPushed {
-					pushFail = true
-					return nil
-				}
-				pushed = append(pushed, l.TxnID)
+				err = lr.resolveLock(bo, l, status, lite, cleanRegions)
 			}
 		}
-		return nil
+		return status, err
 	}
 
+	var canIgnore, canAccess []uint64
 	for _, l := range locks {
-		err := resolve(l, false)
+		status, err := resolve(l, false)
 		if err != nil {
 			msBeforeTxnExpired.update(0)
-			return msBeforeTxnExpired.value(), nil, err
+			return msBeforeTxnExpired.value(), nil, nil, err
+		}
+		if !forRead {
+			if status.ttl != 0 {
+				metrics.LockResolverCountWithNotExpired.Inc()
+				msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+				msBeforeTxnExpired.update(msBeforeLockExpired)
+				continue
+			}
+		}
+		if status.action == kvrpcpb.Action_MinCommitTSPushed || status.IsRolledBack() ||
+			(status.IsCommitted() && status.CommitTS() > callerStartTS) {
+			if canIgnore == nil {
+				canIgnore = make([]uint64, 0, len(locks))
+			}
+			canIgnore = append(canIgnore, l.TxnID)
+		} else if status.IsCommitted() && status.CommitTS() <= callerStartTS {
+			if canAccess == nil {
+				canAccess = make([]uint64, 0, len(locks))
+			}
+			canAccess = append(canAccess, l.TxnID)
+		} else {
+			metrics.LockResolverCountWithNotExpired.Inc()
+			msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+			msBeforeTxnExpired.update(msBeforeLockExpired)
 		}
 	}
-	if pushFail {
-		// If any of the lock fails to push minCommitTS, don't return the pushed array.
-		pushed = nil
-	}
-
-	if msBeforeTxnExpired.value() > 0 && len(pushed) == 0 {
-		// If len(pushed) > 0, the caller will not block on the locks, it push the minCommitTS instead.
+	if msBeforeTxnExpired.value() > 0 {
 		metrics.LockResolverCountWithWaitExpired.Inc()
 	}
-	return msBeforeTxnExpired.value(), pushed, nil
-}
-
-// ResolveLocksForWrite resolves lock for write
-func (lr *LockResolver) ResolveLocksForWrite(bo *retry.Backoffer, callerStartTS, callerForUpdateTS uint64, locks []*Lock) (int64, error) {
-	// The forWrite parameter is only useful for optimistic transactions which can avoid deadlock between large transactions,
-	// so only use forWrite if the callerForUpdateTS is zero.
-	msBeforeTxnExpired, _, err := lr.resolveLocks(bo, callerStartTS, locks, callerForUpdateTS == 0, false)
-	return msBeforeTxnExpired, err
+	return msBeforeTxnExpired.value(), canIgnore, canAccess, nil
 }
 
 type txnExpireTime struct {
@@ -433,8 +451,6 @@ func (t *txnExpireTime) value() int64 {
 	return t.txnExpire
 }
 
-const getTxnStatusMaxBackoff = 20000
-
 // GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
 // If the primary key is still locked, it will launch a Rollback to abort it.
 // To avoid unnecessarily aborting too many txns, it is wiser to wait a few
@@ -486,14 +502,6 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle
 		return TxnStatus{ttl: l.TTL, action: kvrpcpb.Action_NoAction}, nil
 	}
 
-	// Handle txnNotFound error.
-	// getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't.
-	// This is likely to happen in the concurrently prewrite when secondary regions
-	// success before the primary region.
-	if err := bo.Backoff(retry.BoTxnNotFound, err); err != nil {
-		logutil.Logger(bo.GetCtx()).Warn("getTxnStatusFromLock backoff fail", zap.Error(err))
-	}
-
 	if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 {
 		logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired",
 			zap.Uint64("CallerStartTs", callerStartTS),
@@ -508,10 +516,22 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle
 			// The Action_LockNotExistDoNothing will be returned as the status.
 			rollbackIfNotExist = true
 		} else {
+			// When the user issues a Rollback statement, the pessimistic locks are rolled back and the
+			// store keeps no information about the primary key. Other transactions may run checkTxnStatus
+			// on these locks and be blocked for the TTL duration, so let the transaction retry the
+			// pessimistic lock if the transaction is not found and the lock has not expired yet.
 			if l.LockType == kvrpcpb.Op_PessimisticLock {
 				return TxnStatus{ttl: l.TTL}, nil
 			}
 		}
+
+		// Handle txnNotFound error.
+		// getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't.
+		// This is likely to happen during a concurrent prewrite when secondary regions
+		// succeed before the primary region.
+		if err := bo.Backoff(retry.BoTxnNotFound, err); err != nil {
+			logutil.Logger(bo.GetCtx()).Warn("getTxnStatusFromLock backoff fail", zap.Error(err))
+		}
 	}
 }
@@ -734,25 +754,14 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
 	return shared.addKeys(checkResp.Locks, len(curKeys), txnID, checkResp.CommitTs)
 }
 
-// resolveLockAsync resolves l assuming it was locked using the async commit protocol.
-func (lr *LockResolver) resolveLockAsync(bo *retry.Backoffer, l *Lock, status TxnStatus) error {
-	metrics.LockResolverCountWithResolveAsync.Inc()
+// resolveAsyncResolveData resolves all locks in an async-commit transaction according to the status.
+func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, data *asyncResolveData) error {
+	util.EvalFailpoint("resolveAsyncResolveData")
 
-	resolveData, err := lr.checkAllSecondaries(bo, l, &status)
+	keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, data.keys, nil)
 	if err != nil {
 		return err
 	}
-
-	status.commitTS = resolveData.commitTs
-
-	resolveData.keys = append(resolveData.keys, l.Primary)
-	keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, resolveData.keys, nil)
-	if err != nil {
-		return err
-	}
-
-	logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS))
-
 	errChan := make(chan error, len(keysByRegion))
 	// Resolve every lock in the transaction.
 	for region, locks := range keysByRegion {
@@ -781,6 +790,37 @@ func (lr *LockResolver) resolveLockAsync(bo *retry.Backoffer, l *Lock, status Tx
 	return nil
 }
 
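The util.EvalFailpoint hook above is what the snapshot tests toggle to prove that reads no longer block on resolution; for example, from snapshot_fail_test.go earlier in this patch:

    s.Nil(failpoint.Enable("tikvclient/resolveAsyncResolveData", "sleep(500)"))
    // A snapshot Get on a locked key must still return well within the injected delay.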
+// resolveAsyncCommitLock resolves l assuming it was locked using the async commit protocol.
+func (lr *LockResolver) resolveAsyncCommitLock(bo *retry.Backoffer, l *Lock, status TxnStatus, asyncResolveAll bool) (TxnStatus, error) {
+	metrics.LockResolverCountWithResolveAsync.Inc()
+
+	resolveData, err := lr.checkAllSecondaries(bo, l, &status)
+	if err != nil {
+		return TxnStatus{}, err
+	}
+	resolveData.keys = append(resolveData.keys, l.Primary)
+
+	status.commitTS = resolveData.commitTs
+	if status.StatusCacheable() {
+		lr.saveResolved(l.TxnID, status)
+	}
+
+	logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS))
+	if asyncResolveAll {
+		asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
+		go func() {
+			err := lr.resolveAsyncResolveData(asyncBo, l, status, resolveData)
+			if err != nil {
+				logutil.BgLogger().Info("failed to resolve async-commit locks asynchronously",
+					zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
+			}
+		}()
+	} else {
+		err = lr.resolveAsyncResolveData(bo, l, status, resolveData)
+	}
+	return status, err
+}
+
 // checkAllSecondaries checks the secondary locks of an async commit transaction to find out the final
 // status of the transaction
 func (lr *LockResolver) checkAllSecondaries(bo *retry.Backoffer, l *Lock, status *TxnStatus) (*asyncResolveData, error) {
@@ -872,8 +912,14 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
 }
 
 func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error {
+	util.EvalFailpoint("resolveLock")
+
 	metrics.LockResolverCountWithResolveLocks.Inc()
 	resolveLite := lite || l.TxnSize < lr.resolveLockLiteThreshold
+	// The lock has been resolved by getTxnStatusFromLock.
+	if resolveLite && bytes.Equal(l.Key, l.Primary) {
+		return nil
+	}
 	for {
 		loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
 		if err != nil {
@@ -930,16 +976,17 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
 	}
 }
 
-func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock, cleanRegions map[locate.RegionVerID]struct{}) error {
+func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
 	metrics.LockResolverCountWithResolveLocks.Inc()
+	// The lock has been resolved by getTxnStatusFromLock.
+	if bytes.Equal(l.Key, l.Primary) {
+		return nil
+	}
 	for {
 		loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
 		if err != nil {
 			return err
 		}
-		if _, ok := cleanRegions[loc.Region]; ok {
-			return nil
-		}
 		forUpdateTS := l.LockForUpdateTS
 		if forUpdateTS == 0 {
 			forUpdateTS = math.MaxUint64
diff --git a/txnkv/txnlock/test_probe.go b/txnkv/txnlock/test_probe.go
index 6dbd8b800..127c32684 100644
--- a/txnkv/txnlock/test_probe.go
+++ b/txnkv/txnlock/test_probe.go
@@ -45,9 +45,10 @@ type LockResolverProbe struct {
 	*LockResolver
 }
 
-// ResolveLockAsync tries to resolve a lock using the txn states.
-func (l LockResolverProbe) ResolveLockAsync(bo *retry.Backoffer, lock *Lock, status TxnStatus) error {
-	return l.resolveLockAsync(bo, lock, status)
+// ResolveAsyncCommitLock tries to resolve a lock using the txn states.
+func (l LockResolverProbe) ResolveAsyncCommitLock(bo *retry.Backoffer, lock *Lock, status TxnStatus) error {
+	_, err := l.resolveAsyncCommitLock(bo, lock, status, false)
+	return err
 }
 
 // ResolveLock resolves single lock.
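The client_helper.go changes below are the primary consumer of the new four-value API; a minimal sketch of the calling convention (identifiers as in this patch):

    msBeforeTxnExpired, canIgnore, canAccess, err := lr.ResolveLocksForRead(bo, callerStartTS, locks, resolveLite)
    // canIgnore: transaction IDs whose locks the reader may skip
    //            (rolled back, or minCommitTS pushed past the reader's startTS)
    // canAccess: transaction IDs committed with commitTS <= callerStartTS,
    //            whose values the reader may see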
@@ -57,7 +58,7 @@ func (l LockResolverProbe) ResolveLock(bo *retry.Backoffer, lock *Lock) error {
 
 // ResolvePessimisticLock resolves single pessimistic lock.
 func (l LockResolverProbe) ResolvePessimisticLock(bo *retry.Backoffer, lock *Lock) error {
-	return l.resolvePessimisticLock(bo, lock, make(map[locate.RegionVerID]struct{}))
+	return l.resolvePessimisticLock(bo, lock)
 }
 
 // GetTxnStatus sends the CheckTxnStatus request to the TiKV server.
diff --git a/txnkv/txnsnapshot/client_helper.go b/txnkv/txnsnapshot/client_helper.go
index 86df0e451..a755dc798 100644
--- a/txnkv/txnsnapshot/client_helper.go
+++ b/txnkv/txnsnapshot/client_helper.go
@@ -52,47 +52,47 @@ import (
 // call ResolveLock, and if the lock belongs to a large transaction, we may retry
 // the request. If there is no context information about the resolved locks, we'll
 // meet the secondary lock again and run into a deadloop.
+//
+// ClientHelper is only used for read operations because these optimizations don't apply
+// to write operations.
 type ClientHelper struct {
-	lockResolver  *txnlock.LockResolver
-	regionCache   *locate.RegionCache
-	resolvedLocks *util.TSSet
-	client        client.Client
-	resolveLite   bool
+	lockResolver   *txnlock.LockResolver
+	regionCache    *locate.RegionCache
+	resolvedLocks  *util.TSSet
+	committedLocks *util.TSSet
+	client         client.Client
+	resolveLite    bool
 	locate.RegionRequestRuntimeStats
 }
 
 // NewClientHelper creates a helper instance.
-func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, resolveLite bool) *ClientHelper {
+func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, committedLocks *util.TSSet, resolveLite bool) *ClientHelper {
 	return &ClientHelper{
-		lockResolver:  store.GetLockResolver(),
-		regionCache:   store.GetRegionCache(),
-		resolvedLocks: resolvedLocks,
-		client:        store.GetTiKVClient(),
-		resolveLite:   resolveLite,
+		lockResolver:   store.GetLockResolver(),
+		regionCache:    store.GetRegionCache(),
+		resolvedLocks:  resolvedLocks,
+		committedLocks: committedLocks,
+		client:         store.GetTiKVClient(),
+		resolveLite:    resolveLite,
 	}
 }
 
 // ResolveLocks wraps the ResolveLocks function and store the resolved result.
 func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) {
-	var err error
-	var resolvedLocks []uint64
-	var msBeforeTxnExpired int64
 	if ch.Stats != nil {
 		defer func(start time.Time) {
 			locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start))
 		}(time.Now())
 	}
-	if ch.resolveLite {
-		msBeforeTxnExpired, resolvedLocks, err = ch.lockResolver.ResolveLocksLite(bo, callerStartTS, locks)
-	} else {
-		msBeforeTxnExpired, resolvedLocks, err = ch.lockResolver.ResolveLocks(bo, callerStartTS, locks)
-	}
+	msBeforeTxnExpired, resolvedLocks, committedLocks, err := ch.lockResolver.ResolveLocksForRead(bo, callerStartTS, locks, ch.resolveLite)
 	if err != nil {
 		return msBeforeTxnExpired, err
 	}
 	if len(resolvedLocks) > 0 {
 		ch.resolvedLocks.Put(resolvedLocks...)
-		return 0, nil
+	}
+	if len(committedLocks) > 0 {
+		ch.committedLocks.Put(committedLocks...)
 	}
 	return msBeforeTxnExpired, nil
 }
@@ -105,6 +105,7 @@ func (ch *ClientHelper) SendReqCtx(bo *retry.Backoffer, req *tikvrpc.Request, re
 	}
 	sender.Stats = ch.Stats
 	req.Context.ResolvedLocks = ch.resolvedLocks.GetAll()
+	req.Context.CommittedLocks = ch.committedLocks.GetAll()
 	resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, et, opts...)
 	return resp, ctx, sender.GetStoreAddr(), err
 }
diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go
index 354ca8195..a968a1bfd 100644
--- a/txnkv/txnsnapshot/scan.go
+++ b/txnkv/txnsnapshot/scan.go
@@ -283,7 +283,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 			if err != nil {
 				return err
 			}
-			msBeforeExpired, _, err := txnlock.NewLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*txnlock.Lock{lock})
+			msBeforeExpired, err := txnlock.NewLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*txnlock.Lock{lock})
 			if err != nil {
 				return err
 			}
diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go
index ef5b68421..09924a6c2 100644
--- a/txnkv/txnsnapshot/snapshot.go
+++ b/txnkv/txnsnapshot/snapshot.go
@@ -44,7 +44,6 @@ import (
 	"time"
 
 	"github.com/opentracing/opentracing-go"
-	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pkg/errors"
@@ -109,6 +108,7 @@ type KVSnapshot struct {
 	vars            *kv.Variables
 	replicaReadSeed uint32
 	resolvedLocks   util.TSSet
+	committedLocks  util.TSSet
 
 	scanBatchSize int
 	// Cache the result of BatchGet.
@@ -333,7 +333,7 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, c
 }
 
 func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
-	cli := NewClientHelper(s.store, &s.resolvedLocks, false)
+	cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, false)
 	s.mu.RLock()
 	if s.mu.stats != nil {
 		cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats)
@@ -500,13 +500,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
 		defer span1.Finish()
 		opentracing.ContextWithSpan(ctx, span1)
 	}
-	failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
+	if _, err := util.EvalFailpoint("snapshot-get-cache-fail"); err == nil {
 		if bo.GetCtx().Value("TestSnapshotCache") != nil {
 			panic("cache miss")
 		}
-	})
+	}
 
-	cli := NewClientHelper(s.store, &s.resolvedLocks, true)
+	cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, true)
 	s.mu.RLock()
 	if s.mu.stats != nil {
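Taken together, the read path now works like this: on meeting a lock, ClientHelper.ResolveLocks classifies it via ResolveLocksForRead, records the returned transaction IDs, and the retried request carries both lists so TiKV can skip or read through the locks server-side. A summary sketch assembled from the code in this patch:

    msBeforeTxnExpired, resolvedLocks, committedLocks, err := ch.lockResolver.ResolveLocksForRead(bo, callerStartTS, locks, ch.resolveLite)
    ch.resolvedLocks.Put(resolvedLocks...)   // locks the server may ignore
    ch.committedLocks.Put(committedLocks...) // locks the server may read through
    // on retry:
    req.Context.ResolvedLocks = ch.resolvedLocks.GetAll()
    req.Context.CommittedLocks = ch.committedLocks.GetAll()

Because the actual ResolveLock RPCs run on background goroutines tied to asyncResolveCtx, readers never wait on them, and KVStore.Close cancels them via lockResolver.Close.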