Skip to content

Commit

Permalink
mounter(ticdc): decode bytes-level checksum and encode columns-level …
Browse files Browse the repository at this point in the history
…checksum (pingcap#10706)

close pingcap#10969
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Jul 3, 2024
1 parent 29aac52 commit d6c60e7
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 263 deletions.
207 changes: 174 additions & 33 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"math"
"reflect"
"sort"
"time"
"unsafe"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/pingcap/tidb/pkg/kv"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -100,11 +100,6 @@ type mounter struct {
// they should not be nil after decode at least one event in the row format v2.
decoder *rowcodec.DatumMapDecoder
preDecoder *rowcodec.DatumMapDecoder

// encoder is used to calculate the checksum.
encoder *rowcodec.Encoder
// sctx hold some information can be used by the encoder to calculate the checksum.
sctx *stmtctx.StatementContext
}

// NewMounter creates a mounter
Expand All @@ -124,9 +119,6 @@ func NewMounter(schemaStorage SchemaStorage,
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
integrity: integrity,

encoder: &rowcodec.Encoder{},
sctx: stmtctx.NewStmtCtxWithTimeZone(tz),
}
}

Expand Down Expand Up @@ -156,6 +148,8 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
if !bytes.HasPrefix(raw.Key, tablePrefix) {
return nil, nil
}
// checksumKey is only used to calculate raw checksum if necessary.
checksumKey := raw.Key
key, physicalTableID, err := decodeTableID(raw.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -207,7 +201,7 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -297,7 +291,6 @@ func (m *mounter) decodeRow(
if err != nil {
return nil, false, errors.Trace(err)
}

datums, err = tablecodec.DecodeHandleToDatumMap(
recordID, handleColIDs, handleColFt, m.tz, datums)
if err != nil {
Expand Down Expand Up @@ -455,8 +448,8 @@ func datum2Column(
return cols, rawCols, columnInfos, nil
}

func (m *mounter) calculateChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
func calculateColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, tz *time.Location,
) (uint32, error) {
columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
Expand All @@ -475,37 +468,24 @@ func (m *mounter) calculateChecksum(
Data: make([]byte, 0),
}

checksum, err := calculator.Checksum(m.tz)
checksum, err := calculator.Checksum(tz)
if err != nil {
return 0, errors.Trace(err)
}
return checksum, nil
}

// returns an error if calculating the checksum fails.
// returns false if the checksum does not match.
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
func verifyColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, decoder *rowcodec.DatumMapDecoder, tz *time.Location,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
if isPreRow {
decoder = m.preDecoder
} else {
decoder = m.decoder
}

// if the checksum cannot be found, which means the upstream TiDB checksum is not enabled,
// so return matched as true to skip check the event.
first, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
}

checksum, err := m.calculateChecksum(columnInfos, rawColumns)
checksum, err := calculateColumnChecksum(columnInfos, rawColumns, tz)
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, false, err
Expand Down Expand Up @@ -540,7 +520,168 @@ func (m *mounter) verifyChecksum(
return checksum, false, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
// newDatum rebuilds a types.Datum from an already-decoded column value, using
// the column's field type to choose the datum kind.
//
// A nil value yields a NULL datum. Otherwise the dynamic type of value must
// agree with the field type:
//   - integer and year columns carry an int64 or uint64;
//   - date/time, duration, JSON and decimal columns carry a string;
//   - enum, set and bit columns carry a uint64;
//   - string and blob columns carry a []byte or string;
//   - float columns carry a float32, double columns a float64.
//
// A value whose type does not fit is reported as an error instead of a bare
// runtime panic or a silently returned NULL datum. String/blob columns and
// unsupported mysql types keep their explicit log.Panic.
//
// todo: do we really need this? how about the datum.ConvertTo ?
func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) {
	if value == nil {
		return types.NewDatum(nil), nil
	}

	// mismatch builds the error returned when the dynamic type of value does
	// not fit the column type.
	mismatch := func() (types.Datum, error) {
		return types.Datum{}, errors.Errorf(
			"unexpected value type %T for mysql type %d when building datum",
			value, ft.GetType())
	}

	switch ft.GetType() {
	case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
		switch v := value.(type) {
		case uint64:
			return types.NewUintDatum(v), nil
		case int64:
			return types.NewIntDatum(v), nil
		}
		// Previously this fell through and returned an empty datum with a nil
		// error, which hides the problem until the checksum mismatches.
		return mismatch()
	case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
		s, ok := value.(string)
		if !ok {
			return mismatch()
		}
		// todo: DefaultStmtNoWarningContext timezone is set to UTC, is it correct?
		t, err := types.ParseTime(types.DefaultStmtNoWarningContext, s, ft.GetType(), ft.GetDecimal())
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewTimeDatum(t), nil
	case mysql.TypeDuration:
		s, ok := value.(string)
		if !ok {
			return mismatch()
		}
		d, _, err := types.ParseDuration(types.StrictContext, s, ft.GetDecimal())
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewDurationDatum(d), nil
	case mysql.TypeJSON:
		s, ok := value.(string)
		if !ok {
			return mismatch()
		}
		bj, err := types.ParseBinaryJSONFromString(s)
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewJSONDatum(bj), nil
	case mysql.TypeNewDecimal:
		s, ok := value.(string)
		if !ok {
			return mismatch()
		}
		mysqlDecimal := new(types.MyDecimal)
		if err := mysqlDecimal.FromString([]byte(s)); err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		datum := types.NewDecimalDatum(mysqlDecimal)
		// Carry the column's declared precision and scale on the datum.
		datum.SetLength(ft.GetFlen())
		datum.SetFrac(ft.GetDecimal())
		return datum, nil
	case mysql.TypeEnum:
		v, ok := value.(uint64)
		if !ok {
			return mismatch()
		}
		enum, err := types.ParseEnumValue(ft.GetElems(), v)
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewMysqlEnumDatum(enum), nil
	case mysql.TypeSet:
		v, ok := value.(uint64)
		if !ok {
			return mismatch()
		}
		set, err := types.ParseSetValue(ft.GetElems(), v)
		if err != nil {
			return types.Datum{}, errors.Trace(err)
		}
		return types.NewMysqlSetDatum(set, ft.GetCollate()), nil
	case mysql.TypeBit:
		v, ok := value.(uint64)
		if !ok {
			return mismatch()
		}
		// Round the declared bit width up to whole bytes.
		byteSize := (ft.GetFlen() + 7) >> 3
		binaryLiteral := types.NewBinaryLiteralFromUint(v, byteSize)
		return types.NewMysqlBitDatum(binaryLiteral), nil
	case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
		mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
		switch v := value.(type) {
		case []byte:
			return types.NewBytesDatum(v), nil
		case string:
			return types.NewBytesDatum([]byte(v)), nil
		}
		// The two type-related fields need distinct keys; the original used
		// "type" for both. value is non-nil here, so reflect.TypeOf is safe.
		log.Panic("unknown data type when build datum",
			zap.Any("type", ft.GetType()), zap.Any("value", value),
			zap.String("valueType", reflect.TypeOf(value).String()))
	case mysql.TypeFloat:
		v, ok := value.(float32)
		if !ok {
			return mismatch()
		}
		return types.NewFloat32Datum(v), nil
	case mysql.TypeDouble:
		v, ok := value.(float64)
		if !ok {
			return mismatch()
		}
		return types.NewFloat64Datum(v), nil
	default:
		log.Panic("unexpected mysql type found", zap.Any("type", ft.GetType()))
	}
	// Only reached after one of the log.Panic calls above.
	return types.Datum{}, nil
}

// verifyRawBytesChecksum recomputes the raw bytes-level checksum of a row from
// its decoded columns and compares it with the checksum held by the decoder.
//
// It returns (0, true, nil) when the decoder reports no checksum, so the row is
// treated as matched. It returns (expected, true, nil) on a match and
// (expected, false, nil) on a mismatch, which is also logged. An error is
// returned when a datum cannot be rebuilt or the checksum cannot be calculated.
func verifyRawBytesChecksum(
	tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
	key kv.Key, tz *time.Location,
) (uint32, bool, error) {
	want, found := decoder.GetChecksum()
	if !found {
		return 0, true, nil
	}

	var ids []int64
	var values []*types.Datum
	for i := range columns {
		column := columns[i]
		if column.Value == nil {
			// TiDB does not encode null value into the bytes, so just ignore it.
			continue
		}
		info := tableInfo.ForceGetColumnInfo(column.ColumnID)
		value, err := newDatum(column.Value, info.FieldType)
		if err != nil {
			return 0, false, errors.Trace(err)
		}
		// value is declared per iteration, so each stored pointer is distinct.
		values = append(values, &value)
		ids = append(ids, column.ColumnID)
	}

	got, err := decoder.CalculateRawChecksum(tz, ids, values, key, nil)
	if err != nil {
		return 0, false, errors.Trace(err)
	}
	if got != want {
		log.Error("raw bytes checksum mismatch",
			zap.Uint32("expected", want), zap.Uint32("obtained", got))
		return want, false, nil
	}
	return want, true, nil
}

// verifyChecksum verifies the checksum of the previous row (isPreRow) or the
// current row, dispatching on the checksum version reported by the matching
// decoder.
//
// When integrity checking is disabled it returns (0, true, nil). For version 0
// the column-level checksum is verified. For version 1 the raw bytes checksum
// is verified first; if it matches, the column-level checksum is calculated and
// returned. Any other version is an error.
//
// The bool result is false when the checksum does not match. A non-nil error
// means the checksum could not be calculated.
func (m *mounter) verifyChecksum(
	tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
	columns []*model.ColumnData, rawColumns []types.Datum,
	key kv.Key, isPreRow bool,
) (uint32, bool, error) {
	if !m.integrity.Enabled() {
		return 0, true, nil
	}

	decoder := m.decoder
	if isPreRow {
		decoder = m.preDecoder
	}

	switch version := decoder.ChecksumVersion(); version {
	case 0:
		return verifyColumnChecksum(columnInfos, rawColumns, decoder, m.tz)
	case 1:
		expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
		if err != nil {
			return 0, false, errors.Trace(err)
		}
		if !matched {
			return expected, false, nil
		}
		// The raw bytes checksum matched; return the column-level checksum.
		checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
		if err != nil {
			log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
			return 0, false, errors.Trace(err)
		}
		return checksum, true, nil
	default:
		return 0, false, errors.Errorf("unknown checksum version %d", version)
	}
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
Expand Down Expand Up @@ -573,7 +714,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
Expand Down Expand Up @@ -605,7 +746,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
Expand Down
Loading

0 comments on commit d6c60e7

Please sign in to comment.