From b93206174d9316ccbf3000fb3d70118e1dfaa404 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 28 Jan 2023 20:38:28 +0800 Subject: [PATCH 1/3] support timeout --- br/pkg/lightning/common/util.go | 6 ++++++ br/pkg/lightning/config/const.go | 12 ++++++++++++ br/pkg/lightning/restore/precheck_impl.go | 7 +------ br/pkg/lightning/tikv/tikv.go | 6 ++++-- br/pkg/restore/split/client.go | 8 ++++++-- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index b9bdf564403de..0c2685a84c2b4 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -44,8 +44,12 @@ const ( retryTimeout = 3 * time.Second defaultMaxRetry = 3 + + dbTimeout = 30 * time.Second ) +var () + // MySQLConnectParam records the parameters needed to connect to a MySQL database. type MySQLConnectParam struct { Host string @@ -74,6 +78,8 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config { cfg.Params["charset"] = "utf8mb4" cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode) cfg.MaxAllowedPacket = int(param.MaxAllowedPacket) + cfg.ReadTimeout = dbTimeout + cfg.WriteTimeout = dbTimeout cfg.TLS = param.TLSConfig cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go index 23a38ac41117d..0265c1fa4d7ba 100644 --- a/br/pkg/lightning/config/const.go +++ b/br/pkg/lightning/config/const.go @@ -15,7 +15,11 @@ package config import ( + "time" + "github.com/docker/go-units" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) const ( @@ -34,3 +38,11 @@ const ( DefaultBatchSize ByteSize = 100 * units.GiB ) + +var ( + DefaultGrpcKeepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + PermitWithoutStream: false, + }) +) diff --git a/br/pkg/lightning/restore/precheck_impl.go b/br/pkg/lightning/restore/precheck_impl.go index f412b101ff08b..8d5142a8b5fd4 100644 --- a/br/pkg/lightning/restore/precheck_impl.go +++ b/br/pkg/lightning/restore/precheck_impl.go @@ -48,7 +48,6 @@ import ( "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" ) type clusterResourceCheckItem struct { @@ -733,11 +732,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, AutoSyncInterval: 30 * time.Second, DialTimeout: 5 * time.Second, DialOptions: []grpc.DialOption{ - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 10 * time.Second, - Timeout: 3 * time.Second, - PermitWithoutStream: false, - }), + config.DefaultGrpcKeepaliveParams, grpc.WithBlock(), grpc.WithReturnConnectionError(), }, diff --git a/br/pkg/lightning/tikv/tikv.go b/br/pkg/lightning/tikv/tikv.go index 8d2d797d322d1..53c06cc6102f6 100644 --- a/br/pkg/lightning/tikv/tikv.go +++ b/br/pkg/lightning/tikv/tikv.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/debugpb" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/version" @@ -88,7 +89,7 @@ func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, a // Connect to the ImportSST service on the given TiKV node. // The connection is needed for executing `action` and will be tear down // when this function exits. - conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption()) + conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), config.DefaultGrpcKeepaliveParams) if err != nil { return errors.Trace(err) } @@ -172,7 +173,8 @@ var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",na // FetchMode obtains the import mode status of the TiKV node. func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) { - conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption()) + conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), + config.DefaultGrpcKeepaliveParams) if err != nil { return 0, err } diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index 5f6788d6ee470..72482a94e87dc 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn/util" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/httputil" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/store/pdtypes" pd "github.com/tikv/pd/client" @@ -201,7 +202,9 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key if err != nil { return nil, errors.Trace(err) } - conn, err := grpc.Dial(store.GetAddress(), grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(store.GetAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + config.DefaultGrpcKeepaliveParams) if err != nil { return nil, errors.Trace(err) } @@ -341,7 +344,8 @@ func sendSplitRegionRequest(ctx context.Context, c *pdClient, regionInfo *Region if c.tlsConf != nil { opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) } - conn, err := grpc.Dial(store.GetAddress(), opt) + conn, err := grpc.Dial(store.GetAddress(), opt, + config.DefaultGrpcKeepaliveParams) if err != nil { return false, nil, err } From 1122b7a75d8513c43a25133014893cf99071c2c8 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 29 Jan 2023 00:13:10 +0800 Subject: [PATCH 2/3] fix ci lint --- br/pkg/lightning/common/util.go | 2 -- br/pkg/lightning/config/BUILD.bazel | 2 ++ br/pkg/lightning/restore/BUILD.bazel | 1 - br/pkg/lightning/tikv/BUILD.bazel | 1 + br/pkg/restore/split/BUILD.bazel | 1 + 5 files changed, 4 insertions(+), 3 deletions(-) diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 0c2685a84c2b4..fbf275a99bfe1 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -48,8 +48,6 @@ const ( dbTimeout = 30 * time.Second ) -var () - // MySQLConnectParam records the parameters needed to connect to a MySQL database. type MySQLConnectParam struct { Host string diff --git a/br/pkg/lightning/config/BUILD.bazel b/br/pkg/lightning/config/BUILD.bazel index b69d2fca0d310..b035b506aebf2 100644 --- a/br/pkg/lightning/config/BUILD.bazel +++ b/br/pkg/lightning/config/BUILD.bazel @@ -25,6 +25,8 @@ go_library( "@com_github_docker_go_units//:go-units", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//keepalive", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel index ef5aeb106585b..06e503e0519db 100644 --- a/br/pkg/lightning/restore/BUILD.bazel +++ b/br/pkg/lightning/restore/BUILD.bazel @@ -80,7 +80,6 @@ go_library( "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//keepalive", "@org_golang_x_exp//maps", "@org_golang_x_exp//slices", "@org_golang_x_sync//errgroup", diff --git a/br/pkg/lightning/tikv/BUILD.bazel b/br/pkg/lightning/tikv/BUILD.bazel index 596aa52075758..48758bfedaacf 100644 --- a/br/pkg/lightning/tikv/BUILD.bazel +++ b/br/pkg/lightning/tikv/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//br/pkg/lightning/common", + "//br/pkg/lightning/config", "//br/pkg/lightning/log", "//br/pkg/pdutil", "//br/pkg/version", diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 1726817092ba8..5ddd7b7671822 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//br/pkg/conn/util", "//br/pkg/errors", "//br/pkg/httputil", + "//br/pkg/lightning/config", "//br/pkg/logutil", "//br/pkg/redact", "//br/pkg/utils", From c11b8a8aeaeabe632e06c11785b1605b3522f182 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 29 Jan 2023 16:46:59 +0800 Subject: [PATCH 3/3] address comment, fix ut --- br/pkg/lightning/config/config_test.go | 2 +- br/pkg/lightning/config/const.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index ea0cff40a04c7..16db98845e80c 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -643,7 +643,7 @@ func TestLoadConfig(t *testing.T) { err = taskCfg.Adjust(context.Background()) require.NoError(t, err) equivalentDSN := taskCfg.Checkpoint.MySQLParam.ToDriverConfig().FormatDSN() - expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27" + expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?readTimeout=30s&writeTimeout=30s&maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27" require.Equal(t, expectedDSN, equivalentDSN) result := taskCfg.String() diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go index 0265c1fa4d7ba..e114eafd8ea88 100644 --- a/br/pkg/lightning/config/const.go +++ b/br/pkg/lightning/config/const.go @@ -42,7 +42,7 @@ const ( var ( DefaultGrpcKeepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 10 * time.Second, - Timeout: 3 * time.Second, + Timeout: 20 * time.Second, PermitWithoutStream: false, }) )