Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support timeout for tikv/tidb retry #40805

Merged
merged 9 commits into from
Jan 29, 2023
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
retryTimeout = 3 * time.Second

defaultMaxRetry = 3

dbTimeout = 30 * time.Second
)

// MySQLConnectParam records the parameters needed to connect to a MySQL database.
Expand Down Expand Up @@ -74,6 +76,8 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
cfg.Params["charset"] = "utf8mb4"
cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode)
cfg.MaxAllowedPacket = int(param.MaxAllowedPacket)
cfg.ReadTimeout = dbTimeout
cfg.WriteTimeout = dbTimeout

cfg.TLS = param.TLSConfig
cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package config

import (
"time"

"github.com/docker/go-units"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

const (
Expand All @@ -34,3 +38,11 @@ const (

DefaultBatchSize ByteSize = 100 * units.GiB
)

var (
DefaultGrpcKeepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems Timeout should be larger than Time. Otherwise, Time is useless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	// After a duration of this time if the client doesn't see any activity it
	// pings the server to see if the transport is still alive.
	// If set below 10s, a minimum value of 10s will be used instead.
	Time time.Duration // The current default value is infinity.
	// After having pinged for keepalive check, the client waits for a duration
	// of Timeout and if no activity is seen even after that the connection is
	// closed.
	Timeout time.Duration // The current default value is 20 seconds.

According to this comment, I think they are irrelevant. I change this to 20 seconds which is the default value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember these values are copied from CDC? maybe they have the reason to choose it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that doesn't matter much. Lightning needn't to assure second-level replication. So I think cutting this limit a bit loose is reasonable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, if timeout is less than time, the connection is always closed before it can ping the server when there is no activity.

PermitWithoutStream: false,
})
)
1 change: 0 additions & 1 deletion br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down
7 changes: 1 addition & 6 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type clusterResourceCheckItem struct {
Expand Down Expand Up @@ -733,11 +732,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
}),
config.DefaultGrpcKeepaliveParams,
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
},
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/pdutil",
"//br/pkg/version",
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/tikv/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -88,7 +89,7 @@ func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, a
// Connect to the ImportSST service on the given TiKV node.
// The connection is needed for executing `action` and will be tear down
// when this function exits.
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), config.DefaultGrpcKeepaliveParams)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -172,7 +173,8 @@ var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",na

// FetchMode obtains the import mode status of the TiKV node.
func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) {
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/httputil",
"//br/pkg/lightning/config",
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/utils",
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/store/pdtypes"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -201,7 +202,9 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(store.GetAddress(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -341,7 +344,8 @@ func sendSplitRegionRequest(ctx context.Context, c *pdClient, regionInfo *Region
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
conn, err := grpc.Dial(store.GetAddress(), opt,
config.DefaultGrpcKeepaliveParams)
if err != nil {
return false, nil, err
}
Expand Down