Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): decode bytes-level checksum and encode columns-level checksum (#10706) #11453

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 174 additions & 33 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"math"
"reflect"
"sort"
"time"
"unsafe"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/pingcap/tidb/pkg/kv"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -86,11 +86,6 @@ type mounter struct {
// they should not be nil after decode at least one event in the row format v2.
decoder *rowcodec.DatumMapDecoder
preDecoder *rowcodec.DatumMapDecoder

// encoder is used to calculate the checksum.
encoder *rowcodec.Encoder
// sctx hold some information can be used by the encoder to calculate the checksum.
sctx *stmtctx.StatementContext
}

// NewMounter creates a mounter
Expand All @@ -110,9 +105,6 @@ func NewMounter(schemaStorage SchemaStorage,
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
integrity: integrity,

encoder: &rowcodec.Encoder{},
sctx: stmtctx.NewStmtCtxWithTimeZone(tz),
}
}

Expand Down Expand Up @@ -142,6 +134,8 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
if !bytes.HasPrefix(raw.Key, tablePrefix) {
return nil, nil
}
// checksumKey is only used to calculate raw checksum if necessary.
checksumKey := raw.Key
key, physicalTableID, err := decodeTableID(raw.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -193,7 +187,7 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -283,7 +277,6 @@ func (m *mounter) decodeRow(
if err != nil {
return nil, false, errors.Trace(err)
}

datums, err = tablecodec.DecodeHandleToDatumMap(
recordID, handleColIDs, handleColFt, m.tz, datums)
if err != nil {
Expand Down Expand Up @@ -391,8 +384,8 @@ func datum2Column(
return cols, rawCols, columnInfos, nil
}

func (m *mounter) calculateChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
func calculateColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, tz *time.Location,
) (uint32, error) {
columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
Expand All @@ -411,37 +404,24 @@ func (m *mounter) calculateChecksum(
Data: make([]byte, 0),
}

checksum, err := calculator.Checksum(m.tz)
checksum, err := calculator.Checksum(tz)
if err != nil {
return 0, errors.Trace(err)
}
return checksum, nil
}

// return an error when calculating the checksum fails.
// return false if the checksum does not match.
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
func verifyColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, decoder *rowcodec.DatumMapDecoder, tz *time.Location,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
if isPreRow {
decoder = m.preDecoder
} else {
decoder = m.decoder
}

// if the checksum cannot be found, which means the upstream TiDB checksum is not enabled,
// so return matched as true to skip check the event.
first, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
}

checksum, err := m.calculateChecksum(columnInfos, rawColumns)
checksum, err := calculateColumnChecksum(columnInfos, rawColumns, tz)
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, false, err
Expand Down Expand Up @@ -476,7 +456,168 @@ func (m *mounter) verifyChecksum(
return checksum, false, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
// newDatum converts value, a column value of arbitrary Go type, into a
// types.Datum whose kind is chosen from the column field type ft.
//
// A nil value yields a NULL datum. An error is returned when a textual value
// (time, duration, JSON, decimal) or an enum/set value cannot be parsed, and
// when an integer column carries a value that is neither int64 nor uint64.
// log.Panic is invoked for a string/blob column whose value is neither []byte
// nor string, and for a MySQL type this function does not handle. The
// unchecked type assertions below also panic if value does not have the Go
// type the matching branch expects.
//
// todo: do we really need this? how about the datum.ConvertTo ?
func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) {
	if value == nil {
		return types.NewDatum(nil), nil
	}
	switch ft.GetType() {
	case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
		switch v := value.(type) {
		case uint64:
			return types.NewUintDatum(v), nil
		case int64:
			return types.NewIntDatum(v), nil
		}
		// Report the unexpected Go type instead of silently handing back an
		// empty datum with a nil error, which would only surface later as a
		// hard-to-diagnose checksum mismatch.
		return types.Datum{}, errors.Errorf(
			"unexpected value type %T for integer column type %d", value, ft.GetType())
	case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
		// todo: DefaultStmtNoWarningContext timezone is set to UTC, is it correct?
		t, err := types.ParseTime(types.DefaultStmtNoWarningContext, value.(string), ft.GetType(), ft.GetDecimal())
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewTimeDatum(t), nil
	case mysql.TypeDuration:
		d, _, err := types.ParseDuration(types.StrictContext, value.(string), ft.GetDecimal())
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewDurationDatum(d), nil
	case mysql.TypeJSON:
		bj, err := types.ParseBinaryJSONFromString(value.(string))
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewJSONDatum(bj), nil
	case mysql.TypeNewDecimal:
		mysqlDecimal := new(types.MyDecimal)
		err := mysqlDecimal.FromString([]byte(value.(string)))
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		datum := types.NewDecimalDatum(mysqlDecimal)
		// Carry the column's declared length and fraction onto the datum.
		datum.SetLength(ft.GetFlen())
		datum.SetFrac(ft.GetDecimal())
		return datum, nil
	case mysql.TypeEnum:
		enum, err := types.ParseEnumValue(ft.GetElems(), value.(uint64))
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewMysqlEnumDatum(enum), nil
	case mysql.TypeSet:
		set, err := types.ParseSetValue(ft.GetElems(), value.(uint64))
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewMysqlSetDatum(set, ft.GetCollate()), nil
	case mysql.TypeBit:
		// Round the declared bit length up to whole bytes.
		byteSize := (ft.GetFlen() + 7) >> 3
		binaryLiteral := types.NewBinaryLiteralFromUint(value.(uint64), byteSize)
		return types.NewMysqlBitDatum(binaryLiteral), nil
	case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
		mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
		switch v := value.(type) {
		case []byte:
			return types.NewBytesDatum(v), nil
		case string:
			return types.NewBytesDatum([]byte(v)), nil
		}
		// The MySQL type and the Go type of the value are logged under
		// distinct keys so that neither field shadows the other.
		log.Panic("unknown data type when build datum",
			zap.Any("type", ft.GetType()), zap.Any("value", value), zap.Reflect("valueType", reflect.TypeOf(value)))
	case mysql.TypeFloat:
		return types.NewFloat32Datum(value.(float32)), nil
	case mysql.TypeDouble:
		return types.NewFloat64Datum(value.(float64)), nil
	default:
		log.Panic("unexpected mysql type found", zap.Any("type", ft.GetType()))
	}
	return types.Datum{}, nil
}

// verifyRawBytesChecksum recomputes the raw checksum of a row from its decoded
// columns and compares it with the checksum reported by decoder.
//
// It returns (0, true, nil) when decoder reports no checksum. Otherwise it
// returns the decoder's checksum together with whether the recomputed value
// equals it. A non-nil error is returned when a column value cannot be turned
// back into a datum or when the checksum calculation itself fails.
func verifyRawBytesChecksum(
	tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
	key kv.Key, tz *time.Location,
) (uint32, bool, error) {
	want, found := decoder.GetChecksum()
	if !found {
		return 0, true, nil
	}

	var ids []int64
	var values []*types.Datum
	for _, column := range columns {
		// TiDB does not encode null value into the bytes, so just ignore it.
		if column.Value == nil {
			continue
		}
		fieldType := tableInfo.ForceGetColumnInfo(column.ColumnID).FieldType
		value, err := newDatum(column.Value, fieldType)
		if err != nil {
			return 0, false, errors.Trace(err)
		}
		ids = append(ids, column.ColumnID)
		values = append(values, &value)
	}

	got, err := decoder.CalculateRawChecksum(tz, ids, values, key, nil)
	if err != nil {
		return 0, false, errors.Trace(err)
	}
	if got != want {
		log.Error("raw bytes checksum mismatch",
			zap.Uint32("expected", want), zap.Uint32("obtained", got))
		return want, false, nil
	}
	return want, true, nil
}

// verifyChecksum checks the checksum of one row image, dispatching on the
// checksum version reported by the row decoder.
//
// When integrity checking is disabled it returns (0, true, nil) without
// touching the decoder. isPreRow selects m.preDecoder instead of m.decoder.
// Version 0 delegates to verifyColumnChecksum. Version 1 first verifies the
// raw bytes checksum and, only if that matches, computes and returns the
// column-level checksum. Any other version yields an error.
//
// The boolean result is false if the checksum is not matched; the error is
// non-nil when a checksum could not be calculated.
func (m *mounter) verifyChecksum(
	tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
	columns []*model.ColumnData, rawColumns []types.Datum,
	key kv.Key, isPreRow bool,
) (uint32, bool, error) {
	if !m.integrity.Enabled() {
		return 0, true, nil
	}

	decoder := m.decoder
	if isPreRow {
		decoder = m.preDecoder
	}

	switch version := decoder.ChecksumVersion(); version {
	case 0:
		return verifyColumnChecksum(columnInfos, rawColumns, decoder, m.tz)
	case 1:
		rawChecksum, ok, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
		if err != nil {
			return 0, false, errors.Trace(err)
		}
		if !ok {
			// Mismatch: hand back the decoder's checksum and skip the column-level calculation.
			return rawChecksum, false, nil
		}
		checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
		if err != nil {
			log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
			return 0, false, errors.Trace(err)
		}
		return checksum, true, nil
	default:
		return 0, false, errors.Errorf("unknown checksum version %d", version)
	}
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
Expand Down Expand Up @@ -509,7 +650,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
Expand Down Expand Up @@ -541,7 +682,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
Expand Down
Loading
Loading