Skip to content

Commit

Permalink
sink/mysql: rollback txn to recycle db conn, refine timeout param in …
Browse files Browse the repository at this point in the history
…db conn (#1279)
  • Loading branch information
amyangfei authored Jan 7, 2021
1 parent d767cc7 commit 8525eb9
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 331 deletions.
34 changes: 34 additions & 0 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
defaultBatchReplaceSize = 20
defaultReadTimeout = "2m"
defaultWriteTimeout = "2m"
defaultDialTimeout = "2m"
defaultSafeMode = true
)

Expand Down Expand Up @@ -285,6 +286,7 @@ type sinkParams struct {
batchReplaceSize int
readTimeout string
writeTimeout string
dialTimeout string
enableOldValue bool
safeMode bool
timezone string
Expand All @@ -304,6 +306,7 @@ var defaultParams = &sinkParams{
batchReplaceSize: defaultBatchReplaceSize,
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
}

Expand Down Expand Up @@ -342,6 +345,7 @@ func configureSinkURI(
}
dsnCfg.Params["readTimeout"] = params.readTimeout
dsnCfg.Params["writeTimeout"] = params.writeTimeout
dsnCfg.Params["timeout"] = params.dialTimeout

autoRandom, err := checkTiDBVariable(ctx, testDB, "allow_auto_random_explicit_insert", "1")
if err != nil {
Expand Down Expand Up @@ -466,6 +470,23 @@ func parseSinkURI(ctx context.Context, sinkURI *url.URL, opts map[string]string)
params.timezone = fmt.Sprintf(`"%s"`, tz.String())
}

// read, write, and dial timeout for each individual connection, equals to
// readTimeout, writeTimeout, timeout in go mysql driver respectively.
// ref: https://github.com/go-sql-driver/mysql#connection-pool-and-timeouts
// To keep the same style with other sink parameters, we use dash as word separator.
s = sinkURI.Query().Get("read-timeout")
if s != "" {
params.readTimeout = s
}
s = sinkURI.Query().Get("write-timeout")
if s != "" {
params.writeTimeout = s
}
s = sinkURI.Query().Get("timeout")
if s != "" {
params.dialTimeout = s
}

return params, nil
}

Expand All @@ -479,6 +500,10 @@ func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) {
}
err = db.PingContext(ctx)
if err != nil {
// close db to recycle resources
if closeErr := db.Close(); closeErr != nil {
log.Warn("close db failed", zap.Error(err))
}
return nil, errors.Annotate(
cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection")
}
Expand Down Expand Up @@ -527,6 +552,9 @@ func newMySQLSink(
if params.timezone != "" {
dsn.Params["time_zone"] = params.timezone
}
dsn.Params["readTimeout"] = params.readTimeout
dsn.Params["writeTimeout"] = params.writeTimeout
dsn.Params["timeout"] = params.dialTimeout
testDB, err := getDBConnImpl(ctx, dsn.FormatDSN())
if err != nil {
return nil, err
Expand Down Expand Up @@ -814,12 +842,18 @@ func (s *mysqlSink) execDMLWithMaxRetries(
args := dmls.values[i]
log.Debug("exec row", zap.String("sql", query), zap.Any("args", args))
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
}
return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
}
}
if len(dmls.markSQL) != 0 {
log.Debug("exec row", zap.String("sql", dmls.markSQL))
if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
}
return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
}
}
Expand Down
Loading

0 comments on commit 8525eb9

Please sign in to comment.