diff --git a/ddl/db_test.go b/ddl/db_test.go index 5990df4634a68..4685cc24713f4 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1763,3 +1763,21 @@ func TestDDLBlockedCreateView(t *testing.T) { dom.DDL().SetHook(hook) tk.MustExec("alter table t modify column a char(10)") } + +// TestMDLPutETCDFail checks that ALTER TABLE ... ADD COLUMN still succeeds while the +// putEtcdFailed failpoint makes MockSchemaSyncer.UpdateSelfVersion return an error. +func TestMDLPutETCDFail(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/putEtcdFailed", `return(true)`)) + // Disable inside a closure: arguments of a deferred call are evaluated at the defer + // statement, so a bare defer would switch the failpoint off before the DDL below runs. + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/putEtcdFailed")) + }() + tk.MustExec("alter table t add column b int") +} diff --git a/ddl/mock.go b/ddl/mock.go index 57a60794f514a..9475a5c34ee2e 100644 --- a/ddl/mock.go +++ b/ddl/mock.go @@ -66,6 +66,12 @@ func (s *MockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {} // UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface. func (s *MockSchemaSyncer) UpdateSelfVersion(ctx context.Context, jobID int64, version int64) error { + failpoint.Inject("putEtcdFailed", func() { + if mockDDLErrOnce < 3 { + mockDDLErrOnce++ + failpoint.Return(errors.New("mock putEtcdFailed")) + } + }) if variable.EnableMDL.Load() { s.mdlSchemaVersions.Store(jobID, version) } else { diff --git a/domain/domain.go b/domain/domain.go index c68f6c1349d31..8d433ed1d7184 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -727,6 +727,7 @@ func (do *Domain) mdlCheckLoop() { err := do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), jobID, ver) if err != nil { logutil.BgLogger().Warn("update self version failed", zap.Error(err)) + jobNeedToSync = true } else { jobCache[jobID] = ver } diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 542ba5d5f963c..0968c629b4b0d 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -906,10 +906,14 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, 
outerExec Executor) joinSchema.Append(cols1...) } - joinKeysColIdx := make([]int, 0, len(testCase.keyIdx)) - joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...) - probeKeysColIdx := make([]int, 0, len(testCase.keyIdx)) - probeKeysColIdx = append(probeKeysColIdx, testCase.keyIdx...) + joinKeys := make([]*expression.Column, 0, len(testCase.keyIdx)) + for _, keyIdx := range testCase.keyIdx { + joinKeys = append(joinKeys, cols0[keyIdx]) + } + probeKeys := make([]*expression.Column, 0, len(testCase.keyIdx)) + for _, keyIdx := range testCase.keyIdx { + probeKeys = append(probeKeys, cols1[keyIdx]) + } e := &HashJoinExec{ baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec), hashJoinCtx: &hashJoinCtx{ @@ -917,31 +921,26 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) isOuterJoin: false, useOuterToBuild: testCase.useOuterToBuild, concurrency: uint(testCase.concurrency), - probeTypes: retTypes(outerExec), - buildTypes: retTypes(innerExec), }, probeSideTupleFetcher: &probeSideTupleFetcher{ probeSideExec: outerExec, }, - probeWorkers: make([]*probeWorker, testCase.concurrency), - buildWorker: &buildWorker{ - buildKeyColIdx: joinKeysColIdx, - buildSideExec: innerExec, - }, + probeWorkers: make([]probeWorker, testCase.concurrency), + buildKeys: joinKeys, + probeKeys: probeKeys, + buildSideExec: innerExec, + buildSideEstCount: float64(testCase.rows), } childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema()) - defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len()) + defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len()) lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec) for i := uint(0); i < e.concurrency; i++ { - e.probeWorkers[i] = &probeWorker{ - workerID: i, - sessCtx: e.ctx, - hashJoinCtx: e.hashJoinCtx, - joiner: newJoiner(testCase.ctx, e.joinType, true, defaultValues, - nil, lhsTypes, rhsTypes, 
childrenUsedSchema, false), - probeKeyColIdx: probeKeysColIdx, - } + e.probeWorkers[i].workerID = i + e.probeWorkers[i].sessCtx = e.ctx + e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx + e.probeWorkers[i].joiner = newJoiner(testCase.ctx, e.joinType, true, defaultValues, + nil, lhsTypes, rhsTypes, childrenUsedSchema, false) } memLimit := int64(-1) if testCase.disk { diff --git a/executor/builder.go b/executor/builder.go index 7b3ca2d192983..af7817764f4eb 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1400,6 +1400,17 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu return e } +func (b *executorBuilder) buildSideEstCount(v *plannercore.PhysicalHashJoin) float64 { + buildSide := v.Children()[v.InnerChildIdx] + if v.UseOuterToBuild { + buildSide = v.Children()[1-v.InnerChildIdx] + } + if buildSide.Stats().HistColl == nil || buildSide.Stats().HistColl.Pseudo { + return 0.0 + } + return buildSide.StatsCount() +} + func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor { leftExec := b.build(v.Children()[0]) if b.err != nil { @@ -1414,8 +1425,6 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo e := &HashJoinExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec), probeSideTupleFetcher: &probeSideTupleFetcher{}, - probeWorkers: make([]*probeWorker, v.Concurrency), - buildWorker: &buildWorker{}, hashJoinCtx: &hashJoinCtx{ isOuterJoin: v.JoinType.IsOuterJoin(), useOuterToBuild: v.UseOuterToBuild, @@ -1440,17 +1449,15 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo leftIsBuildSide := true e.isNullEQ = v.IsNullEQ - var probeKeys, probeNAKeys, buildKeys, buildNAKeys []*expression.Column - var buildSideExec Executor if v.UseOuterToBuild { // update the buildSideEstCount due to changing the build side if v.InnerChildIdx == 1 { - buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys 
- e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.LeftConditions } else { - buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.RightConditions leftIsBuildSide = false } @@ -1459,48 +1466,30 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo } } else { if v.InnerChildIdx == 0 { - buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.RightConditions } else { - buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.LeftConditions leftIsBuildSide = false } if defaultValues == nil { - defaultValues = make([]types.Datum, buildSideExec.Schema().Len()) + defaultValues = 
make([]types.Datum, e.buildSideExec.Schema().Len()) } } - probeKeyColIdx := make([]int, len(probeKeys)) - probeNAKeColIdx := make([]int, len(probeNAKeys)) - buildKeyColIdx := make([]int, len(buildKeys)) - buildNAKeyColIdx := make([]int, len(buildNAKeys)) - for i := range buildKeys { - buildKeyColIdx[i] = buildKeys[i].Index - } - for i := range buildNAKeys { - buildNAKeyColIdx[i] = buildNAKeys[i].Index - } - for i := range probeKeys { - probeKeyColIdx[i] = probeKeys[i].Index - } - for i := range probeNAKeys { - probeNAKeColIdx[i] = probeNAKeys[i].Index - } isNAJoin := len(v.LeftNAJoinKeys) > 0 + e.buildSideEstCount = b.buildSideEstCount(v) childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) + e.probeWorkers = make([]probeWorker, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeWorkers[i] = &probeWorker{ - hashJoinCtx: e.hashJoinCtx, - workerID: i, - sessCtx: e.ctx, - joiner: newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin), - probeKeyColIdx: probeKeyColIdx, - probeNAKeyColIdx: probeNAKeColIdx, - } + e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx + e.probeWorkers[i].workerID = i + e.probeWorkers[i].sessCtx = e.ctx + e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, + v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin) } - e.buildWorker.buildKeyColIdx, e.buildWorker.buildNAKeyColIdx, e.buildWorker.buildSideExec = buildKeyColIdx, buildNAKeyColIdx, buildSideExec e.hashJoinCtx.isNullAware = isNAJoin executorCountHashJoinExec.Inc() diff --git a/executor/hash_table.go b/executor/hash_table.go index 2ba840d04fdc9..b7c875148bffa 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -117,7 +117,7 @@ type hashRowContainer struct { chkBuf *chunk.Chunk } -func newHashRowContainer(sCtx sessionctx.Context, hCtx *hashContext, allTypes []*types.FieldType) 
*hashRowContainer { +func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { maxChunkSize := sCtx.GetSessionVars().MaxChunkSize rc := chunk.NewRowContainer(allTypes, maxChunkSize) c := &hashRowContainer{ diff --git a/executor/hash_table_test.go b/executor/hash_table_test.go index 0a387e0e7e5b6..3b4a4acee5284 100644 --- a/executor/hash_table_test.go +++ b/executor/hash_table_test.go @@ -127,7 +127,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool) for i := 0; i < numRows; i++ { hCtx.hashVals = append(hCtx.hashVals, hashFunc()) } - rowContainer := newHashRowContainer(sctx, hCtx, colTypes) + rowContainer := newHashRowContainer(sctx, 0, hCtx, colTypes) copiedRC = rowContainer.ShallowCopy() tracker := rowContainer.GetMemTracker() tracker.SetLabel(memory.LabelForBuildSideResult) diff --git a/executor/join.go b/executor/join.go index 214e2edb1d440..91cc73e8e1e0e 100644 --- a/executor/join.go +++ b/executor/join.go @@ -80,10 +80,8 @@ type probeSideTupleFetcher struct { type probeWorker struct { hashJoinCtx *hashJoinCtx sessCtx sessionctx.Context - workerID uint - probeKeyColIdx []int - probeNAKeyColIdx []int + workerID uint // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently buildSideRows []chunk.Row buildSideRowPtrs []chunk.RowPtr @@ -101,20 +99,20 @@ type probeWorker struct { probeResultCh chan *chunk.Chunk } -type buildWorker struct { - buildSideExec Executor - buildKeyColIdx []int - buildNAKeyColIdx []int -} - // HashJoinExec implements the hash join algorithm. 
type HashJoinExec struct { baseExecutor *hashJoinCtx probeSideTupleFetcher *probeSideTupleFetcher - probeWorkers []*probeWorker - buildWorker *buildWorker + probeWorkers []probeWorker + + buildSideExec Executor + buildSideEstCount float64 + probeKeys []*expression.Column + probeNAKeys []*expression.Column + buildKeys []*expression.Column + buildNAKeys []*expression.Column worker util.WaitGroupWrapper waiter util.WaitGroupWrapper @@ -209,6 +207,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error { e.closeCh = make(chan struct{}) e.finished.Store(false) + if e.probeTypes == nil { + e.probeTypes = retTypes(e.probeSideTupleFetcher.probeSideExec) + } + if e.buildTypes == nil { + e.buildTypes = retTypes(e.buildSideExec) + } if e.runtimeStats != nil { e.stats = &hashJoinRuntimeStats{ concurrent: int(e.concurrency), @@ -309,8 +313,8 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu if e.finished.Load() { return } - chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildWorker.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) - err = Next(ctx, e.buildWorker.buildSideExec, chk) + chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) + err = Next(ctx, e.buildSideExec, chk) if err != nil { errCh <- errors.Trace(err) return @@ -371,11 +375,19 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { e.probeSideTupleFetcher.fetchProbeSideChunks(ctx, e.maxChunkSize) }, e.probeSideTupleFetcher.handleProbeSideFetcherPanic) + probeKeyColIdx := make([]int, len(e.probeKeys)) + probeNAKeColIdx := make([]int, len(e.probeNAKeys)) + for i := range e.probeKeys { + probeKeyColIdx[i] = e.probeKeys[i].Index + } + for i := range e.probeNAKeys { + probeNAKeColIdx[i] = e.probeNAKeys[i].Index + } for i := uint(0); i < e.concurrency; i++ { 
workerID := i e.worker.RunWithRecover(func() { defer trace.StartRegion(ctx, "HashJoinWorker").End() - e.probeWorkers[workerID].runJoinWorker() + e.probeWorkers[workerID].runJoinWorker(probeKeyColIdx, probeNAKeColIdx) }, e.probeWorkers[workerID].handleProbeWorkerPanic) } e.waiter.RunWithRecover(e.waitJoinWorkersAndCloseResultChan, nil) @@ -451,7 +463,7 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() { close(e.joinResultCh) } -func (w *probeWorker) runJoinWorker() { +func (w *probeWorker) runJoinWorker(probeKeyColIdx, probeNAKeyColIdx []int) { probeTime := int64(0) if w.hashJoinCtx.stats != nil { start := time.Now() @@ -478,8 +490,8 @@ func (w *probeWorker) runJoinWorker() { } hCtx := &hashContext{ allTypes: w.hashJoinCtx.probeTypes, - keyColIdx: w.probeKeyColIdx, - naKeyColIdx: w.probeNAKeyColIdx, + keyColIdx: probeKeyColIdx, + naKeyColIdx: probeNAKeyColIdx, } for ok := true; ok; { if w.hashJoinCtx.finished.Load() { @@ -1093,12 +1105,20 @@ func (w *probeWorker) join2ChunkForOuterHashJoin(probeSideChk *chunk.Chunk, hCtx func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { if !e.prepared { e.buildFinished = make(chan error, 1) + buildKeyColIdx := make([]int, len(e.buildKeys)) + for i := range e.buildKeys { + buildKeyColIdx[i] = e.buildKeys[i].Index + } + buildNAKeyColIdx := make([]int, len(e.buildNAKeys)) + for i := range e.buildNAKeys { + buildNAKeyColIdx[i] = e.buildNAKeys[i].Index + } hCtx := &hashContext{ allTypes: e.buildTypes, - keyColIdx: e.buildWorker.buildKeyColIdx, - naKeyColIdx: e.buildWorker.buildNAKeyColIdx, + keyColIdx: buildKeyColIdx, + naKeyColIdx: buildNAKeyColIdx, } - e.rowContainer = newHashRowContainer(e.ctx, hCtx, retTypes(e.buildWorker.buildSideExec)) + e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec)) // we shallow copies rowContainer for each probe worker to avoid lock contention for i := uint(0); i < e.concurrency; i++ { if i == 0 {