Commit

Merge remote-tracking branch 'upstream/release-5.1' into release-5.1-8858592f7eae
wshwsh12 committed Jun 28, 2022
2 parents 83a801d + 2f4f52b commit 3009052
Showing 98 changed files with 2,292 additions and 424 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -32,7 +32,7 @@ dev: checklist check test
# Install the check tools.
check-setup:tools/bin/revive tools/bin/goword tools/bin/gometalinter tools/bin/gosec

check: fmt errcheck unconvert lint tidy testSuite check-static vet staticcheck errdoc
check: fmt errcheck unconvert lint tidy testSuite check-static vet errdoc

# These need to be fixed before they can be run regularly
check-fail: goword check-slow
@@ -89,6 +89,7 @@ vet:
@echo "vet"
$(GO) vet -all $(PACKAGES) 2>&1 | $(FAIL_ON_STDOUT)

# staticcheck is not introduced in the 5.2/5.3 branches and was blocking the CI
staticcheck:
$(GO) get honnef.co/go/tools/cmd/staticcheck
$(STATICCHECK) ./...
302 changes: 302 additions & 0 deletions cmd/explaintest/r/collation_agg_func.result

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions cmd/explaintest/r/subquery.result
@@ -22,9 +22,9 @@ Projection 5.00 root Column#22
├─TableReader(Build) 5.00 root data:TableFullScan
│ └─TableFullScan 5.00 cop[tikv] table:t keep order:false
└─StreamAgg(Probe) 1.00 root funcs:count(1)->Column#21
└─IndexJoin 0.22 root inner join, inner:TableReader, outer key:test.t.a, inner key:test.t.a, equal cond:eq(test.t.a, test.t.a)
├─IndexReader(Build) 0.45 root index:IndexRangeScan
│ └─IndexRangeScan 0.45 cop[tikv] table:s, index:idx(b, c, d) range: decided by [eq(test.t.b, 1) eq(test.t.c, 1) eq(test.t.d, test.t.a)], keep order:false
└─IndexJoin 0.50 root inner join, inner:TableReader, outer key:test.t.a, inner key:test.t.a, equal cond:eq(test.t.a, test.t.a)
├─IndexReader(Build) 1.00 root index:IndexRangeScan
│ └─IndexRangeScan 1.00 cop[tikv] table:s, index:idx(b, c, d) range: decided by [eq(test.t.b, 1) eq(test.t.c, 1) eq(test.t.d, test.t.a)], keep order:false
└─TableReader(Probe) 1.00 root data:TableRangeScan
└─TableRangeScan 1.00 cop[tikv] table:t1 range: decided by [test.t.a], keep order:false
drop table if exists t;
102 changes: 102 additions & 0 deletions cmd/explaintest/t/collation_agg_func.test
@@ -0,0 +1,102 @@
# These tests check aggregate-function behavior under different collations.
# The min/max results for enum/set columns are currently wrong; fix them soon.

# prepare database
create database collation_agg_func;
use collation_agg_func;

create table t(id int, value varchar(20) charset utf8mb4 collate utf8mb4_general_ci, value1 varchar(20) charset utf8mb4 collate utf8mb4_bin);
insert into t values (1, 'abc', 'abc '),(4, 'Abc', 'abc'),(3,'def', 'def '), (5, 'abc', 'ABC');

# group_concat
desc format='brief' select group_concat(value order by 1) from t;
select group_concat(value order by 1) from t;
desc format='brief' select group_concat(value) from t;
select group_concat(value) from t;
desc format='brief' select group_concat(value collate utf8mb4_bin) from t;
select group_concat(value collate utf8mb4_bin) from t;
desc format='brief' select group_concat(distinct value order by 1) from t;
select upper(group_concat(distinct value order by 1)) from t;
desc format='brief' select group_concat(distinct value collate utf8mb4_bin order by 1) from t;
select upper(group_concat(distinct value collate utf8mb4_bin order by 1)) from t;
desc format='brief' select group_concat(distinct value) from t;
select upper(group_concat(distinct value)) from t;
desc format='brief' select group_concat(distinct value collate utf8mb4_bin) from t;
select upper(group_concat(distinct value collate utf8mb4_bin)) from t;

# count(distinct)
desc format='brief' select count(distinct value) from t;
select count(distinct value) from t;
desc format='brief' select count(distinct value collate utf8mb4_bin) from t;
select count(distinct value collate utf8mb4_bin) from t;
desc format='brief' select count(distinct value, value1) from t;
select count(distinct value, value1) from t;
desc format='brief' select count(distinct value collate utf8mb4_bin, value1) from t;
select count(distinct value collate utf8mb4_bin, value1) from t;

# approxCountDistinct
desc format='brief' select approx_count_distinct(value) from t;
select approx_count_distinct(value) from t;
desc format='brief' select approx_count_distinct(value collate utf8mb4_bin) from t;
select approx_count_distinct(value collate utf8mb4_bin) from t;
desc format='brief' select approx_count_distinct(value, value1) from t;
select approx_count_distinct(value, value1) from t;
desc format='brief' select approx_count_distinct(value collate utf8mb4_bin, value1) from t;
select approx_count_distinct(value collate utf8mb4_bin, value1) from t;

# minMax
create table tt(a char(10), b enum('a', 'B', 'c'), c set('a', 'B', 'c'), d json) collate utf8mb4_general_ci;
insert into tt values ("a", "a", "a", JSON_OBJECT("a", "a"));
--error 1265
insert into tt values ("A", "A", "A", JSON_OBJECT("A", "A"));
--error 1265
insert into tt values ("b", "b", "b", JSON_OBJECT("b", "b"));
insert into tt values ("B", "B", "B", JSON_OBJECT("B", "B"));
insert into tt values ("c", "c", "c", JSON_OBJECT("c", "c"));
--error 1265
insert into tt values ("C", "C", "C", JSON_OBJECT("C", "C"));
split table tt by (0), (1), (2), (3), (4), (5);
desc format='brief' select min(a) from tt;
select min(a) from tt;
desc format='brief' select min(a collate utf8mb4_bin) from tt;
select min(a collate utf8mb4_bin) from tt;
desc format='brief' select max(a) from tt;
select max(a) from tt;
desc format='brief' select max(a collate utf8mb4_bin) from tt;
select max(a collate utf8mb4_bin) from tt;
desc format='brief' select min(b) from tt;
select min(b) from tt;
desc format='brief' select min(b collate utf8mb4_bin) from tt;
# Fix me later.
# select min(b collate utf8mb4_bin) from tt;
desc format='brief' select max(b) from tt;
select max(b) from tt;
desc format='brief' select max(b collate utf8mb4_bin) from tt;
# Fix me later.
# select max(b collate utf8mb4_bin) from tt;
desc format='brief' select min(c) from tt;
select min(c) from tt;
desc format='brief' select min(c collate utf8mb4_bin) from tt;
# Fix me later.
# select min(c collate utf8mb4_bin) from tt;
desc format='brief' select max(c) from tt;
select max(c) from tt;
desc format='brief' select max(c collate utf8mb4_bin) from tt;
# Fix me later.
# select max(c collate utf8mb4_bin) from tt;
desc format='brief' select min(d) from tt;
select min(d) from tt;
--error 1253
desc format='brief' select min(d collate utf8mb4_bin) from tt;
--error 1253
select min(d collate utf8mb4_bin) from tt;
desc format='brief' select max(d) from tt;
select max(d) from tt;
--error 1253
desc format='brief' select max(d collate utf8mb4_bin) from tt;
--error 1253
select max(d collate utf8mb4_bin) from tt;

# cleanup environment
drop database collation_agg_func;
use test
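Why the `count(distinct value)` cases above differ between collations: with the rows ('abc', 'Abc', 'def', 'abc'), utf8mb4_general_ci folds case, so it sees two distinct values where utf8mb4_bin sees three. A standalone Go sketch of the idea (strings.ToUpper is only a crude stand-in for a real general_ci collation key):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	values := []string{"abc", "Abc", "def", "abc"}
	ci := map[string]struct{}{}  // distinct under a case-insensitive collation
	bin := map[string]struct{}{} // distinct under a binary collation
	for _, v := range values {
		ci[strings.ToUpper(v)] = struct{}{} // crude stand-in for a ci sort key
		bin[v] = struct{}{}
	}
	fmt.Println(len(ci), len(bin)) // prints: 2 3
}
```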
7 changes: 7 additions & 0 deletions ddl/backfilling.go
@@ -423,6 +423,13 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
}

func tryDecodeToHandleString(key kv.Key) string {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Warn("tryDecodeToHandleString panic",
zap.Any("recover()", r),
zap.Binary("key", key))
}
}()
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
recordPrefixIdx := bytes.Index(key, []byte("_r"))
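The defer/recover guard added above exists so that a malformed key can never panic the backfill worker while it is merely building a log message. A minimal standalone sketch of the same pattern (the names here are hypothetical):

```go
package main

import "fmt"

// tryDescribe renders a key for logging. If the decoder panics on a
// malformed key, the panic is recovered and a fallback string is returned,
// so logging can never crash the caller.
func tryDescribe(key []byte, decode func([]byte) string) (s string) {
	defer func() {
		if r := recover(); r != nil {
			s = fmt.Sprintf("undecodable key %x (recovered: %v)", key, r)
		}
	}()
	return decode(key)
}

func main() {
	bad := func([]byte) string { panic("corrupt key") }
	fmt.Println(tryDescribe([]byte{0xde, 0xad}, bad))
}
```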
10 changes: 7 additions & 3 deletions ddl/column.go
@@ -840,7 +840,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
if job.IsRollingback() {
// For those column-type-change jobs which don't reorg the data.
if !needChangeColumnData(oldCol, jobParam.newCol) {
return rollbackModifyColumnJob(t, tblInfo, job, oldCol, jobParam.modifyColumnTp)
return rollbackModifyColumnJob(t, tblInfo, job, jobParam.newCol, oldCol, jobParam.modifyColumnTp)
}
// For those column-type-change jobs which reorg the data.
return rollbackModifyColumnJobWithData(t, tblInfo, job, oldCol, jobParam)
@@ -1454,6 +1454,10 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind
func (w *worker) doModifyColumn(
d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) {
if oldCol.ID != newCol.ID {
job.State = model.JobStateRollingback
return ver, errKeyColumnDoesNotExits.GenWithStack("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", oldCol.Name, newCol.ID)
}
// Column from null to not null.
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag)
@@ -1762,9 +1766,9 @@ func checkAddColumnTooManyColumns(colNum int) error {
}

// rollbackModifyColumnJob rolls back the job when an error occurs.
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, newCol, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
var err error
if modifyColumnTp == mysql.TypeNull {
if oldCol.ID == newCol.ID && modifyColumnTp == mysql.TypeNull {
// field NotNullFlag flag reset.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag
// field PreventNullInsertFlag flag reset.
53 changes: 52 additions & 1 deletion ddl/db_change_test.go
@@ -1046,6 +1046,54 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAlterModifyColumnWithData(c *C) {
sql := "ALTER TABLE t MODIFY COLUMN c int;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[0][2], Equals, "3")
c.Assert(rs[0].Close(), IsNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
c.Assert(err, IsNil)
rs, err = s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[1][2], Equals, "33")
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAlterModifyColumnToNotNullWithData(c *C) {
sql := "ALTER TABLE t MODIFY COLUMN c int not null;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[0][2], Equals, "3")
c.Assert(rs[0].Close(), IsNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, null, 44, 55)")
c.Assert(err, NotNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
c.Assert(err, IsNil)
rs, err = s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[1][2], Equals, "33")
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColumn(c *C) {
sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);"
sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;"
@@ -1322,12 +1370,15 @@ func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (sess
func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c double default null, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
_, err = s.se.Execute(context.Background(), "insert into t values(1, 2, 3.1234, 4, 5)")
c.Assert(err, IsNil)

defer func() {
_, err := s.se.Execute(context.Background(), "drop table t")
c.Assert(err, IsNil)
3 changes: 3 additions & 0 deletions ddl/serial_test.go
@@ -1044,7 +1044,10 @@ func (s *testSerialSuite) TestTableLocksEnable(c *C) {
})

tk.MustExec("lock tables t1 write")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1235 LOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file."))
checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone)
tk.MustExec("unlock tables")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1235 UNLOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file."))
}

func (s *testSerialDBSuite) TestAutoRandomOnTemporaryTable(c *C) {
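For reference, a sketch of how the gate the warning points at can be flipped in-process, assuming TiDB's `config` package API:

```go
import "github.com/pingcap/tidb/config"

// Flip the experimental table-lock gate for this process. With it left at
// the default (false), LOCK/UNLOCK TABLES are no-ops that only raise
// warning 1235 — exactly what the test above asserts.
config.UpdateGlobal(func(conf *config.Config) {
	conf.EnableTableLock = true
})
```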
40 changes: 36 additions & 4 deletions distsql/request_builder.go
@@ -17,6 +17,7 @@ import (
"fmt"
"math"
"sort"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -553,13 +554,25 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {

// IndexRangesToKVRanges converts index ranges to "KeyRange".
func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
return IndexRangesToKVRangesForTables(sc, []int64{tid}, idxID, ranges, fb)
return IndexRangesToKVRangesWithInterruptSignal(sc, tid, idxID, ranges, fb, nil, nil)
}

// IndexRangesToKVRangesWithInterruptSignal converts index ranges to "KeyRange".
// The process can be interrupted by setting `interruptSignal` to true.
func IndexRangesToKVRangesWithInterruptSignal(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) {
return indexRangesToKVRangesForTablesWithInterruptSignal(sc, []int64{tid}, idxID, ranges, fb, memTracker, interruptSignal)
}

// IndexRangesToKVRangesForTables converts index ranges to "KeyRange".
func IndexRangesToKVRangesForTables(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
return indexRangesToKVRangesForTablesWithInterruptSignal(sc, tids, idxID, ranges, fb, nil, nil)
}

// indexRangesToKVRangesForTablesWithInterruptSignal converts index ranges to "KeyRange".
// The process can be interrupted by setting `interruptSignal` to true.
func indexRangesToKVRangesForTablesWithInterruptSignal(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) {
if fb == nil || fb.Hist == nil {
return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges)
return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges, memTracker, interruptSignal)
}
feedbackRanges := make([]*ranger.Range, 0, len(ranges))
for _, ran := range ranges {
@@ -644,18 +657,37 @@ func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSc
return true
}

func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range) ([]kv.KeyRange, error) {
func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {
const CheckSignalStep = 8
var estimatedMemUsage int64
// encodeIndexKey and EncodeIndexSeekKey are time-consuming, so we
// check the interrupt signal periodically.
for i, ran := range ranges {
low, high, err := encodeIndexKey(sc, ran)
if err != nil {
return nil, err
}
if i == 0 {
estimatedMemUsage += int64(cap(low) + cap(high))
}
for _, tid := range tids {
startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low)
endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high)
if i == 0 {
estimatedMemUsage += int64(cap(startKey)) + int64(cap(endKey))
}
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
if i%CheckSignalStep == 0 {
if i == 0 && memTracker != nil {
estimatedMemUsage *= int64(len(ranges))
memTracker.Consume(estimatedMemUsage)
}
if interruptSignal != nil && interruptSignal.Load().(bool) {
return nil, nil
}
}
}
return krs, nil
}
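Two things to note about the new signature: the memory estimate is sampled from the first range and extrapolated by `len(ranges)`, and an interrupted build returns `nil, nil` rather than an error. A caller-side usage sketch (the wrapper function and variable names are hypothetical; import paths assume the TiDB package layout of this era):

```go
import (
	"context"
	"sync/atomic"

	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/util/memory"
	"github.com/pingcap/tidb/util/ranger"
)

// buildRangesWithWatchdog shows the caller-side wiring: store a bool into the
// atomic.Value up front (the builder does interruptSignal.Load().(bool)),
// then flip it from another goroutine to abort the build early.
func buildRangesWithWatchdog(
	ctx context.Context,
	sc *stmtctx.StatementContext,
	tid, idxID int64,
	idxRanges []*ranger.Range,
	tracker *memory.Tracker,
) ([]kv.KeyRange, error) {
	var interrupt atomic.Value
	interrupt.Store(false) // must hold a bool before the builder first loads it

	go func() {
		<-ctx.Done() // e.g. the query was cancelled or exceeded its quota
		interrupt.Store(true)
	}()

	krs, err := distsql.IndexRangesToKVRangesWithInterruptSignal(
		sc, tid, idxID, idxRanges, nil /* feedback */, tracker, &interrupt)
	if err == nil && krs == nil {
		// nil, nil means the build was interrupted, not an empty result.
		return nil, ctx.Err()
	}
	return krs, err
}
```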
18 changes: 15 additions & 3 deletions distsql/select_result.go
@@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/errno"
@@ -272,6 +273,12 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) {
failpoint.Inject("mockNextRawError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mockNextRawError"))
}
})

resultSubset, err := r.resp.Next(ctx)
r.partialCount++
r.feedback.Invalidate()
@@ -394,9 +401,14 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
} else {
// For cop task cases, we still need this protection.
if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
logutil.Logger(ctx).Error("invalid cop task execution summaries length",
zap.Int("expected", len(r.copPlanIDs)),
zap.Int("received", len(r.selectResp.GetExecutionSummaries())))
// For TiFlash streaming calls (BatchCop and MPP), only the last response
// carries the execution summaries by design, so it is OK for some responses
// to have no execution summaries; that case should not trigger an error log.
if !(r.storeType == kv.TiFlash && len(r.selectResp.GetExecutionSummaries()) == 0) {
logutil.Logger(ctx).Error("invalid cop task execution summaries length",
zap.Int("expected", len(r.copPlanIDs)),
zap.Int("received", len(r.selectResp.GetExecutionSummaries())))
}
return
}
for i, detail := range r.selectResp.GetExecutionSummaries() {
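The injected `mockNextRawError` failpoint lets tests force `NextRaw` to fail. A sketch of activating it with pingcap/failpoint (the full failpoint path is an assumption derived from the package's import path):

```go
import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestNextRawError(t *testing.T) {
	// While enabled, every NextRaw call returns the mock error.
	const fp = "github.com/pingcap/tidb/distsql/mockNextRawError"
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()
	// ... run a query whose NextRaw path should now surface "mockNextRawError".
}
```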
5 changes: 5 additions & 0 deletions errors.toml
@@ -511,6 +511,11 @@
Incorrect usage of %s and %s
'''

["executor:1235"]
error = '''
%-.32s is not supported. To enable this experimental feature, set '%-.32s' in the configuration file.
'''

["executor:1242"]
error = '''
Subquery returns more than 1 row
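The `%-.32s` verbs in the new template are ordinary Go format verbs (left-justified, truncated to 32 characters), so the message the serial test asserts can be reproduced directly:

```go
package main

import "fmt"

func main() {
	const tmpl = "%-.32s is not supported. To enable this experimental feature, set '%-.32s' in the configuration file."
	fmt.Printf(tmpl+"\n", "LOCK TABLES", "enable-table-lock")
	// Output:
	// LOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file.
}
```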
(The remaining changed files in this commit are not rendered.)
