Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support timeout for tikv/tidb retry #40805

Merged
merged 9 commits into from
Jan 29, 2023
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
retryTimeout = 3 * time.Second

defaultMaxRetry = 3

dbTimeout = 30 * time.Second
)

// MySQLConnectParam records the parameters needed to connect to a MySQL database.
Expand Down Expand Up @@ -74,6 +76,8 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
cfg.Params["charset"] = "utf8mb4"
cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode)
cfg.MaxAllowedPacket = int(param.MaxAllowedPacket)
cfg.ReadTimeout = dbTimeout
cfg.WriteTimeout = dbTimeout

cfg.TLS = param.TLSConfig
cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func TestLoadConfig(t *testing.T) {
err = taskCfg.Adjust(context.Background())
require.NoError(t, err)
equivalentDSN := taskCfg.Checkpoint.MySQLParam.ToDriverConfig().FormatDSN()
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?readTimeout=30s&writeTimeout=30s&maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
require.Equal(t, expectedDSN, equivalentDSN)

result := taskCfg.String()
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package config

import (
"time"

"github.com/docker/go-units"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

const (
Expand All @@ -34,3 +38,11 @@ const (

DefaultBatchSize ByteSize = 100 * units.GiB
)

var (
DefaultGrpcKeepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
PermitWithoutStream: false,
})
)
1 change: 0 additions & 1 deletion br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down
7 changes: 1 addition & 6 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type clusterResourceCheckItem struct {
Expand Down Expand Up @@ -733,11 +732,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
}),
config.DefaultGrpcKeepaliveParams,
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
},
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/pdutil",
"//br/pkg/version",
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/tikv/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -88,7 +89,7 @@ func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, a
// Connect to the ImportSST service on the given TiKV node.
// The connection is needed for executing `action` and will be tear down
// when this function exits.
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), config.DefaultGrpcKeepaliveParams)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -172,7 +173,8 @@ var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",na

// FetchMode obtains the import mode status of the TiKV node.
func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) {
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/httputil",
"//br/pkg/lightning/config",
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/utils",
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/store/pdtypes"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -201,7 +202,9 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(store.GetAddress(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -341,7 +344,8 @@ func sendSplitRegionRequest(ctx context.Context, c *pdClient, regionInfo *Region
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
conn, err := grpc.Dial(store.GetAddress(), opt,
config.DefaultGrpcKeepaliveParams)
if err != nil {
return false, nil, err
}
Expand Down