From 2f119351bd5ce72b23f56c674b662d75741b8138 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Fri, 14 Jul 2023 10:36:07 +0800 Subject: [PATCH 1/4] *: add `SnapshotIterReverse` and make `iterReverse` supports `lowerBound` (#883) Signed-off-by: Jason Mo --- integration_tests/go.mod | 20 +++++----- integration_tests/go.sum | 52 +++++++++++++------------ internal/unionstore/memdb_iterator.go | 7 ++-- internal/unionstore/memdb_snapshot.go | 29 ++++++++++++-- internal/unionstore/memdb_test.go | 38 +++++++++++++----- internal/unionstore/mock.go | 4 +- internal/unionstore/union_store.go | 10 ++--- internal/unionstore/union_store_test.go | 16 ++++++-- rawkv/rawkv.go | 3 +- txnkv/transaction/txn.go | 4 +- txnkv/txnsnapshot/scan.go | 2 +- txnkv/txnsnapshot/snapshot.go | 4 +- 12 files changed, 123 insertions(+), 66 deletions(-) diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 261d55404..7c379e3ea 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,12 +6,12 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20230530111525-e4919c190b46 + github.com/pingcap/kvproto v0.0.0-20230703085931-3788ab4ee6b3 github.com/pingcap/tidb v1.1.0-beta.0.20230619015310-8b1006f1af04 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 github.com/tidwall/gjson v1.14.1 - github.com/tikv/client-go/v2 v2.0.8-0.20230615161845-b32f340d0609 + github.com/tikv/client-go/v2 v2.0.8-0.20230707070242-178f6fa01aab github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935 go.uber.org/goleak v1.2.1 ) @@ -67,7 +67,7 @@ require ( github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect github.com/pingcap/tidb/parser v0.0.0-20230619015310-8b1006f1af04 // indirect - github.com/pingcap/tipb v0.0.0-20230602100112-acb7942db1ca // indirect + github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_golang v1.15.1 // indirect @@ -95,14 +95,14 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/crypto v0.9.0 // indirect + golang.org/x/crypto v0.10.0 // indirect golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sync v0.2.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/net v0.11.0 // indirect + golang.org/x/sync v0.3.0 // indirect + golang.org/x/sys v0.9.0 // indirect + golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.9.3 // indirect + golang.org/x/tools v0.10.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.54.0 // indirect google.golang.org/protobuf v1.30.0 // indirect @@ -113,5 +113,7 @@ require ( replace ( github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117 + github.com/pingcap/tidb => github.com/defined2014/tidb v1.1.0-beta.0.20230712071811-87a5c15e9c7d + github.com/pingcap/tidb/parser => github.com/defined2014/tidb/parser v0.0.0-20230712071811-87a5c15e9c7d github.com/tikv/client-go/v2 => ../ ) diff 
--git a/integration_tests/go.sum b/integration_tests/go.sum index 536d6ae86..8e8d6f8f0 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -96,7 +96,11 @@ github.com/danjacques/gofslock v0.0.0-20220131014315-6e321f4509c8/go.mod h1:VT5E github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= +github.com/defined2014/tidb v1.1.0-beta.0.20230712071811-87a5c15e9c7d h1:zpJ8XU5O7Ba4ClrfgFjPir3rJ2xw5lEscWvm5g24GO8= +github.com/defined2014/tidb v1.1.0-beta.0.20230712071811-87a5c15e9c7d/go.mod h1:EHSARUioGL7ZY2hjmu9wWyZ1mzSuwjddKwnOlEphiQo= +github.com/defined2014/tidb/parser v0.0.0-20230712071811-87a5c15e9c7d h1:pyklrx9LutolLFg1l9CnMPUcvcnhBT2ppsq5zkbthDo= +github.com/defined2014/tidb/parser v0.0.0-20230712071811-87a5c15e9c7d/go.mod h1:ENXEsaVS6N3CTMpL4txc6m93y6XaztF9W4SFLjhPWJg= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= @@ -149,7 +153,7 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= -github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -287,8 +291,8 @@ github.com/lestrrat-go/blackmagic v1.0.1 h1:lS5Zts+5HIC/8og6cGHb0uCcNCa3OUt1ygh3 github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= github.com/lestrrat-go/httprc v1.0.4 h1:bAZymwoZQb+Oq8MEbyipag7iSq6YIga8Wj6GOiJGdI8= github.com/lestrrat-go/iter v1.0.2 h1:gMXo1q4c2pHmC3dn8LzRhJfP1ceCbgSiT9lUydIzltI= -github.com/lestrrat-go/jwx/v2 v2.0.6 h1:RlyYNLV892Ed7+FTfj1ROoF6x7WxL965PGTHso/60G0= -github.com/lestrrat-go/option v1.0.0 h1:WqAWL8kh8VcSoD6xjSH34/1m8yxluXQbDeKNfvFeEO4= +github.com/lestrrat-go/jwx/v2 v2.0.11 h1:ViHMnaMeaO0qV16RZWBHM7GTrAnX2aFLVKofc7FuKLQ= +github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a h1:N9zuLhTvBSRt0gWSiJswwQ2HqDmtX/ZCDJURnKUt1Ik= github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= @@ -357,24 +361,20 @@ github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0Ckz github.com/pingcap/errors 
v0.11.5-0.20221009092201-b66cddb77c32/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZLmhahmvHm7n9DUxGRQT00208= +github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230530111525-e4919c190b46 h1:GBlml2UIrI9IR3DdBnUWNeXizK4PwJhYPO7eWgCNErg= -github.com/pingcap/kvproto v0.0.0-20230530111525-e4919c190b46/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230703085931-3788ab4ee6b3 h1:TN9FcS+r19rKyrsPJDPfcXWkztVHfbpZ9Xkic6kE+v0= +github.com/pingcap/kvproto v0.0.0-20230703085931-3788ab4ee6b3/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY= github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb v1.1.0-beta.0.20230619015310-8b1006f1af04 h1:rBlNvmdFTB+WPFCvCSBQwK8PtoZA7JVblygbn3X8mmg= -github.com/pingcap/tidb v1.1.0-beta.0.20230619015310-8b1006f1af04/go.mod h1:Pi4ObsVr4eRNxCyFXsho0uzA/+ZDSjB0ASvaUI3ECUI= -github.com/pingcap/tidb/parser v0.0.0-20230619015310-8b1006f1af04 h1:pNz5l+3UFAu040skaE92eyVSlnAPMaCQBRW3E746lMs= -github.com/pingcap/tidb/parser v0.0.0-20230619015310-8b1006f1af04/go.mod h1:QKyCQPh1u6N7yXpPElNJzzWL7J70duuRo45GMH0FMNk= -github.com/pingcap/tipb v0.0.0-20230602100112-acb7942db1ca h1:J2HQyR5v1AcoBzx5/AYJW9XFSIl6si6YoC6yGI1W89c= -github.com/pingcap/tipb v0.0.0-20230602100112-acb7942db1ca/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 h1:D79RE4RVhq2ic8sqDSv7QdL0tT5aZV3CaCXUAT41iWc= +github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -424,6 +424,7 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y= github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/sclevine/agouti v3.0.0+incompatible/go.mod 
h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA= github.com/shirou/gopsutil/v3 v3.23.5 h1:5SgDCeQ0KW0S4N0znjeM/eFHXXOKyv2dVNgRq/c9P6Y= @@ -578,8 +579,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= +golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230519143937-03e91628a987 h1:3xJIFvzUFbu4ls0BTBYcgbCGhA63eAOEMxIHugyXJqA= @@ -624,8 +625,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= +golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -641,8 +642,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= -golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -681,13 +682,14 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= +golang.org/x/term v0.9.0 h1:GRRCnKYhdQrD8kfRAdQ6Zcw1P0OcELxGLKJvtjVMZ28= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -695,8 +697,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -724,8 +726,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM= -golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= +golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg= +golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/unionstore/memdb_iterator.go b/internal/unionstore/memdb_iterator.go index 3e37d865e..3b4bdfd8f 100644 --- a/internal/unionstore/memdb_iterator.go +++ b/internal/unionstore/memdb_iterator.go @@ -67,10 +67,11 @@ func (db *MemDB) Iter(k []byte, upperBound []byte) (Iterator, error) { // IterReverse 
creates a reversed Iterator positioned on the first entry which key is less than k. // The returned iterator will iterate from greater key to smaller key. // If k is nil, the returned iterator will be positioned at the last key. -// TODO: Add lower bound limit -func (db *MemDB) IterReverse(k []byte) (Iterator, error) { +// It yields only keys that >= lowerBound. If lowerBound is nil, it means the lowerBound is unbounded. +func (db *MemDB) IterReverse(k []byte, lowerBound []byte) (Iterator, error) { i := &MemdbIterator{ db: db, + start: lowerBound, end: k, reverse: true, } @@ -128,7 +129,7 @@ func (i *MemdbIterator) Valid() bool { if !i.reverse { return !i.curr.isNull() && (i.end == nil || bytes.Compare(i.Key(), i.end) < 0) } - return !i.curr.isNull() + return !i.curr.isNull() && (i.start == nil || bytes.Compare(i.Key(), i.start) >= 0) } // Flags returns flags belong to current iterator. diff --git a/internal/unionstore/memdb_snapshot.go b/internal/unionstore/memdb_snapshot.go index 2adedfbdb..ad6b87d43 100644 --- a/internal/unionstore/memdb_snapshot.go +++ b/internal/unionstore/memdb_snapshot.go @@ -60,6 +60,21 @@ func (db *MemDB) SnapshotIter(start, end []byte) Iterator { return it } +// SnapshotIterReverse returns a reverse Iterator for a snapshot of MemBuffer. +func (db *MemDB) SnapshotIterReverse(k, lowerBound []byte) Iterator { + it := &memdbSnapIter{ + MemdbIterator: &MemdbIterator{ + db: db, + start: lowerBound, + end: k, + reverse: true, + }, + cp: db.getSnapshot(), + } + it.init() + return it +} + func (db *MemDB) getSnapshot() MemDBCheckpoint { if len(db.stages) > 0 { return db.stages[0] @@ -123,10 +138,18 @@ func (i *memdbSnapIter) setValue() bool { } func (i *memdbSnapIter) init() { - if len(i.start) == 0 { - i.seekToFirst() + if i.reverse { + if len(i.end) == 0 { + i.seekToLast() + } else { + i.seek(i.end) + } } else { - i.seek(i.start) + if len(i.start) == 0 { + i.seekToFirst() + } else { + i.seek(i.start) + } } if !i.setValue() { diff --git a/internal/unionstore/memdb_test.go b/internal/unionstore/memdb_test.go index 8ce0a669a..e5fe20631 100644 --- a/internal/unionstore/memdb_test.go +++ b/internal/unionstore/memdb_test.go @@ -98,14 +98,34 @@ func TestIterator(t *testing.T) { } assert.Equal(i, cnt) - i-- - for it, _ := db.IterReverse(nil); it.Valid(); it.Next() { + for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() { + binary.BigEndian.PutUint32(buf[:], uint32(i-1)) + assert.Equal(it.Key(), buf[:]) + assert.Equal(it.Value(), buf[:]) + i-- + } + assert.Equal(i, 0) + + var upperBoundBytes, lowerBoundBytes [4]byte + bound := 400 + binary.BigEndian.PutUint32(upperBoundBytes[:], uint32(bound)) + for it, _ := db.Iter(nil, upperBoundBytes[:]); it.Valid(); it.Next() { binary.BigEndian.PutUint32(buf[:], uint32(i)) assert.Equal(it.Key(), buf[:]) assert.Equal(it.Value(), buf[:]) + i++ + } + assert.Equal(i, bound) + + i = cnt + binary.BigEndian.PutUint32(lowerBoundBytes[:], uint32(bound)) + for it, _ := db.IterReverse(nil, lowerBoundBytes[:]); it.Valid(); it.Next() { + binary.BigEndian.PutUint32(buf[:], uint32(i-1)) + assert.Equal(it.Key(), buf[:]) + assert.Equal(it.Value(), buf[:]) i-- } - assert.Equal(i, -1) + assert.Equal(i, bound) } func TestDiscard(t *testing.T) { @@ -139,7 +159,7 @@ func TestDiscard(t *testing.T) { assert.Equal(i, cnt) i-- - for it, _ := db.IterReverse(nil); it.Valid(); it.Next() { + for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() { binary.BigEndian.PutUint32(buf[:], uint32(i)) assert.Equal(it.Key(), buf[:]) assert.Equal(it.Value(), buf[:]) 
@@ -197,7 +217,7 @@ func TestFlushOverwrite(t *testing.T) { assert.Equal(i, cnt) i-- - for it, _ := db.IterReverse(nil); it.Valid(); it.Next() { + for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() { binary.BigEndian.PutUint32(kbuf[:], uint32(i)) binary.BigEndian.PutUint32(vbuf[:], uint32(i+1)) assert.Equal(it.Key(), kbuf[:]) @@ -279,7 +299,7 @@ func TestNestedSandbox(t *testing.T) { assert.Equal(i, 200) i-- - for it, _ := db.IterReverse(nil); it.Valid(); it.Next() { + for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() { binary.BigEndian.PutUint32(kbuf[:], uint32(i)) binary.BigEndian.PutUint32(vbuf[:], uint32(i)) if i < 100 { @@ -336,7 +356,7 @@ func TestOverwrite(t *testing.T) { assert.Equal(i, cnt) i-- - for it, _ := db.IterReverse(nil); it.Valid(); it.Next() { + for it, _ := db.IterReverse(nil, nil); it.Valid(); it.Next() { binary.BigEndian.PutUint32(buf[:], uint32(i)) assert.Equal(it.Key(), buf[:]) v := binary.BigEndian.Uint32(it.Value()) @@ -569,7 +589,7 @@ func checkConsist(t *testing.T, p1 *MemDB, p2 *leveldb.DB) { assert.Equal(it.Value(), it2.Value()) if prevKey != nil { - it, _ = p1.IterReverse(it2.Key()) + it, _ = p1.IterReverse(it2.Key(), nil) assert.Equal(it.Key(), prevKey) assert.Equal(it.Value(), prevVal) } @@ -579,7 +599,7 @@ func checkConsist(t *testing.T, p1 *MemDB, p2 *leveldb.DB) { prevVal = it2.Value() } - it1, _ = p1.IterReverse(nil) + it1, _ = p1.IterReverse(nil, nil) for it2.Last(); it2.Valid(); it2.Prev() { assert.Equal(it1.Key(), it2.Key()) assert.Equal(it1.Value(), it2.Value()) diff --git a/internal/unionstore/mock.go b/internal/unionstore/mock.go index 66c5f6cac..658697511 100644 --- a/internal/unionstore/mock.go +++ b/internal/unionstore/mock.go @@ -71,8 +71,8 @@ func (s *mockSnapshot) Iter(k []byte, upperBound []byte) (Iterator, error) { return s.store.Iter(k, upperBound) } -func (s *mockSnapshot) IterReverse(k []byte) (Iterator, error) { - return s.store.IterReverse(k) +func (s *mockSnapshot) IterReverse(k, lowerBound []byte) (Iterator, error) { + return s.store.IterReverse(k, lowerBound) } func (s *mockSnapshot) SetOption(opt int, val interface{}) {} diff --git a/internal/unionstore/union_store.go b/internal/unionstore/union_store.go index e87190fde..fbf629029 100644 --- a/internal/unionstore/union_store.go +++ b/internal/unionstore/union_store.go @@ -71,8 +71,8 @@ type uSnapshot interface { // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. // The returned iterator will iterate from greater key to smaller key. // If k is nil, the returned iterator will be positioned at the last key. - // TODO: Add lower bound limit - IterReverse(k []byte) (Iterator, error) + // It yields only keys that >= lowerBound. If lowerBound is nil, it means the lowerBound is unbounded. + IterReverse(k, lowerBound []byte) (Iterator, error) } // KVUnionStore is an in-memory Store which contains a buffer for write and a @@ -124,12 +124,12 @@ func (us *KVUnionStore) Iter(k, upperBound []byte) (Iterator, error) { } // IterReverse implements the Retriever interface. 
-func (us *KVUnionStore) IterReverse(k []byte) (Iterator, error) { - bufferIt, err := us.memBuffer.IterReverse(k) +func (us *KVUnionStore) IterReverse(k, lowerBound []byte) (Iterator, error) { + bufferIt, err := us.memBuffer.IterReverse(k, lowerBound) if err != nil { return nil, err } - retrieverIt, err := us.snapshot.IterReverse(k) + retrieverIt, err := us.snapshot.IterReverse(k, lowerBound) if err != nil { return nil, err } diff --git a/internal/unionstore/union_store_test.go b/internal/unionstore/union_store_test.go index 8356bf394..7d4aeca4f 100644 --- a/internal/unionstore/union_store_test.go +++ b/internal/unionstore/union_store_test.go @@ -125,25 +125,33 @@ func TestUnionStoreIterReverse(t *testing.T) { err = store.Set([]byte("3"), []byte("3")) assert.Nil(err) - iter, err := us.IterReverse(nil) + iter, err := us.IterReverse(nil, nil) assert.Nil(err) checkIterator(t, iter, [][]byte{[]byte("3"), []byte("2"), []byte("1")}, [][]byte{[]byte("3"), []byte("2"), []byte("1")}) - iter, err = us.IterReverse([]byte("3")) + iter, err = us.IterReverse([]byte("3"), []byte("1")) + assert.Nil(err) + checkIterator(t, iter, [][]byte{[]byte("2"), []byte("1")}, [][]byte{[]byte("2"), []byte("1")}) + + iter, err = us.IterReverse([]byte("3"), nil) assert.Nil(err) checkIterator(t, iter, [][]byte{[]byte("2"), []byte("1")}, [][]byte{[]byte("2"), []byte("1")}) err = us.GetMemBuffer().Set([]byte("0"), []byte("0")) assert.Nil(err) - iter, err = us.IterReverse([]byte("3")) + iter, err = us.IterReverse([]byte("3"), nil) assert.Nil(err) checkIterator(t, iter, [][]byte{[]byte("2"), []byte("1"), []byte("0")}, [][]byte{[]byte("2"), []byte("1"), []byte("0")}) err = us.GetMemBuffer().Delete([]byte("1")) assert.Nil(err) - iter, err = us.IterReverse([]byte("3")) + iter, err = us.IterReverse([]byte("3"), nil) assert.Nil(err) checkIterator(t, iter, [][]byte{[]byte("2"), []byte("0")}, [][]byte{[]byte("2"), []byte("0")}) + + iter, err = us.IterReverse([]byte("3"), []byte("1")) + assert.Nil(err) + checkIterator(t, iter, [][]byte{[]byte("2")}, [][]byte{[]byte("2")}) } func checkIterator(t *testing.T, iter Iterator, keys [][]byte, values [][]byte) { diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index cebd534d4..4e9f217db 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -550,7 +550,8 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, o return } -// ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs. +// ReverseScan queries continuous kv pairs in range [endKey, startKey), +// from startKey(upperBound) to endKey(lowerBound), up to limit pairs. // The returned keys are in reversed lexicographical order. // If endKey is empty, it means unbounded. // If you want to include the startKey or exclude the endKey, push a '\0' to the key. For example, to scan diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index a12027aa6..6d90a1658 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -247,8 +247,8 @@ func (txn *KVTxn) Iter(k []byte, upperBound []byte) (unionstore.Iterator, error) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. -func (txn *KVTxn) IterReverse(k []byte) (unionstore.Iterator, error) { - return txn.us.IterReverse(k) +func (txn *KVTxn) IterReverse(k, lowerBound []byte) (unionstore.Iterator, error) { + return txn.us.IterReverse(k, lowerBound) } // Delete removes the entry for key k from kv store. 
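For readers of this series, a minimal usage sketch of the two-argument IterReverse introduced above. It is not part of the patch: scanReverse is a hypothetical helper, and only KVTxn.IterReverse(upperBound, lowerBound) plus the standard Iterator methods (Valid, Key, Value, Next, Close) are assumed from the client-go API; passing nil for either bound leaves that side unbounded.

package example

import (
	"fmt"

	"github.com/tikv/client-go/v2/txnkv/transaction"
)

// scanReverse is a hypothetical helper that walks keys from just below
// upperBound down to lowerBound (inclusive), using the reverse iterator
// with the lower-bound argument added by this patch. A nil bound on
// either side is treated as unbounded.
func scanReverse(txn *transaction.KVTxn, upperBound, lowerBound []byte) error {
	it, err := txn.IterReverse(upperBound, lowerBound)
	if err != nil {
		return err
	}
	defer it.Close()
	for it.Valid() {
		fmt.Printf("key=%q value=%q\n", it.Key(), it.Value())
		if err := it.Next(); err != nil {
			return err
		}
	}
	return nil
}

MemDB.IterReverse, KVSnapshot.IterReverse, and KVUnionStore.IterReverse all take the same (upperBound, lowerBound) pair after this patch, so the same pattern applies to the buffer, snapshot, and union-store iterators.
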
diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index 221cf5910..01662f1dd 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -146,7 +146,7 @@ func (s *Scanner) Next() error { } current := s.cache[s.idx] - if (!s.reverse && (len(s.endKey) > 0 && kv.CmpKey(current.Key, s.endKey) >= 0)) || + if (!s.reverse && len(s.endKey) > 0 && kv.CmpKey(current.Key, s.endKey) >= 0) || (s.reverse && len(s.nextStartKey) > 0 && kv.CmpKey(current.Key, s.nextStartKey) < 0) { s.eof = true s.Close() diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index ef293e6bf..a4253d62e 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -771,8 +771,8 @@ func (s *KVSnapshot) Iter(k []byte, upperBound []byte) (unionstore.Iterator, err } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. -func (s *KVSnapshot) IterReverse(k []byte) (unionstore.Iterator, error) { - scanner, err := newScanner(s, nil, k, s.scanBatchSize, true) +func (s *KVSnapshot) IterReverse(k, lowerBound []byte) (unionstore.Iterator, error) { + scanner, err := newScanner(s, lowerBound, k, s.scanBatchSize, true) return scanner, err } From 4ec212d5f2f7cdf4e045c5bbbd12ecef0417586a Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 14 Jul 2023 11:03:10 +0800 Subject: [PATCH 2/4] *: fix stale read ops metric (#878) (#889) Signed-off-by: crazycs520 Co-authored-by: disksing --- internal/locate/region_request.go | 78 +++++++++++++++++-------------- metrics/metrics.go | 37 +++++++++++---- metrics/shortcuts.go | 28 +++++++---- 3 files changed, 90 insertions(+), 53 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index aac428b48..8e4cb1cab 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -259,6 +259,7 @@ type replicaSelector struct { region *Region regionStore *regionStore replicas []*replica + labels []*metapb.StoreLabel state selectorState // replicas[targetIdx] is the replica handling the request this time targetIdx AccessIndex @@ -747,6 +748,10 @@ func newReplicaSelector( ) } + option := storeSelectorOp{} + for _, op := range opts { + op(&option) + } var state selectorState if !req.ReplicaReadType.IsFollowerRead() { if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 { @@ -755,10 +760,6 @@ func newReplicaSelector( state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx} } } else { - option := storeSelectorOp{} - for _, op := range opts { - op(&option) - } if req.ReplicaReadType == kv.ReplicaReadPreferLeader { WithPerferLeader()(&option) } @@ -778,6 +779,7 @@ func newReplicaSelector( cachedRegion, regionStore, replicas, + option.labels, state, -1, -1, @@ -1156,9 +1158,14 @@ func (s *RegionRequestSender) SendReqCtx( var staleReadCollector *staleReadMetricsCollector if req.StaleRead { - staleReadCollector = &staleReadMetricsCollector{hit: true} - staleReadCollector.onReq(req) - defer staleReadCollector.collect() + staleReadCollector = &staleReadMetricsCollector{} + defer func() { + if retryTimes == 0 { + metrics.StaleReadHitCounter.Add(1) + } else { + metrics.StaleReadMissCounter.Add(1) + } + }() } for { @@ -1171,9 +1178,6 @@ func (s *RegionRequestSender) SendReqCtx( zap.Int("times", retryTimes), ) } - if req.StaleRead && staleReadCollector != nil { - staleReadCollector.hit = false - } } rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...) 
@@ -1204,6 +1208,14 @@ func (s *RegionRequestSender) SendReqCtx( return resp, nil, retryTimes, err } + var isLocalTraffic bool + if staleReadCollector != nil && s.replicaSelector != nil { + if target := s.replicaSelector.targetReplica(); target != nil { + isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.labels) + staleReadCollector.onReq(req, isLocalTraffic) + } + } + logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) s.storeAddr = rpcCtx.Addr @@ -1262,7 +1274,7 @@ func (s *RegionRequestSender) SendReqCtx( } } if staleReadCollector != nil { - staleReadCollector.onResp(resp) + staleReadCollector.onResp(req.Type, resp, isLocalTraffic) } return resp, rpcCtx, retryTimes, nil } @@ -1946,35 +1958,36 @@ func (s *RegionRequestSender) onRegionError( } type staleReadMetricsCollector struct { - tp tikvrpc.CmdType - hit bool - out int - in int } -func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) { +func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) { size := 0 switch req.Type { case tikvrpc.CmdGet: - size += req.Get().Size() + size = req.Get().Size() case tikvrpc.CmdBatchGet: - size += req.BatchGet().Size() + size = req.BatchGet().Size() case tikvrpc.CmdScan: - size += req.Scan().Size() + size = req.Scan().Size() case tikvrpc.CmdCop: - size += req.Cop().Size() + size = req.Cop().Size() default: // ignore non-read requests return } - s.tp = req.Type size += req.Context.Size() - s.out = size + if isLocalTraffic { + metrics.StaleReadLocalOutBytes.Add(float64(size)) + metrics.StaleReadReqLocalCounter.Add(1) + } else { + metrics.StaleReadRemoteOutBytes.Add(float64(size)) + metrics.StaleReadReqCrossZoneCounter.Add(1) + } } -func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) { +func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) { size := 0 - switch s.tp { + switch tp { case tikvrpc.CmdGet: size += resp.Resp.(*kvrpcpb.GetResponse).Size() case tikvrpc.CmdBatchGet: @@ -1984,19 +1997,12 @@ func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) { case tikvrpc.CmdCop: size += resp.Resp.(*coprocessor.Response).Size() default: - // unreachable + // ignore non-read requests return } - s.in = size -} - -func (s *staleReadMetricsCollector) collect() { - in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic - if !s.hit { - in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic - } - if s.in > 0 && s.out > 0 { - in.Observe(float64(s.in)) - out.Observe(float64(s.out)) + if isLocalTraffic { + metrics.StaleReadLocalInBytes.Add(float64(size)) + } else { + metrics.StaleReadRemoteInBytes.Add(float64(size)) } } diff --git a/metrics/metrics.go b/metrics/metrics.go index 4f5abfd8a..5c72f05b2 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -101,7 +101,9 @@ var ( TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec - TiKVStaleReadSizeSummary *prometheus.SummaryVec + TiKVStaleReadCounter *prometheus.CounterVec + TiKVStaleReadReqCounter *prometheus.CounterVec + TiKVStaleReadBytes *prometheus.CounterVec ) // Label constants. 
@@ -700,13 +702,28 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblType, LblStore}) - TiKVStaleReadSizeSummary = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "stale_read_bytes", - Help: "Size of stale read.", - ConstLabels: constLabels, + TiKVStaleReadCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stale_read_counter", + Help: "Counter of stale read hit/miss", + }, []string{LblResult}) + + TiKVStaleReadReqCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stale_read_req_counter", + Help: "Counter of stale read requests", + }, []string{LblType}) + + TiKVStaleReadBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stale_read_bytes", + Help: "Counter of stale read requests bytes", }, []string{LblResult, LblDirection}) initShortcuts() @@ -789,7 +806,9 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVStoreSlowScoreGauge) prometheus.MustRegister(TiKVPreferLeaderFlowsGauge) - prometheus.MustRegister(TiKVStaleReadSizeSummary) + prometheus.MustRegister(TiKVStaleReadCounter) + prometheus.MustRegister(TiKVStaleReadReqCounter) + prometheus.MustRegister(TiKVStaleReadBytes) } // readCounter reads the value of a prometheus.Counter. diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go index 0acd7aba6..c791cb987 100644 --- a/metrics/shortcuts.go +++ b/metrics/shortcuts.go @@ -160,10 +160,16 @@ var ( AggressiveLockedKeysLockedWithConflict prometheus.Counter AggressiveLockedKeysNonForceLock prometheus.Counter - StaleReadHitInTraffic prometheus.Observer - StaleReadHitOutTraffic prometheus.Observer - StaleReadMissInTraffic prometheus.Observer - StaleReadMissOutTraffic prometheus.Observer + StaleReadHitCounter prometheus.Counter + StaleReadMissCounter prometheus.Counter + + StaleReadReqLocalCounter prometheus.Counter + StaleReadReqCrossZoneCounter prometheus.Counter + + StaleReadLocalInBytes prometheus.Counter + StaleReadLocalOutBytes prometheus.Counter + StaleReadRemoteInBytes prometheus.Counter + StaleReadRemoteOutBytes prometheus.Counter ) func initShortcuts() { @@ -296,8 +302,14 @@ func initShortcuts() { // TiKV). 
AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock") - StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in") - StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out") - StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in") - StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out") + StaleReadHitCounter = TiKVStaleReadCounter.WithLabelValues("hit") + StaleReadMissCounter = TiKVStaleReadCounter.WithLabelValues("miss") + + StaleReadReqLocalCounter = TiKVStaleReadReqCounter.WithLabelValues("local") + StaleReadReqCrossZoneCounter = TiKVStaleReadReqCounter.WithLabelValues("cross-zone") + + StaleReadLocalInBytes = TiKVStaleReadBytes.WithLabelValues("local", "in") + StaleReadLocalOutBytes = TiKVStaleReadBytes.WithLabelValues("local", "out") + StaleReadRemoteInBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "in") + StaleReadRemoteOutBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "out") } From 51aab264f681de1b6db7e6fe4e5412fa804c12df Mon Sep 17 00:00:00 2001 From: weedge Date: Fri, 14 Jul 2023 12:08:47 +0800 Subject: [PATCH 3/4] add gc options (#828) Signed-off-by: weedge Co-authored-by: disksing --- examples/gcworker/gcworker.go | 8 +++++++- tikv/gc.go | 25 +++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/examples/gcworker/gcworker.go b/examples/gcworker/gcworker.go index 39da00df0..1191adcd2 100644 --- a/examples/gcworker/gcworker.go +++ b/examples/gcworker/gcworker.go @@ -21,6 +21,7 @@ import ( "time" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv" ) @@ -36,10 +37,15 @@ func main() { panic(err) } - sysSafepoint, err := client.GC(context.Background(), *safepoint) + + sysSafepoint, err := client.GC(context.Background(), *safepoint, tikv.WithConcurrency(10)) if err != nil { panic(err) } fmt.Printf("Finished GC, expect safepoint:%d(%+v),real safepoint:%d(+%v)\n", *safepoint, oracle.GetTimeFromTS(*safepoint), sysSafepoint, oracle.GetTimeFromTS(sysSafepoint)) + err = client.Close() + if err != nil { + panic(err) + } } diff --git a/tikv/gc.go b/tikv/gc.go index d448815ed..2b47e6bca 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -50,8 +50,15 @@ import ( // // GC is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). // We skip the second step "delete ranges" which is an optimization for TiDB. -func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64, err error) { - err = s.resolveLocks(ctx, safepoint, 8) +func (s *KVStore) GC(ctx context.Context, safepoint uint64, opts ...GCOpt) (newSafePoint uint64, err error) { + // default concurrency 8 + opt := &gcOption{concurrency: 8} + // Apply gc options. + for _, o := range opts { + o(opt) + } + + err = s.resolveLocks(ctx, safepoint, opt.concurrency) if err != nil { return } @@ -59,6 +66,20 @@ func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64 return s.pdClient.UpdateGCSafePoint(ctx, safepoint) } +type gcOption struct { + concurrency int +} + +// GCOpt gc options +type GCOpt func(*gcOption) + +// WithConcurrency is used to set gc RangeTaskRunner concurrency. 
+func WithConcurrency(concurrency int) GCOpt { + return func(opt *gcOption) { + opt.concurrency = concurrency + } +} + func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error { handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) { return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey) From 85fc8f3375656b008ce86165662ce4a957dea28a Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 14 Jul 2023 13:27:14 +0800 Subject: [PATCH 4/4] reload region cache when store is resolved from invalid status (#843) Signed-off-by: you06 Co-authored-by: disksing --- error/error.go | 2 +- internal/locate/region_cache.go | 91 +++++++++++++++++++++---- internal/locate/region_cache_test.go | 18 ++--- internal/locate/region_request.go | 33 +++++++-- internal/locate/region_request3_test.go | 4 +- internal/locate/region_request_test.go | 4 +- 6 files changed, 117 insertions(+), 35 deletions(-) diff --git a/error/error.go b/error/error.go index 761ebef6d..62a75ff65 100644 --- a/error/error.go +++ b/error/error.go @@ -246,7 +246,7 @@ type ErrAssertionFailed struct { *kvrpcpb.AssertionFailed } -// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues“ is not. +// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not. type ErrLockOnlyIfExistsNoReturnValue struct { StartTS uint64 ForUpdateTs uint64 diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 8581dc5b2..a5fb471af 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -153,6 +153,7 @@ type Region struct { syncFlag int32 // region need be sync in next turn lastAccess int64 // last region access time, see checkRegionCacheTTL invalidReason InvalidReason // the reason why the region is invalidated + asyncReload atomic.Bool // the region need to be reloaded in async mode } // AccessIndex represent the index for accessIndex array @@ -420,7 +421,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu { r.latestVersions = make(map[uint64]RegionVerID) r.sorted = NewSortedRegions(btreeDegree) for _, region := range rs { - r.insertRegionToCache(region) + r.insertRegionToCache(region, true) } return r } @@ -466,6 +467,11 @@ type RegionCache struct { // requestLiveness always returns unreachable. mockRequestLiveness atomic.Pointer[livenessFunc] } + + regionsNeedReload struct { + sync.Mutex + regions []uint64 + } } // NewRegionCache creates a RegionCache. @@ -519,8 +525,8 @@ func (c *RegionCache) clear() { } // thread unsafe, should use with lock -func (c *RegionCache) insertRegionToCache(cachedRegion *Region) { - c.mu.insertRegionToCache(cachedRegion) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { + c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) } // Close releases region cache's resource. 
@@ -531,8 +537,13 @@ func (c *RegionCache) Close() { // asyncCheckAndResolveLoop with func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { ticker := time.NewTicker(interval) - defer ticker.Stop() + reloadRegionTicker := time.NewTicker(10 * time.Second) + defer func() { + ticker.Stop() + reloadRegionTicker.Stop() + }() var needCheckStores []*Store + reloadNextLoop := make(map[uint64]struct{}) for { needCheckStores = needCheckStores[:0] select { @@ -550,6 +561,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { // there's a deleted store in the stores map which guaranteed by reReslve(). return state != unresolved && state != tombstone && state != deleted }) + case <-reloadRegionTicker.C: + for regionID := range reloadNextLoop { + c.reloadRegion(regionID) + delete(reloadNextLoop, regionID) + } + c.regionsNeedReload.Lock() + for _, regionID := range c.regionsNeedReload.regions { + // will reload in next tick, wait a while for two reasons: + // 1. there may an unavailable duration while recreating the connection. + // 2. the store may just be started, and wait safe ts synced to avoid the + // possible dataIsNotReady error. + reloadNextLoop[regionID] = struct{}{} + } + c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0] + c.regionsNeedReload.Unlock() } } } @@ -1060,7 +1086,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } else if r.checkNeedReloadAndMarkUpdated() { // load region when it be marked as need reload. @@ -1073,7 +1099,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1214,7 +1240,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } else { r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1233,7 +1259,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() return &KeyLocation{ Region: r.VerID(), @@ -1243,6 +1269,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K }, nil } +func (c *RegionCache) scheduleReloadRegion(region *Region) { + if region == nil || !region.asyncReload.CompareAndSwap(false, true) { + // async reload scheduled by other thread. + return + } + regionID := region.GetID() + if regionID > 0 { + c.regionsNeedReload.Lock() + c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID) + c.regionsNeedReload.Unlock() + } +} + +func (c *RegionCache) reloadRegion(regionID uint64) { + bo := retry.NewNoopBackoff(context.Background()) + lr, err := c.loadRegionByID(bo, regionID) + if err != nil { + // ignore error and use old region info. + logutil.Logger(bo.GetCtx()).Error("load region failure", + zap.Uint64("regionID", regionID), zap.Error(err)) + if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil { + oldRegion.asyncReload.Store(false) + } + return + } + c.mu.Lock() + c.insertRegionToCache(lr, false) + c.mu.Unlock() +} + // GroupKeysByRegion separates keys into groups by their belonging Regions. 
// Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. @@ -1327,7 +1383,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey // TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information // for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it. for _, region := range regions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } return @@ -1401,7 +1457,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // insertRegionToCache tries to insert the Region to cache. // It should be protected by c.mu.l.Lock(). -func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { +// if `invalidateOldRegion` is false, the old region cache should be still valid, +// and it may still be used by some kv requests. +func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) if oldRegion != nil { store := cachedRegion.getStore() @@ -1416,8 +1474,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader { store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly)) } - // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. - oldRegion.invalidate(Other) + // If the old region is still valid, do not invalidate it to avoid unnecessary backoff. + if invalidateOldRegion { + // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. + oldRegion.invalidate(Other) + } // Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which // is under transferring regions. 
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load()) @@ -1939,7 +2000,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext c.mu.Lock() for _, region := range newRegions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } c.mu.Unlock() @@ -2057,7 +2118,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV return } c.mu.Lock() - c.insertRegionToCache(new) + c.insertRegionToCache(new, true) c.mu.Unlock() }() } @@ -2527,8 +2588,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { } func (s *Store) getResolveState() resolveState { - var state resolveState if s == nil { + var state resolveState return state } return resolveState(atomic.LoadUint64(&s.state)) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 03ef3e309..619da2d2e 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() { region := createSampleRegion([]byte("k1"), []byte("k2")) region.meta.Id = 1 region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10} - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}} r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}} @@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() { filterUnavailablePeers(cpRegion) region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() { cpRegion := &pd.Region{Meta: cpMeta} region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() { fakeRegion.setStore(cachedRegion.getStore().clone()) // no buckets fakeRegion.getStore().buckets = nil - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // stale buckets fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1} - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // new buckets @@ -1481,7 +1481,7 @@ func (s *testRegionCacheSuite) TestBuckets() { Keys: buckets.Keys, } fakeRegion.getStore().buckets = newBuckets - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(newBuckets, cachedRegion.getStore().buckets) @@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err := s.cache.loadRegion(s.bo, []byte("c"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'c'}) s.Nil(err) 
s.Equal(loc.Region.GetID(), regions[0]) @@ -1625,7 +1625,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err = s.cache.loadRegion(s.bo, []byte("e"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'e'}) s.Nil(err) s.Equal(loc.Region.GetID(), regions[0]) @@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10) s.Equal(len(r), 2) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 8e4cb1cab..9d3193431 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -572,18 +572,39 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if state.option.preferLeader { state.lastIdx = state.leaderIdx } + var offset int + if state.lastIdx >= 0 { + offset = rand.Intn(replicaSize) + } + reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - idx := AccessIndex((int(state.lastIdx) + i) % replicaSize) - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. - for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ { - idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize) + var idx AccessIndex + if state.option.preferLeader { + if i == 0 { + idx = state.lastIdx + } else { + // randomly select next replica, but skip state.lastIdx + if (i+offset)%replicaSize == 0 { + offset++ + } + } + } else { + idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) } - if state.isCandidate(idx, selector.replicas[idx]) { + selectReplica := selector.replicas[idx] + if state.isCandidate(idx, selectReplica) { state.lastIdx = idx selector.targetIdx = idx break } + if selectReplica.isEpochStale() && + selectReplica.store.getResolveState() == resolved && + selectReplica.store.getLivenessState() == reachable { + reloadRegion = true + } + } + if reloadRegion { + selector.regionCache.scheduleReloadRegion(selector.region) } // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index e05c2cac5..ad29e4710 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -322,7 +322,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Test accessFollower state with kv.ReplicaReadLearner request type. 
@@ -373,7 +373,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Verify creating the replicaSelector. diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 7f52d34cd..42c8ceb34 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -608,7 +608,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) @@ -618,7 +618,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v3 := region.Region.confVer + 1 r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}} st = &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region)
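
A closing usage sketch for the GC concurrency option from patch 3, modeled on the updated examples/gcworker example. The PD endpoint and the ten-minute safepoint below are placeholders; only GC, WithConcurrency, and the txnkv/oracle helpers referenced in the patch are assumed.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv"
)

func main() {
	// "127.0.0.1:2379" is a placeholder PD endpoint.
	client, err := txnkv.NewClient([]string{"127.0.0.1:2379"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Advance the GC safepoint to ten minutes ago, resolving locks with
	// 16 concurrent range tasks instead of the default 8.
	safepoint := oracle.GoTimeToTS(time.Now().Add(-10 * time.Minute))
	newSafePoint, err := client.GC(context.Background(), safepoint, tikv.WithConcurrency(16))
	if err != nil {
		panic(err)
	}
	fmt.Printf("GC finished, safepoint is now %d\n", newSafePoint)
}

The variadic GCOpt parameter keeps the call backward compatible: existing two-argument callers continue to get the previous default lock-resolving concurrency of 8.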