Skip to content

Commit

Permalink
Merge branch 'split_hashjoin_part5' of github.com:XuHuaiyu/tidb into …
Browse files Browse the repository at this point in the history
…split_hashjoin_part5
  • Loading branch information
XuHuaiyu committed Nov 28, 2022
2 parents 420d7fa + 96bbab5 commit f4cfcbb
Show file tree
Hide file tree
Showing 15 changed files with 300 additions and 38 deletions.
14 changes: 12 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ http_archive(
],
)

load("@io_bazel_rules_go//go:deps.bzl", "go_register_toolchains", "go_rules_dependencies")
load("@io_bazel_rules_go//go:deps.bzl", "go_download_sdk", "go_register_toolchains", "go_rules_dependencies")
load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies")
load("//:DEPS.bzl", "go_deps")

Expand All @@ -27,9 +27,19 @@ go_deps()

go_rules_dependencies()

go_download_sdk(
name = "go_sdk",
urls = [
"http://ats.apps.svc/golang/{}",
"http://bazel-cache.pingcap.net:8080/golang/{}",
"https://mirrors.aliyun.com/golang/{}",
"https://dl.google.com/go/{}",
],
version = "1.19.3",
)

go_register_toolchains(
nogo = "@//build:tidb_nogo",
version = "1.19.3",
)

gazelle_dependencies()
Expand Down
4 changes: 0 additions & 4 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,8 +1719,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")

Expand All @@ -1740,8 +1738,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {
if checkErr != nil {
return
}
err := originalCallback.OnChanged(nil)
require.NoError(t, err)
switch job.SchemaState {
case model.StateDeleteOnly:
for _, sql := range stateDeleteOnlySQLs {
Expand Down
27 changes: 21 additions & 6 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func TestIndexChange(t *testing.T) {
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t (c1 int primary key, c2 int)")
tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")

Expand Down Expand Up @@ -221,6 +219,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
}

func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table.Table) error {
var err1 error
// WriteOnlyTable: insert t values (6, 6)
err := sessiontxn.NewTxn(context.Background(), ctx)
if err != nil {
Expand All @@ -231,7 +230,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 6, 6, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 6, 6, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
// PublicTable: insert t values (7, 7)
Expand All @@ -250,10 +253,18 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 5, 7, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 5, 7, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 7, 7, false)
if ddl.IsEnableFastReorg() {
err = checkIndexExists(ctx, writeTbl, 7, 7, false)
} else {
err = checkIndexExists(ctx, publicTbl, 7, 7, false)
}
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -283,7 +294,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
idxVal := row[1].GetInt64()
handle := row[0].GetInt64()
err = checkIndexExists(ctx, publicTbl, idxVal, handle, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, idxVal, handle, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
}
Expand Down
72 changes: 72 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,75 @@ func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) {
tk.MustGetErrMsg("set @@tidb_enable_amend_pessimistic_txn = 1;",
"amend pessimistic transactions is not compatible with tidb_ddl_enable_fast_reorg")
}

// TestCreateUniqueIndexKeyExist this case will test below things:
// Create one unique index idx((a*b+1));
// insert (0, 6) and delete it;
// insert (0, 9), it should be successful;
// Should check temp key exist and skip deleted mark
// The error returned below:
// Error: Received unexpected error:
//
// [kv:1062]Duplicate entry '1' for key 't.idx'
func TestCreateUniqueIndexKeyExist(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

stateDeleteOnlySQLs := []string{"insert into t values (5, 5)", "begin pessimistic;", "insert into t select * from t", "rollback", "insert into t set b = 6", "update t set b = 7 where a = 1", "delete from t where b = 4"}

// If waitReorg timeout, the worker may enter writeReorg more than 2 times.
reorgTime := 0
d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &ddl.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if t.Failed() {
return
}
var err error
switch job.SchemaState {
case model.StateDeleteOnly:
for _, sql := range stateDeleteOnlySQLs {
_, err = tk1.Exec(sql)
assert.NoError(t, err)
}
// (1, 7), (2, 2), (3, 3), (5, 5), (0, 6)
case model.StateWriteOnly:
_, err = tk1.Exec("insert into t values (8, 8)")
assert.NoError(t, err)
_, err = tk1.Exec("update t set b = 7 where a = 2")
assert.NoError(t, err)
_, err = tk1.Exec("delete from t where b = 3")
assert.NoError(t, err)
// (1, 7), (2, 7), (5, 5), (0, 6), (8, 8)
case model.StateWriteReorganization:
if reorgTime < 1 {
reorgTime++
} else {
return
}
_, err = tk1.Exec("insert into t values (10, 10)")
assert.NoError(t, err)
_, err = tk1.Exec("delete from t where b = 6")
assert.NoError(t, err)
_, err = tk1.Exec("insert into t set b = 9")
assert.NoError(t, err)
_, err = tk1.Exec("update t set b = 7 where a = 5")
assert.NoError(t, err)
// (1, 7), (2, 7), (5, 7), (8, 8), (10, 10), (0, 9)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)
tk.MustExec("alter table t add unique index idx((a*b+1))")
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10"))
}
4 changes: 4 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
if !distinct {
continue
}
// If index is used ingest ways, then we should check key from temp index.
if v.Meta().BackfillState != model.BackfillStateInapplicable {
_, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key)
}
colValStr, err1 := formatDataForDupError(colVals)
if err1 != nil {
return nil, err1
Expand Down
10 changes: 10 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -262,6 +264,14 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
return err
}
// Since the temp index stores deleted key with marked 'deleteu' for unique key at the end
// of value, So if return a key we check and skip deleted key.
if tablecodec.IsTempIndexKey(uk.newKey) {
rowVal := val[:len(val)-1]
if bytes.Equal(rowVal, tables.DeleteMarkerUnique) {
continue
}
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
return err
Expand Down
Loading

0 comments on commit f4cfcbb

Please sign in to comment.