Skip to content

Commit

Permalink
mysql sink: refine log (#1883) (#1897)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 1, 2021
1 parent ea1b554 commit 582894e
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
select {
case s.errCh <- err:
default:
log.Info("mysql sink receives redundant error", zap.Error(err))
}
}
}()
Expand Down Expand Up @@ -1339,23 +1340,23 @@ func (s *mysqlSyncpointStore) CreateSynctable(ctx context.Context) error {
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error())
log.Error("failed to create syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2)))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
}
_, err = tx.Exec("USE " + database)
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error())
log.Error("failed to create syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2)))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
}
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS " + syncpointTableName + " (cf varchar(255),primary_ts varchar(18),secondary_ts varchar(18),PRIMARY KEY ( `cf`, `primary_ts` ) )")
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error())
log.Error("failed to create syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2)))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
}
Expand All @@ -1376,15 +1377,15 @@ func (s *mysqlSyncpointStore) SinkSyncpoint(ctx context.Context, id string, chec
log.Info("sync table: get tidb_current_ts err")
err2 := tx.Rollback()
if err2 != nil {
log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error())
log.Error("failed to write syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2)))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
}
_, err = tx.Exec("insert ignore into "+mark.SchemaName+"."+syncpointTableName+"(cf, primary_ts, secondary_ts) VALUES (?,?,?)", id, checkpointTs, secondaryTs)
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error())
log.Error("failed to write syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2)))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
}
Expand Down

0 comments on commit 582894e

Please sign in to comment.