diff --git a/executor/builder.go b/executor/builder.go index bfee62a677c2b..a2bda4b00c87d 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1200,15 +1200,6 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo } } - // consider collations - leftTypes := make([]*types.FieldType, 0, len(retTypes(leftExec))) - for _, tp := range retTypes(leftExec) { - leftTypes = append(leftTypes, tp.Clone()) - } - rightTypes := make([]*types.FieldType, 0, len(retTypes(rightExec))) - for _, tp := range retTypes(rightExec) { - rightTypes = append(rightTypes, tp.Clone()) - } leftIsBuildSide := true e.isNullEQ = v.IsNullEQ @@ -1251,24 +1242,32 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo } executorCountHashJoinExec.Inc() + // We should use JoinKey to construct the type information using by hashing, instead of using the child's schema directly. + // When a hybrid type column is hashed multiple times, we need to distinguish what field types are used. + // For example, the condition `enum = int and enum = string`, we should use ETInt to hash the first column, + // and use ETString to hash the second column, although they may be the same column. + leftExecTypes, rightExecTypes := retTypes(leftExec), retTypes(rightExec) + leftTypes, rightTypes := make([]*types.FieldType, 0, len(v.LeftJoinKeys)), make([]*types.FieldType, 0, len(v.RightJoinKeys)) + for i, col := range v.LeftJoinKeys { + leftTypes = append(leftTypes, leftExecTypes[col.Index].Clone()) + leftTypes[i].Flag = col.RetType.Flag + } + for i, col := range v.RightJoinKeys { + rightTypes = append(rightTypes, rightExecTypes[col.Index].Clone()) + rightTypes[i].Flag = col.RetType.Flag + } + + // consider collations for i := range v.EqualConditions { chs, coll := v.EqualConditions[i].CharsetAndCollation(e.ctx) - bt := leftTypes[v.LeftJoinKeys[i].Index] - bt.Charset, bt.Collate = chs, coll - pt := rightTypes[v.RightJoinKeys[i].Index] - pt.Charset, pt.Collate = chs, coll + leftTypes[i].Charset, leftTypes[i].Collate = chs, coll + rightTypes[i].Charset, rightTypes[i].Collate = chs, coll } if leftIsBuildSide { e.buildTypes, e.probeTypes = leftTypes, rightTypes } else { e.buildTypes, e.probeTypes = rightTypes, leftTypes } - for _, key := range e.buildKeys { - e.buildTypes[key.Index].Flag = key.RetType.Flag - } - for _, key := range e.probeKeys { - e.probeTypes[key.Index].Flag = key.RetType.Flag - } return e } @@ -2615,6 +2614,21 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) outerTypes[col.Index].Flag = col.RetType.Flag } + // We should use JoinKey to construct the type information using by hashing, instead of using the child's schema directly. + // When a hybrid type column is hashed multiple times, we need to distinguish what field types are used. + // For example, the condition `enum = int and enum = string`, we should use ETInt to hash the first column, + // and use ETString to hash the second column, although they may be the same column. + innerHashTypes := make([]*types.FieldType, len(v.InnerHashKeys)) + outerHashTypes := make([]*types.FieldType, len(v.OuterHashKeys)) + for i, col := range v.InnerHashKeys { + innerHashTypes[i] = innerTypes[col.Index].Clone() + innerHashTypes[i].Flag = col.RetType.Flag + } + for i, col := range v.OuterHashKeys { + outerHashTypes[i] = outerTypes[col.Index].Clone() + outerHashTypes[i].Flag = col.RetType.Flag + } + var ( outerFilter []expression.Expression leftTypes, rightTypes []*types.FieldType @@ -2649,12 +2663,14 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) e := &IndexLookUpJoin{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec), outerCtx: outerCtx{ - rowTypes: outerTypes, - filter: outerFilter, + rowTypes: outerTypes, + hashTypes: outerHashTypes, + filter: outerFilter, }, innerCtx: innerCtx{ readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b}, rowTypes: innerTypes, + hashTypes: innerHashTypes, colLens: v.IdxColLens, hasPrefixCol: hasPrefixCol, }, diff --git a/executor/hash_table.go b/executor/hash_table.go index cd356d0549a5f..8d59da9855b8b 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -33,6 +33,7 @@ import ( // hashContext keeps the needed hash context of a db table in hash join. type hashContext struct { + // allTypes one-to-one correspondence with keyColIdx allTypes []*types.FieldType keyColIdx []int buf []byte @@ -80,9 +81,9 @@ type hashRowContainer struct { rowContainer *chunk.RowContainer } -func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext) *hashRowContainer { +func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { maxChunkSize := sCtx.GetSessionVars().MaxChunkSize - rc := chunk.NewRowContainer(hCtx.allTypes, maxChunkSize) + rc := chunk.NewRowContainer(allTypes, maxChunkSize) c := &hashRowContainer{ sc: sCtx.GetSessionVars().StmtCtx, hCtx: hCtx, @@ -160,7 +161,7 @@ func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected, ignoreNu hCtx := c.hCtx for keyIdx, colIdx := range c.hCtx.keyColIdx { ignoreNull := len(ignoreNulls) > keyIdx && ignoreNulls[keyIdx] - err := codec.HashChunkSelected(c.sc, hCtx.hashVals, chk, hCtx.allTypes[colIdx], colIdx, hCtx.buf, hCtx.hasNull, selected, ignoreNull) + err := codec.HashChunkSelected(c.sc, hCtx.hashVals, chk, hCtx.allTypes[keyIdx], colIdx, hCtx.buf, hCtx.hasNull, selected, ignoreNull) if err != nil { return errors.Trace(err) } diff --git a/executor/hash_table_test.go b/executor/hash_table_test.go index 3458002987f1e..5bc651f6733ba 100644 --- a/executor/hash_table_test.go +++ b/executor/hash_table_test.go @@ -118,7 +118,12 @@ func (s *pkgTestSerialSuite) testHashRowContainer(c *C, hashFunc func() hash.Has for i := 0; i < numRows; i++ { hCtx.hashVals = append(hCtx.hashVals, hashFunc()) } +<<<<<<< HEAD:executor/hash_table_test.go rowContainer := newHashRowContainer(sctx, 0, hCtx) +======= + rowContainer := newHashRowContainer(sctx, 0, hCtx, hCtx.allTypes) + copiedRC = rowContainer.ShallowCopy() +>>>>>>> fa8cbd588... executor: fix wrong result for join with enum type (#29375):executor/hash_table_serial_test.go tracker := rowContainer.GetMemTracker() tracker.SetLabel(memory.LabelForBuildSideResult) if spill { diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index ed7da54baf3f5..2ae0a4fef82ec 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -553,7 +553,7 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con } } h.Reset() - err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, hashColIdx, buf) + err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.hashTypes, hashColIdx, buf) failpoint.Inject("testIndexHashJoinBuildErr", func() { err = errors.New("mockIndexHashJoinBuildErr") }) @@ -644,7 +644,7 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask, h hash.Hash64, buf []byte) (matchedRows []chunk.Row, matchedRowPtr []chunk.RowPtr, err error) { h.Reset() - err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.rowTypes, iw.hashCols, buf) + err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.hashTypes, iw.hashCols, buf) if err != nil { return nil, nil, err } @@ -658,7 +658,7 @@ func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs)) for _, ptr := range iw.matchedOuterPtrs { outerRow := task.outerResult.GetRow(ptr) - ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.rowTypes, iw.keyCols, outerRow, iw.outerCtx.rowTypes, iw.outerCtx.hashCols) + ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.hashTypes, iw.hashCols, outerRow, iw.outerCtx.hashTypes, iw.outerCtx.hashCols) if err != nil { return nil, nil, err } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index d617cddf77bd7..df80c989249eb 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -85,10 +85,11 @@ type IndexLookUpJoin struct { } type outerCtx struct { - rowTypes []*types.FieldType - keyCols []int - hashCols []int - filter expression.CNFExprs + rowTypes []*types.FieldType + keyCols []int + hashTypes []*types.FieldType + hashCols []int + filter expression.CNFExprs } type innerCtx struct { @@ -96,6 +97,7 @@ type innerCtx struct { rowTypes []*types.FieldType keyCols []int keyColIDs []int64 // the original ID in its table, used by dynamic partition pruning + hashTypes []*types.FieldType hashCols []int colLens []int hasPrefixCol bool diff --git a/executor/join.go b/executor/join.go index 1a3f62de47ac1..47f23cd1bb513 100644 --- a/executor/join.go +++ b/executor/join.go @@ -571,7 +571,11 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx hCtx.initHash(probeSideChk.NumRows()) for keyIdx, i := range hCtx.keyColIdx { ignoreNull := len(e.isNullEQ) > keyIdx && e.isNullEQ[keyIdx] +<<<<<<< HEAD err = codec.HashChunkSelected(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull) +======= + err = codec.HashChunkSelected(rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[keyIdx], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull) +>>>>>>> fa8cbd588... executor: fix wrong result for join with enum type (#29375) if err != nil { joinResult.err = err return false, joinResult @@ -612,8 +616,13 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx // join2ChunkForOuterHashJoin joins chunks when using the outer to build a hash table (refer to outer hash join) func (e *HashJoinExec) join2ChunkForOuterHashJoin(workerID uint, probeSideChk *chunk.Chunk, hCtx *hashContext, joinResult *hashjoinWorkerResult) (ok bool, _ *hashjoinWorkerResult) { hCtx.initHash(probeSideChk.NumRows()) +<<<<<<< HEAD for _, i := range hCtx.keyColIdx { err := codec.HashChunkColumns(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull) +======= + for keyIdx, i := range hCtx.keyColIdx { + err := codec.HashChunkColumns(rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[keyIdx], i, hCtx.buf, hCtx.hasNull) +>>>>>>> fa8cbd588... executor: fix wrong result for join with enum type (#29375) if err != nil { joinResult.err = err return false, joinResult @@ -653,6 +662,27 @@ func (e *HashJoinExec) join2ChunkForOuterHashJoin(workerID uint, probeSideChk *c func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { if !e.prepared { e.buildFinished = make(chan error, 1) +<<<<<<< HEAD +======= + buildKeyColIdx := make([]int, len(e.buildKeys)) + for i := range e.buildKeys { + buildKeyColIdx[i] = e.buildKeys[i].Index + } + hCtx := &hashContext{ + allTypes: e.buildTypes, + keyColIdx: buildKeyColIdx, + } + e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec)) + // we shallow copies rowContainer for each probe worker to avoid lock contention + e.rowContainerForProbe = make([]*hashRowContainer, e.concurrency) + for i := uint(0); i < e.concurrency; i++ { + if i == 0 { + e.rowContainerForProbe[i] = e.rowContainer + } else { + e.rowContainerForProbe[i] = e.rowContainer.ShallowCopy() + } + } +>>>>>>> fa8cbd588... executor: fix wrong result for join with enum type (#29375) go util.WithRecovery(func() { defer trace.StartRegion(ctx, "HashJoinHashTableBuilder").End() e.fetchAndBuildHashTable(ctx) diff --git a/expression/integration_test.go b/expression/integration_test.go index e2f620da24811..fcc5039857e96 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -10198,3 +10198,51 @@ func (s *testIntegrationSuite) TestIssue28643(c *C) { tk.MustExec("set tidb_enable_vectorized_expression = off;") tk.MustQuery("select hour(a) from t;").Check(testkit.Rows("838", "838")) } +<<<<<<< HEAD +======= + +func (s *testIntegrationSuite) TestIssue27831(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a enum(\"a\", \"b\"), b enum(\"a\", \"b\"), c bool)") + tk.MustExec("insert into t values(\"a\", \"a\", 1);") + tk.MustQuery("select * from t t1 right join t t2 on t1.a=t2.b and t1.a= t2.c;").Check(testkit.Rows("a a 1 a a 1")) + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a enum(\"a\", \"b\"), b enum(\"a\", \"b\"), c bool, d int, index idx(d))") + tk.MustExec("insert into t values(\"a\", \"a\", 1, 1);") + tk.MustQuery("select /*+ inl_hash_join(t1) */ * from t t1 right join t t2 on t1.a=t2.b and t1.a= t2.c and t1.d=t2.d;").Check(testkit.Rows("a a 1 1 a a 1 1")) +} + +func (s *testIntegrationSuite) TestIssue29434(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 datetime);") + tk.MustExec("insert into t1 values('2021-12-12 10:10:10.000');") + tk.MustExec("set tidb_enable_vectorized_expression = on;") + tk.MustQuery("select greatest(c1, '99999999999999') from t1;").Check(testkit.Rows("99999999999999")) + tk.MustExec("set tidb_enable_vectorized_expression = off;") + tk.MustQuery("select greatest(c1, '99999999999999') from t1;").Check(testkit.Rows("99999999999999")) + + tk.MustExec("set tidb_enable_vectorized_expression = on;") + tk.MustQuery("select least(c1, '99999999999999') from t1;").Check(testkit.Rows("2021-12-12 10:10:10")) + tk.MustExec("set tidb_enable_vectorized_expression = off;") + tk.MustQuery("select least(c1, '99999999999999') from t1;").Check(testkit.Rows("2021-12-12 10:10:10")) +} + +func (s *testIntegrationSuite) TestIssue29244(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a time(4));") + tk.MustExec("insert into t values(\"-700:10:10.123456111\");") + tk.MustExec("insert into t values(\"700:10:10.123456111\");") + tk.MustExec("set tidb_enable_vectorized_expression = on;") + tk.MustQuery("select microsecond(a) from t;").Check(testkit.Rows("123500", "123500")) + tk.MustExec("set tidb_enable_vectorized_expression = off;") + tk.MustQuery("select microsecond(a) from t;").Check(testkit.Rows("123500", "123500")) +} +>>>>>>> fa8cbd588... executor: fix wrong result for join with enum type (#29375) diff --git a/util/codec/codec.go b/util/codec/codec.go index 2f9ce666ed643..08ce5a527b192 100644 --- a/util/codec/codec.go +++ b/util/codec/codec.go @@ -664,8 +664,8 @@ func HashChunkSelected(sc *stmtctx.StatementContext, h []hash.Hash64, chk *chunk // If two rows are logically equal, it will generate the same bytes. func HashChunkRow(sc *stmtctx.StatementContext, w io.Writer, row chunk.Row, allTypes []*types.FieldType, colIdx []int, buf []byte) (err error) { var b []byte - for _, idx := range colIdx { - buf[0], b, err = encodeHashChunkRowIdx(sc, row, allTypes[idx], idx) + for i, idx := range colIdx { + buf[0], b, err = encodeHashChunkRowIdx(sc, row, allTypes[i], idx) if err != nil { return errors.Trace(err) } @@ -687,13 +687,16 @@ func EqualChunkRow(sc *stmtctx.StatementContext, row1 chunk.Row, allTypes1 []*types.FieldType, colIdx1 []int, row2 chunk.Row, allTypes2 []*types.FieldType, colIdx2 []int, ) (bool, error) { + if len(colIdx1) != len(colIdx2) { + return false, errors.Errorf("Internal error: Hash columns count mismatch, col1: %d, col2: %d", len(colIdx1), len(colIdx2)) + } for i := range colIdx1 { idx1, idx2 := colIdx1[i], colIdx2[i] - flag1, b1, err := encodeHashChunkRowIdx(sc, row1, allTypes1[idx1], idx1) + flag1, b1, err := encodeHashChunkRowIdx(sc, row1, allTypes1[i], idx1) if err != nil { return false, errors.Trace(err) } - flag2, b2, err := encodeHashChunkRowIdx(sc, row2, allTypes2[idx2], idx2) + flag2, b2, err := encodeHashChunkRowIdx(sc, row2, allTypes2[i], idx2) if err != nil { return false, errors.Trace(err) } diff --git a/util/codec/codec_test.go b/util/codec/codec_test.go index 204590421a91e..1271a5a242637 100644 --- a/util/codec/codec_test.go +++ b/util/codec/codec_test.go @@ -1279,9 +1279,9 @@ func TestHashChunkColumns(t *testing.T) { for i := 0; i < 12; i++ { require.True(t, chk.GetRow(0).IsNull(i)) err1 := HashChunkSelected(sc, vecHash, chk, tps[i], i, buf, hasNull, sel, false) - err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps, colIdx[i:i+1], buf) - err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps, colIdx[i:i+1], buf) - err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps, colIdx[i:i+1], buf) + err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps[i:i+1], colIdx[i:i+1], buf) + err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps[i:i+1], colIdx[i:i+1], buf) + err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps[i:i+1], colIdx[i:i+1], buf) require.NoError(t, err1) require.NoError(t, err2) require.NoError(t, err3) @@ -1304,9 +1304,9 @@ func TestHashChunkColumns(t *testing.T) { require.False(t, chk.GetRow(0).IsNull(i)) err1 := HashChunkSelected(sc, vecHash, chk, tps[i], i, buf, hasNull, sel, false) - err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps, colIdx[i:i+1], buf) - err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps, colIdx[i:i+1], buf) - err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps, colIdx[i:i+1], buf) + err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps[i:i+1], colIdx[i:i+1], buf) + err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps[i:i+1], colIdx[i:i+1], buf) + err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps[i:i+1], colIdx[i:i+1], buf) require.NoError(t, err1) require.NoError(t, err2)