-
in vreplication/controller.go:runBlp: switch {
case len(ct.source.Tables) > 0:
// Table names can have search patterns. Resolve them against the schema.
tables, err := mysqlctl.ResolveTables(ctx, ct.mysqld, dbClient.DBName(), ct.source.Tables)
if err != nil {
ct.blpStats.ErrorCounts.Add([]string{"Invalid Source"}, 1)
return vterrors.Wrap(err, "failed to resolve table names")
}
player := binlogplayer.NewBinlogPlayerTables(dbClient, tablet, tables, ct.id, ct.blpStats)
return player.ApplyBinlogEvents(ctx)
case ct.source.KeyRange != nil:
player := binlogplayer.NewBinlogPlayerKeyRange(dbClient, tablet, ct.source.KeyRange, ct.id, ct.blpStats)
return player.ApplyBinlogEvents(ctx)
case ct.source.Filter != nil:
// Timestamp fields from binlogs are always sent as UTC.
// So, we should set the timezone to be UTC for those values to be correctly inserted.
if _, err := dbClient.ExecuteFetch("set @@session.time_zone = '+00:00'", 10000); err != nil {
return err
}
// Tables may have varying character sets. To ship the bits without interpreting them
// we set the character set to be binary.
if _, err := dbClient.ExecuteFetch("set names binary", 10000); err != nil {
return err
}
// We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid.
if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil {
return err
}
var vsClient VStreamerClient
var err error
if name := ct.source.GetExternalMysql(); name != "" {
vsClient, err = ct.vre.ec.Get(name)
if err != nil {
return err
}
} else {
vsClient = newTabletConnector(tablet)
}
if err := vsClient.Open(ctx); err != nil {
return err
}
defer vsClient.Close(ctx)
vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre)
err = vr.Replicate(ctx)
ct.lastWorkflowError.Record(err)
// If this is a mysql error that we know needs manual intervention OR
// we cannot identify this as non-recoverable, but it has persisted beyond the retry limit (maxTimeToRetryError)
if isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() {
log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil {
return err // yes, err and not errSetState.
}
return nil // this will cause vreplicate to quit the workflow
}
return err Why make this distinction? What is the purpose? |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
I don't know either. Maybe the first and second |
Beta Was this translation helpful? Give feedback.
-
Although it is not explicitly mentioned why it is divided into three stages, I feel that this article answers this question to some extent. |
Beta Was this translation helpful? Give feedback.
Although it is not explicitly mentioned why it is divided into three stages, I feel that this article answers this question to some extent.
https://vitess.io/docs/18.0/reference/vreplication/internal/life-of-a-stream/