Skip to content

Commit

Permalink
table: avoid redundant encode in insert/update/delete. (#11532)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and jackysp committed Aug 7, 2019
1 parent 45790b5 commit 0dc9106
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 19 deletions.
25 changes: 11 additions & 14 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,19 +339,18 @@ func (t *tableCommon) UpdateRecord(ctx sessionctx.Context, h int64, oldData, new
return err
}
}
colSize := make(map[int64]int64)
encodedCol := make([]byte, 0, 16)
colSize := make(map[int64]int64, len(t.Cols()))
for id, col := range t.Cols() {
encodedCol, err = tablecodec.EncodeValue(sc, encodedCol[:0], newData[id])
size, err := codec.EstimateValueSize(sc, newData[id])
if err != nil {
continue
}
newLen := len(encodedCol) - 1
encodedCol, err = tablecodec.EncodeValue(sc, encodedCol[:0], oldData[id])
newLen := size - 1
size, err = codec.EstimateValueSize(sc, oldData[id])
if err != nil {
continue
}
oldLen := len(encodedCol) - 1
oldLen := size - 1
colSize[col.ID] = int64(newLen - oldLen)
}
sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 0, 1, colSize)
Expand Down Expand Up @@ -544,14 +543,13 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ..
}
}
sc.AddAffectedRows(1)
colSize := make(map[int64]int64)
encodedCol := make([]byte, 0, 16)
colSize := make(map[int64]int64, len(r))
for id, col := range t.Cols() {
encodedCol, err = tablecodec.EncodeValue(sc, encodedCol[:0], r[id])
size, err := codec.EstimateValueSize(sc, r[id])
if err != nil {
continue
}
colSize[col.ID] = int64(len(encodedCol) - 1)
colSize[col.ID] = int64(size) - 1
}
sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 1, 1, colSize)
return recordID, nil
Expand Down Expand Up @@ -726,15 +724,14 @@ func (t *tableCommon) RemoveRecord(ctx sessionctx.Context, h int64, r []types.Da
}
err = t.addDeleteBinlog(ctx, binlogRow, colIDs)
}
colSize := make(map[int64]int64)
encodedCol := make([]byte, 0, 16)
colSize := make(map[int64]int64, len(t.Cols()))
sc := ctx.GetSessionVars().StmtCtx
for id, col := range t.Cols() {
encodedCol, err = tablecodec.EncodeValue(sc, encodedCol[:0], r[id])
size, err := codec.EstimateValueSize(sc, r[id])
if err != nil {
continue
}
colSize[col.ID] = -int64(len(encodedCol) - 1)
colSize[col.ID] = -int64(size - 1)
}
ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(t.physicalTableID, -1, 1, colSize)
return err
Expand Down
10 changes: 5 additions & 5 deletions types/mydecimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ with the correct -1/0/+1 result
then the encoded value is not memory comparable.
NOTE
the buffer is assumed to be of the size decimalBinSize(precision, frac)
the buffer is assumed to be of the size DecimalBinSize(precision, frac)
RETURN VALUE
bin - binary value
Expand Down Expand Up @@ -1334,7 +1334,7 @@ func (d *MyDecimal) FromBin(bin []byte, precision, frac int) (binSize int, err e
if bin[binIdx]&0x80 > 0 {
mask = 0
}
binSize = decimalBinSize(precision, frac)
binSize = DecimalBinSize(precision, frac)
dCopy := make([]byte, 40)
dCopy = dCopy[:binSize]
copy(dCopy, bin)
Expand Down Expand Up @@ -1409,8 +1409,8 @@ func (d *MyDecimal) FromBin(bin []byte, precision, frac int) (binSize int, err e
return binSize, err
}

// decimalBinSize returns the size of array to hold a binary representation of a decimal.
func decimalBinSize(precision, frac int) int {
// DecimalBinSize returns the size of array to hold a binary representation of a decimal.
func DecimalBinSize(precision, frac int) int {
digitsInt := precision - frac
wordsInt := digitsInt / digitsPerWord
wordsFrac := frac / digitsPerWord
Expand Down Expand Up @@ -2242,7 +2242,7 @@ func DecimalPeak(b []byte) (int, error) {
}
precision := int(b[0])
frac := int(b[1])
return decimalBinSize(precision, frac) + 2, nil
return DecimalBinSize(precision, frac) + 2, nil
}

// NewDecFromInt creates a MyDecimal from int.
Expand Down
61 changes: 61 additions & 0 deletions util/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,38 @@ func encode(sc *stmtctx.StatementContext, b []byte, vals []types.Datum, comparab
return b, errors.Trace(err)
}

// EstimateValueSize uses to estimate the value size of the encoded values.
func EstimateValueSize(sc *stmtctx.StatementContext, val types.Datum) (int, error) {
l := 0
switch val.Kind() {
case types.KindInt64:
l = valueSizeOfSignedInt(val.GetInt64())
case types.KindUint64:
l = valueSizeOfUnsignedInt(val.GetUint64())
case types.KindFloat32, types.KindFloat64, types.KindMysqlTime, types.KindMysqlDuration:
l = 9
case types.KindString, types.KindBytes:
l = valueSizeOfBytes(val.GetBytes())
case types.KindMysqlDecimal:
l = valueSizeOfDecimal(val.GetMysqlDecimal(), val.Length(), val.Frac()) + 1
case types.KindMysqlEnum:
l = valueSizeOfUnsignedInt(uint64(val.GetMysqlEnum().ToNumber()))
case types.KindMysqlSet:
l = valueSizeOfUnsignedInt(uint64(val.GetMysqlSet().ToNumber()))
case types.KindMysqlBit, types.KindBinaryLiteral:
val, err := val.GetBinaryLiteral().ToInt(sc)
terror.Log(errors.Trace(err))
l = valueSizeOfUnsignedInt(val)
case types.KindMysqlJSON:
l = 2 + len(val.GetMysqlJSON().Value)
case types.KindNull, types.KindMinNotNull, types.KindMaxValue:
l = 1
default:
return l, errors.Errorf("unsupported encode type %d", val.Kind())
}
return l, nil
}

// EncodeMySQLTime encodes datum of `KindMysqlTime` to []byte.
func EncodeMySQLTime(sc *stmtctx.StatementContext, d types.Datum, tp byte, b []byte) (_ []byte, err error) {
t := d.GetMysqlTime()
Expand Down Expand Up @@ -181,6 +213,10 @@ func encodeBytes(b []byte, v []byte, comparable bool) []byte {
return b
}

func valueSizeOfBytes(v []byte) int {
return valueSizeOfSignedInt(int64(len(v))) + len(v)
}

func sizeBytes(v []byte, comparable bool) int {
if comparable {
reallocSize := (len(v)/encGroupSize + 1) * (encGroupSize + 1)
Expand All @@ -201,6 +237,20 @@ func encodeSignedInt(b []byte, v int64, comparable bool) []byte {
return b
}

func valueSizeOfSignedInt(v int64) int {
if v < 0 {
v = 0 - v - 1
}
// Flag occupy 1 bit and at lease 1 bit.
size := 2
v = v >> 6
for v > 0 {
size++
v = v >> 7
}
return size
}

func encodeUnsignedInt(b []byte, v uint64, comparable bool) []byte {
if comparable {
b = append(b, uintFlag)
Expand All @@ -212,6 +262,17 @@ func encodeUnsignedInt(b []byte, v uint64, comparable bool) []byte {
return b
}

func valueSizeOfUnsignedInt(v uint64) int {
// Flag occupy 1 bit and at lease 1 bit.
size := 2
v = v >> 7
for v > 0 {
size++
v = v >> 7
}
return size
}

func sizeInt(comparable bool) int {
if comparable {
return 9
Expand Down
66 changes: 66 additions & 0 deletions util/codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,27 @@ func (s *testCodecSuite) TestCodecKey(c *C) {

b, err = EncodeValue(sc, nil, t.Input...)
c.Assert(err, IsNil)
size, err := estimateValuesSize(sc, t.Input)
c.Assert(err, IsNil)
c.Assert(len(b), Equals, size)
args, err = Decode(b, 1)
c.Assert(err, IsNil)
c.Assert(args, DeepEquals, t.Expect)
}
}

func estimateValuesSize(sc *stmtctx.StatementContext, vals []types.Datum) (int, error) {
size := 0
for _, val := range vals {
length, err := EstimateValueSize(sc, val)
if err != nil {
return 0, err
}
size += length
}
return size, nil
}

func (s *testCodecSuite) TestCodecKeyCompare(c *C) {
defer testleak.AfterTest(c)()
table := []struct {
Expand Down Expand Up @@ -729,6 +744,13 @@ func (s *testCodecSuite) TestDecimal(c *C) {

ret := bytes.Compare(b1, b2)
c.Assert(ret, Equals, t.Ret, Commentf("%v %x %x", t, b1, b2))

b1, err = EncodeValue(sc, b1[:0], d1)
c.Assert(err, IsNil)
size, err := EstimateValueSize(sc, d1)
c.Assert(err, IsNil)
c.Assert(len(b1), Equals, size)

}

floats := []float64{-123.45, -123.40, -23.45, -1.43, -0.93, -0.4333, -0.068,
Expand All @@ -743,6 +765,10 @@ func (s *testCodecSuite) TestDecimal(c *C) {
b, err := EncodeDecimal(nil, d.GetMysqlDecimal(), d.Length(), d.Frac())
c.Assert(err, IsNil)
decs = append(decs, b)
size, err := EstimateValueSize(sc, d)
c.Assert(err, IsNil)
// size - 1 because the flag occupy 1 bit.
c.Assert(len(b), Equals, size-1)
}
for i := 0; i < len(decs)-1; i++ {
cmp := bytes.Compare(decs[i], decs[i+1])
Expand Down Expand Up @@ -1023,3 +1049,43 @@ func (s *testCodecSuite) TestHashChunkRow(c *C) {
c.Assert(err2, IsNil)
c.Assert(b1, BytesEquals, b2)
}

func (s *testCodecSuite) TestValueSizeOfSignedInt(c *C) {
testCase := []int64{64, 8192, 1048576, 134217728, 17179869184, 2199023255552, 281474976710656, 36028797018963968, 4611686018427387904}
var b []byte
for _, v := range testCase {
b := encodeSignedInt(b[:0], v-10, false)
c.Assert(len(b), Equals, valueSizeOfSignedInt(v-10))

b = encodeSignedInt(b[:0], v, false)
c.Assert(len(b), Equals, valueSizeOfSignedInt(v))

b = encodeSignedInt(b[:0], v+10, false)
c.Assert(len(b), Equals, valueSizeOfSignedInt(v+10))

// Test for negative value.
b = encodeSignedInt(b[:0], 0-v, false)
c.Assert(len(b), Equals, valueSizeOfSignedInt(0-v))

b = encodeSignedInt(b[:0], 0-v+10, false)
c.Assert(len(b), Equals, valueSizeOfSignedInt(0-v+10))

b = encodeSignedInt(b[:0], 0-v-10, false)
c.Assert(len(b), Equals, valueSizeOfSignedInt(0-v-10))
}
}

func (s *testCodecSuite) TestValueSizeOfUnsignedInt(c *C) {
testCase := []uint64{128, 16384, 2097152, 268435456, 34359738368, 4398046511104, 562949953421312, 72057594037927936, 9223372036854775808}
var b []byte
for _, v := range testCase {
b := encodeUnsignedInt(b[:0], v-10, false)
c.Assert(len(b), Equals, valueSizeOfUnsignedInt(v-10))

b = encodeUnsignedInt(b[:0], v, false)
c.Assert(len(b), Equals, valueSizeOfUnsignedInt(v))

b = encodeUnsignedInt(b[:0], v+10, false)
c.Assert(len(b), Equals, valueSizeOfUnsignedInt(v+10))
}
}
7 changes: 7 additions & 0 deletions util/codec/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func EncodeDecimal(b []byte, dec *types.MyDecimal, precision, frac int) ([]byte,
return b, errors.Trace(err)
}

func valueSizeOfDecimal(dec *types.MyDecimal, precision, frac int) int {
if precision == 0 {
precision, frac = dec.PrecisionAndFrac()
}
return types.DecimalBinSize(precision, frac) + 2
}

// DecodeDecimal decodes bytes to decimal.
func DecodeDecimal(b []byte) ([]byte, *types.MyDecimal, int, int, error) {
failpoint.Inject("errorInDecodeDecimal", func(val failpoint.Value) {
Expand Down

0 comments on commit 0dc9106

Please sign in to comment.