Skip to content

Commit

Permalink
table: add some injections for testing data-inconsistency defense (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
ekexium authored Jun 7, 2022
1 parent 2b4a4c6 commit 30f7037
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
3 changes: 3 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type MemBuffer interface {

// Size returns sum of keys and values length.
Size() int

// RemoveFromBuffer removes the entry from the buffer. It's used for testing.
RemoveFromBuffer(Key)
}

// LockCtx contains information for LockKeys method.
Expand Down
4 changes: 4 additions & 0 deletions store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (m *memBuffer) Delete(k kv.Key) error {
return m.MemDB.Delete(k)
}

func (m *memBuffer) RemoveFromBuffer(k kv.Key) {
m.MemDB.RemoveFromBuffer(k)
}

func (m *memBuffer) DeleteWithFlags(k kv.Key, ops ...kv.FlagsOp) error {
err := m.MemDB.DeleteWithFlags(k, getTiKVFlagsOps(ops)...)
return derr.ToTiDBErr(err)
Expand Down
94 changes: 94 additions & 0 deletions table/tables/mutation_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package tables

import (
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -384,3 +388,93 @@ func getOrBuildColumnMaps(
}
return maps
}

// only used in tests
// commands is a comma separated string, each representing a type of corruptions to the mutations
// The injection depends on actual encoding rules.
func corruptMutations(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle, cmds string) error {
commands := strings.Split(cmds, ",")
memBuffer := txn.GetMemBuffer()

indexMutations, _, err := collectTableMutationsFromBufferStage(t, memBuffer, sh)
if err != nil {
return errors.Trace(err)
}

for _, cmd := range commands {
switch cmd {
case "extraIndex":
// an extra index mutation
{
if len(indexMutations) == 0 {
continue
}
indexMutation := indexMutations[0]
key := make([]byte, len(indexMutation.key))
copy(key, indexMutation.key)
key[len(key)-1] += 1
if len(indexMutation.value) == 0 {
if err := memBuffer.Delete(key); err != nil {
return errors.Trace(err)
}
} else {
if err := memBuffer.Set(key, indexMutation.value); err != nil {
return errors.Trace(err)
}
}
}
case "missingIndex":
// an index mutation is missing
// "missIndex" should be placed in front of "extraIndex"es,
// in case it removes the mutation that was just added
{
indexMutation := indexMutations[0]
memBuffer.RemoveFromBuffer(indexMutation.key)
}
case "corruptIndexKey":
// a corrupted index mutation.
// TODO: distinguish which part is corrupted, value or handle
{
indexMutation := indexMutations[0]
key := indexMutation.key
memBuffer.RemoveFromBuffer(key)
key[len(key)-1] += 1
if len(indexMutation.value) == 0 {
if err := memBuffer.Delete(key); err != nil {
return errors.Trace(err)
}
} else {
if err := memBuffer.Set(key, indexMutation.value); err != nil {
return errors.Trace(err)
}
}
}
case "corruptIndexValue":
// TODO: distinguish which part to corrupt, int handle, common handle, or restored data?
// It doesn't make much sense to always corrupt the last byte
{
if len(indexMutations) == 0 {
continue
}
indexMutation := indexMutations[0]
value := indexMutation.value
if len(value) > 0 {
value[len(value)-1] += 1
if err := memBuffer.Set(indexMutation.key, value); err != nil {
return errors.Trace(err)
}
}
}
default:
return errors.New(fmt.Sprintf("unknown command to corrupt mutation: %s", cmd))
}
}
return nil
}

func injectMutationError(t *TableCommon, txn kv.Transaction, sh kv.StagingHandle) error {
failpoint.Inject("corruptMutations", func(commands failpoint.Value) {
failpoint.Return(corruptMutations(t, txn, sh, commands.(string)))
})
return nil
}
10 changes: 10 additions & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,15 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
return err
}

if err = injectMutationError(t, txn, sh); err != nil {
return err
}
if sessVars.EnableMutationChecker {
if err = CheckDataConsistency(txn, sessVars, t, newData, oldData, memBuffer, sh); err != nil {
return errors.Trace(err)
}
}

memBuffer.Release(sh)
if shouldWriteBinlog(sctx, t.meta) {
if !t.meta.PKIsHandle && !t.meta.IsCommonHandle {
Expand Down Expand Up @@ -886,6 +890,9 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
return h, err
}

if err = injectMutationError(t, txn, sh); err != nil {
return nil, err
}
if sessVars.EnableMutationChecker {
if err = CheckDataConsistency(txn, sessVars, t, r, nil, memBuffer, sh); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1146,6 +1153,9 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type

sessVars := ctx.GetSessionVars()
sc := sessVars.StmtCtx
if err = injectMutationError(t, txn, sh); err != nil {
return err
}
if sessVars.EnableMutationChecker {
if err = CheckDataConsistency(txn, sessVars, t, nil, r, memBuffer, sh); err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit 30f7037

Please sign in to comment.