From bb27b57106e78dbff22d7211b4a4cfdbe62df919 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 3 Dec 2021 15:13:54 +0800 Subject: [PATCH] cherry pick #30290 to release-5.0 Signed-off-by: ti-srebot --- executor/builder.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/executor/builder.go b/executor/builder.go index d3793cac94f22..fae7d12a78df6 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -80,6 +80,16 @@ type executorBuilder struct { snapshotTSCached bool err error // err is set when there is error happened during Executor building process. hasLock bool +<<<<<<< HEAD +======= + Ti *TelemetryInfo + // isStaleness means whether this statement use stale read. + isStaleness bool + readReplicaScope string + inUpdateStmt bool + inDeleteStmt bool + inInsertStmt bool +>>>>>>> 4fbbd5a77... executor: make projection executor unparallel for insert/update/delete (#30290) } func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder { @@ -731,6 +741,7 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor { } func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { + b.inInsertStmt = true if v.SelectPlan != nil { // Try to update the forUpdateTS for insert/replace into select statements. // Set the selectPlan parameter to nil to make it always update the forUpdateTS. @@ -1345,6 +1356,12 @@ func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) Exe if int64(v.StatsCount()) < int64(b.ctx.GetSessionVars().MaxChunkSize) { e.numWorkers = 0 } + + // Use un-parallel projection for query that write on memdb to avoid data race. + // See also https://github.com/pingcap/tidb/issues/26832 + if b.inUpdateStmt || b.inDeleteStmt || b.inInsertStmt || b.hasLock { + e.numWorkers = 0 + } return e } @@ -1811,6 +1828,7 @@ func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor } func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { + b.inUpdateStmt = true tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) multiUpdateOnSameTable := make(map[int64]bool) for _, info := range v.TblColPosInfos { @@ -1882,6 +1900,7 @@ func getAssignFlag(ctx sessionctx.Context, v *plannercore.Update, schemaLen int) } func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { + b.inDeleteStmt = true tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) for _, info := range v.TblColPosInfos { tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)