From b4d2ddb8b1c8ecc46b7c1c496a56728e07c6e27d Mon Sep 17 00:00:00 2001
From: tangenta
Date: Wed, 23 Nov 2022 18:17:12 +0800
Subject: [PATCH 1/4] ddl: support reading generated columns with copr for adding index

Previously, newCopContext returned nil as soon as the index touched a
virtual generated column, so such indexes could not be backfilled through
the coprocessor. Now newCopContext collects every column that the index
and the handle depend on, expanding generated-column dependencies
transitively, fills virtual column values with table.FillVirtualColumnValue
after each scanned chunk, and extracts index values and handles by their
output offsets. It also returns an error instead of nil so that the caller
can log the cause.

---
 ddl/backfilling.go                            |   6 +-
 ddl/index_cop.go                              | 172 +++++++++++++-----
 ddl/index_cop_test.go                         |   3 +-
 .../addindextest/integration_test.go          |  47 +++++
 4 files changed, 182 insertions(+), 46 deletions(-)

diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 35abc16cd1a6f..0c06fa0fab551 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -744,9 +744,9 @@ func (b *backfillScheduler) initCopReqSenderPool() {
 		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
 		return
 	}
-	copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
-	if copCtx == nil {
-		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender")
+	copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
 		return
 	}
 	ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
diff --git a/ddl/index_cop.go b/ddl/index_cop.go
index c1229ae1f7a1e..ba17f6445b6ee 100644
--- a/ddl/index_cop.go
+++ b/ddl/index_cop.go
@@ -21,13 +21,14 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/distsql"
+	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/model"
-	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/statistics"
+	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
@@ -221,34 +222,137 @@ type copContext struct {
 	colInfos []*model.ColumnInfo
 	fieldTps []*types.FieldType
 	sessCtx  sessionctx.Context
+
+	expColInfos         []*expression.Column
+	idxColOutputOffsets []int
+	handleOutputOffsets []int
+	virtualColOffsets   []int
+	virtualColFieldTps  []*types.FieldType
 }
 
-func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext {
+func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) (*copContext, error) {
+	var err error
+	usedColumnIDs := make(map[int64]struct{}, len(idxInfo.Columns))
+	usedColumnIDs, err = fillUsedColumns(usedColumnIDs, idxInfo, tblInfo)
+	var handleIDs []int64
+	if err != nil {
+		return nil, err
+	}
+	var primaryIdx *model.IndexInfo
+	if tblInfo.PKIsHandle {
+		usedColumnIDs[tblInfo.GetPkColInfo().ID] = struct{}{}
+		handleIDs = []int64{tblInfo.GetPkColInfo().ID}
+	} else if tblInfo.IsCommonHandle {
+		primaryIdx = tables.FindPrimaryIndex(tblInfo)
+		handleIDs = make([]int64, 0, len(primaryIdx.Columns))
+		for _, pkCol := range primaryIdx.Columns {
+			col := tblInfo.Columns[pkCol.Offset]
+			handleIDs = append(handleIDs, col.ID)
+		}
+		usedColumnIDs, err = fillUsedColumns(usedColumnIDs, primaryIdx, tblInfo)
+	}
+
+	// Only collect the columns that are used by the index.
 	colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
 	fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns))
-	for _, idxCol := range idxInfo.Columns {
-		c := tblInfo.Columns[idxCol.Offset]
-		if c.IsGenerated() && !c.GeneratedStored {
-			// TODO(tangenta): support reading virtual generated columns.
-			return nil
+	for _, c := range tblInfo.Columns {
+		if _, found := usedColumnIDs[c.ID]; found {
+			colInfos = append(colInfos, c)
+			fieldTps = append(fieldTps, &c.FieldType)
 		}
-		colInfos = append(colInfos, c)
-		fieldTps = append(fieldTps, &c.FieldType)
 	}
-	pkColInfos, pkFieldTps, pkInfo := buildHandleColInfoAndFieldTypes(tblInfo)
-	colInfos = append(colInfos, pkColInfos...)
-	fieldTps = append(fieldTps, pkFieldTps...)
+	// Append the extra handle column when _tidb_rowid is used.
+	if !tblInfo.HasClusteredIndex() {
+		extra := model.NewExtraHandleColInfo()
+		colInfos = append(colInfos, extra)
+		fieldTps = append(fieldTps, &extra.FieldType)
+		handleIDs = []int64{extra.ID}
+	}
+
+	expColInfos, _, err := expression.ColumnInfos2ColumnsAndNames(sessCtx,
+		model.CIStr{} /* unused */, tblInfo.Name, colInfos, tblInfo)
+	idxOffsets := resolveIndicesForIndex(expColInfos, idxInfo, tblInfo)
+	hdColOffsets := resolveIndicesForHandle(expColInfos, handleIDs)
+	vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(expColInfos)
 
 	copCtx := &copContext{
 		tblInfo:  tblInfo,
 		idxInfo:  idxInfo,
-		pkInfo:   pkInfo,
+		pkInfo:   primaryIdx,
 		colInfos: colInfos,
 		fieldTps: fieldTps,
 		sessCtx:  sessCtx,
+
+		expColInfos:         expColInfos,
+		idxColOutputOffsets: idxOffsets,
+		handleOutputOffsets: hdColOffsets,
+		virtualColOffsets:   vColOffsets,
+		virtualColFieldTps:  vColFts,
 	}
-	return copCtx
+	return copCtx, nil
+}
+
+func fillUsedColumns(usedCols map[int64]struct{}, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) (map[int64]struct{}, error) {
+	colsToChecks := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
+	for _, idxCol := range idxInfo.Columns {
+		colsToChecks = append(colsToChecks, tblInfo.Columns[idxCol.Offset])
+	}
+	for len(colsToChecks) > 0 {
+		next := colsToChecks[0]
+		colsToChecks = colsToChecks[1:]
+		usedCols[next.ID] = struct{}{}
+		for depColName := range next.Dependences {
+			// Expand the virtual generated columns.
+			depCol := model.FindColumnInfo(tblInfo.Columns, depColName)
+			if depCol == nil {
+				return nil, errors.Trace(errors.Errorf("dependent column %s not found", depColName))
+			}
+			if _, ok := usedCols[depCol.ID]; !ok {
+				colsToChecks = append(colsToChecks, depCol)
+			}
+		}
+	}
+	return usedCols, nil
+}
+
+func resolveIndicesForIndex(outputCols []*expression.Column, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) []int {
+	offsets := make([]int, 0, len(idxInfo.Columns))
+	for _, idxCol := range idxInfo.Columns {
+		hid := tblInfo.Columns[idxCol.Offset].ID
+		for j, col := range outputCols {
+			if col.ID == hid {
+				offsets = append(offsets, j)
+				break
+			}
+		}
+	}
+	return offsets
+}
+
+func resolveIndicesForHandle(cols []*expression.Column, handleIDs []int64) []int {
+	offsets := make([]int, 0, len(handleIDs))
+	for _, hid := range handleIDs {
+		for j, col := range cols {
+			if col.ID == hid {
+				offsets = append(offsets, j)
+				break
+			}
+		}
+	}
+	return offsets
+}
+
+func collectVirtualColumnOffsetsAndTypes(cols []*expression.Column) ([]int, []*types.FieldType) {
+	var offsets []int
+	var fts []*types.FieldType
+	for i, col := range cols {
+		if col.VirtualExpr != nil {
+			offsets = append(offsets, i)
+			fts = append(fts, col.GetType())
+		}
+	}
+	return offsets, fts
 }
 
 func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {
@@ -284,8 +388,13 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se
 		return buf, true, nil
 	}
 	iter := chunk.NewIterator4Chunk(chk)
+	err = table.FillVirtualColumnValue(c.virtualColFieldTps, c.virtualColOffsets, c.expColInfos, c.colInfos, c.sessCtx, chk)
+	if err != nil {
+		return nil, false, errors.Trace(err)
+	}
 	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
-		idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps)
+		idxDt := extractDatumByOffsets(row, c.idxColOutputOffsets, c.expColInfos)
+		hdDt := extractDatumByOffsets(row, c.handleOutputOffsets, c.expColInfos)
 		handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx)
 		if err != nil {
 			return nil, false, errors.Trace(err)
@@ -321,34 +430,13 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col
 	return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
 }
 
-func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, *model.IndexInfo) {
-	if tbInfo.PKIsHandle {
-		for i := range tbInfo.Columns {
-			if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) {
-				return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType}, nil
-			}
-		}
-	} else if tbInfo.IsCommonHandle {
-		primaryIdx := tables.FindPrimaryIndex(tbInfo)
-		pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns))
-		pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns))
-		for _, pkCol := range primaryIdx.Columns {
-			pkCols = append(pkCols, tbInfo.Columns[pkCol.Offset])
-			pkFts = append(pkFts, &tbInfo.Columns[pkCol.Offset].FieldType)
-		}
-		return pkCols, pkFts, primaryIdx
-	}
-	extra := model.NewExtraHandleColInfo()
-	return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}, nil
-}
-
-func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) {
-	datumBuf := make([]types.Datum, 0, len(fieldTps))
-	idxColLen := len(idxInfo.Columns)
-	for i, ft := range fieldTps {
-		datumBuf = append(datumBuf, row.GetDatum(i, ft))
+func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.Column) []types.Datum {
+	datumBuf := make([]types.Datum, 0, len(offsets))
+	for _, offset := range offsets {
+		c := expCols[offset]
+		datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType()))
 	}
-	return datumBuf[:idxColLen], datumBuf[idxColLen:]
+	return datumBuf
 }
 
 func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go
index 333afa997d3bc..56bdc9297d95c 100644
--- a/ddl/index_cop_test.go
+++ b/ddl/index_cop_test.go
@@ -37,7 +37,8 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
 		require.NoError(t, err)
 		tblInfo := tbl.Meta()
 		idxInfo := tblInfo.FindIndexByName(idx)
-		copCtx := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
+		copCtx, err := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
+		require.NoError(t, err)
 		startKey := tbl.RecordPrefix()
 		endKey := startKey.PrefixNext()
 		txn, err := store.Begin()
diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go
index 7427f935c78ca..001e6f7d04c3e 100644
--- a/tests/realtikvtest/addindextest/integration_test.go
+++ b/tests/realtikvtest/addindextest/integration_test.go
@@ -206,3 +206,50 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
 	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;")
 }
+
+func TestAddIndexIngestGeneratedColumns(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	assertLastNDDLUseIngest := func(n int) {
+		tk.MustExec("admin check table t;")
+		rows := tk.MustQuery(fmt.Sprintf("admin show ddl jobs %d;", n)).Rows()
+		require.Len(t, rows, n)
+		for i := 0; i < n; i++ {
+			jobTp := rows[i][3].(string)
+			require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
+		}
+	}
+	tk.MustExec("create table t (a int, b int, c int as (b+10), d int as (b+c), primary key (a) clustered);")
+	tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);")
+	tk.MustExec("alter table t add index idx(c);")
+	tk.MustExec("alter table t add index idx1(c, a);")
+	tk.MustExec("alter table t add index idx2(a);")
+	tk.MustExec("alter table t add index idx3(d);")
+	tk.MustExec("alter table t add index idx4(d, c);")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 11 12", "2 2 12 14", "3 3 13 16"))
+	assertLastNDDLUseIngest(5)
+
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t (a int, b char(10), c char(10) as (concat(b, 'x')), d int, e char(20) as (c));")
+	tk.MustExec("insert into t (a, b, d) values (1, '1', 1), (2, '2', 2), (3, '3', 3);")
+	tk.MustExec("alter table t add index idx(c);")
+	tk.MustExec("alter table t add index idx1(a, c);")
+	tk.MustExec("alter table t add index idx2(c(7));")
+	tk.MustExec("alter table t add index idx3(e(5));")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1x 1 1x", "2 2 2x 2 2x", "3 3 3x 3 3x"))
+	assertLastNDDLUseIngest(4)
+
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t (a int, b char(10), c tinyint, d int as (a + c), e bigint as (d - a), primary key(b, a) clustered);")
+	tk.MustExec("insert into t (a, b, c) values (1, '1', 1), (2, '2', 2), (3, '3', 3);")
+	tk.MustExec("alter table t add index idx(d);")
+	tk.MustExec("alter table t add index idx1(b(2), d);")
+	tk.MustExec("alter table t add index idx2(d, c);")
+	tk.MustExec("alter table t add index idx3(e);")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1 2 1", "2 2 2 4 2", "3 3 3 6 3"))
+	assertLastNDDLUseIngest(4)
+}
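
The heart of this first patch is fillUsedColumns: an index over a generated
column can only be backfilled from a coprocessor scan if every column its
expression depends on, directly or through other generated columns, is
scanned as well. Below is a standalone sketch of that breadth-first
expansion; colMeta and expandUsedColumns are simplified illustrative names,
not TiDB APIs.

    package main

    import "fmt"

    // colMeta is a simplified stand-in for model.ColumnInfo: the names of
    // the columns this (generated) column depends on.
    type colMeta struct {
        deps []string
    }

    // expandUsedColumns mirrors the loop in fillUsedColumns: start from the
    // indexed columns and keep pulling in dependencies until none are left.
    func expandUsedColumns(indexed []string, table map[string]colMeta) map[string]struct{} {
        used := make(map[string]struct{})
        queue := append([]string(nil), indexed...)
        for len(queue) > 0 {
            next := queue[0]
            queue = queue[1:]
            used[next] = struct{}{}
            for _, dep := range table[next].deps {
                if _, ok := used[dep]; !ok {
                    queue = append(queue, dep)
                }
            }
        }
        return used
    }

    func main() {
        // Mirrors the test table above: c int as (b+10), d int as (b+c).
        table := map[string]colMeta{
            "a": {},
            "b": {},
            "c": {deps: []string{"b"}},
            "d": {deps: []string{"b", "c"}},
        }
        // An index on (d) must also scan b and c to evaluate d.
        fmt.Println(expandUsedColumns([]string{"d"}, table)) // map[b:{} c:{} d:{}]
    }

As in fillUsedColumns, a column can be enqueued more than once for some
dependency shapes; that is harmless because the result is a set.
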
From ad456d15160fb48effa107e6215e555d905a6b45 Mon Sep 17 00:00:00 2001
From: tangenta
Date: Wed, 23 Nov 2022 19:03:51 +0800
Subject: [PATCH 2/4] fix nogo linter

---
 ddl/index_cop.go | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/ddl/index_cop.go b/ddl/index_cop.go
index ba17f6445b6ee..94c16135afea9 100644
--- a/ddl/index_cop.go
+++ b/ddl/index_cop.go
@@ -250,15 +250,19 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s
 			handleIDs = append(handleIDs, col.ID)
 		}
 		usedColumnIDs, err = fillUsedColumns(usedColumnIDs, primaryIdx, tblInfo)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	// Only collect the columns that are used by the index.
 	colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
 	fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns))
-	for _, c := range tblInfo.Columns {
-		if _, found := usedColumnIDs[c.ID]; found {
-			colInfos = append(colInfos, c)
-			fieldTps = append(fieldTps, &c.FieldType)
+	for i := range tblInfo.Columns {
+		col := tblInfo.Columns[i]
+		if _, found := usedColumnIDs[col.ID]; found {
+			colInfos = append(colInfos, col)
+			fieldTps = append(fieldTps, &col.FieldType)
 		}
 	}
 
@@ -272,6 +276,9 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s
 
 	expColInfos, _, err := expression.ColumnInfos2ColumnsAndNames(sessCtx,
 		model.CIStr{} /* unused */, tblInfo.Name, colInfos, tblInfo)
+	if err != nil {
+		return nil, err
+	}
 	idxOffsets := resolveIndicesForIndex(expColInfos, idxInfo, tblInfo)
 	hdColOffsets := resolveIndicesForHandle(expColInfos, handleIDs)
 	vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(expColInfos)
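
The context lines of the second hunk show the other half of the design:
newCopContext resolves once, up front, where each index column and handle
column sits in the scan output, and fetchTableScanResult then extracts
datums by plain offset. A minimal sketch of that two-step pattern with
plain slices; resolveOffsets and pick are illustrative stand-ins for
resolveIndicesForIndex/resolveIndicesForHandle and extractDatumByOffsets.

    package main

    import "fmt"

    // resolveOffsets maps each wanted column ID to its position in the scan
    // output, the way resolveIndicesForIndex and resolveIndicesForHandle do.
    func resolveOffsets(outputIDs, wanted []int64) []int {
        offsets := make([]int, 0, len(wanted))
        for _, id := range wanted {
            for j, out := range outputIDs {
                if out == id {
                    offsets = append(offsets, j)
                    break
                }
            }
        }
        return offsets
    }

    // pick extracts row values by precomputed offsets, like extractDatumByOffsets.
    func pick(row []string, offsets []int) []string {
        vals := make([]string, 0, len(offsets))
        for _, off := range offsets {
            vals = append(vals, row[off])
        }
        return vals
    }

    func main() {
        outputIDs := []int64{1, 2, 3, 4} // scan outputs columns a, b, c, d
        idxOffsets := resolveOffsets(outputIDs, []int64{4, 3}) // index (d, c)
        fmt.Println(idxOffsets) // [3 2]
        fmt.Println(pick([]string{"a0", "b0", "c0", "d0"}, idxOffsets)) // [d0 c0]
    }

Resolving offsets once per backfill job keeps the per-row work in
fetchTableScanResult down to simple slice indexing.
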
From 131ce10879d8652a855fd9142f70cc9e0452e78f Mon Sep 17 00:00:00 2001
From: tangenta
Date: Thu, 24 Nov 2022 10:34:19 +0800
Subject: [PATCH 3/4] refine code and test

---
 ddl/index_cop.go                                    | 5 +++--
 ddl/ingest/config.go                                | 6 ++++++
 tests/realtikvtest/addindextest/integration_test.go | 3 +++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/ddl/index_cop.go b/ddl/index_cop.go
index 94c16135afea9..c5c2476ade5a5 100644
--- a/ddl/index_cop.go
+++ b/ddl/index_cop.go
@@ -240,8 +240,9 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s
 	}
 	var primaryIdx *model.IndexInfo
 	if tblInfo.PKIsHandle {
-		usedColumnIDs[tblInfo.GetPkColInfo().ID] = struct{}{}
-		handleIDs = []int64{tblInfo.GetPkColInfo().ID}
+		pkCol := tblInfo.GetPkColInfo()
+		usedColumnIDs[pkCol.ID] = struct{}{}
+		handleIDs = []int64{pkCol.ID}
 	} else if tblInfo.IsCommonHandle {
 		primaryIdx = tables.FindPrimaryIndex(tblInfo)
 		handleIDs = make([]int64, 0, len(primaryIdx.Columns))
diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go
index 3a96e8ae5201b..ebaeb6eafc851 100644
--- a/ddl/ingest/config.go
+++ b/ddl/ingest/config.go
@@ -16,6 +16,7 @@ package ingest
 
 import (
 	"path/filepath"
+	"sync/atomic"
 
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
@@ -26,12 +27,17 @@
 	"go.uber.org/zap"
 )
 
+var ImporterRangeConcurrencyForTest *atomic.Int32
+
 func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config.Config, error) {
 	tidbCfg := tidbconf.GetGlobalConfig()
 	cfg := config.NewConfig()
 	cfg.TikvImporter.Backend = config.BackendLocal
 	// Each backend will build a single dir in lightning dir.
 	cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, encodeBackendTag(jobID))
+	if ImporterRangeConcurrencyForTest != nil {
+		cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load())
+	}
 	_, err := cfg.AdjustCommon()
 	if err != nil {
 		logutil.BgLogger().Warn(LitWarnConfigError, zap.Error(err))
diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go
index 001e6f7d04c3e..70da49e58364b 100644
--- a/tests/realtikvtest/addindextest/integration_test.go
+++ b/tests/realtikvtest/addindextest/integration_test.go
@@ -187,6 +187,8 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
 	tk.MustExec("create database addindexlit;")
 	tk.MustExec("use addindexlit;")
 	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	ingest.ImporterRangeConcurrencyForTest = &atomic.Int32{}
+	ingest.ImporterRangeConcurrencyForTest.Store(2)
 	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 20;")
 	tk.MustExec("create table t (a int primary key);")
 	var sb strings.Builder
@@ -205,6 +207,7 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
 	jobTp := rows[0][3].(string)
 	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;")
+	ingest.ImporterRangeConcurrencyForTest = nil
 }
 
 func TestAddIndexIngestGeneratedColumns(t *testing.T) {

From 6bebf559e074793ec41ddd58ef8ab5fc5a7ea4b1 Mon Sep 17 00:00:00 2001
From: tangenta
Date: Thu, 24 Nov 2022 10:58:57 +0800
Subject: [PATCH 4/4] fix linter

---
 ddl/ingest/config.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go
index ebaeb6eafc851..e9c1458b1ab0a 100644
--- a/ddl/ingest/config.go
+++ b/ddl/ingest/config.go
@@ -27,6 +27,7 @@
 	"go.uber.org/zap"
 )
 
+// ImporterRangeConcurrencyForTest is only used for test.
 var ImporterRangeConcurrencyForTest *atomic.Int32
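
Patches 3 and 4 also introduce a test hook: ingest.ImporterRangeConcurrencyForTest
stays nil in production and is set by the worker-count test to force a small
lightning range concurrency. A compact sketch of this nil-means-production
override pattern, with simplified names; effectiveRangeConcurrency is
illustrative, while the real code reads the hook inside generateLightningConfig.

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // rangeConcurrencyForTest mimics ingest.ImporterRangeConcurrencyForTest:
    // nil in production, pointed at a value by tests to override the default.
    var rangeConcurrencyForTest *atomic.Int32

    // effectiveRangeConcurrency applies the override before the config is
    // adjusted, the way generateLightningConfig does.
    func effectiveRangeConcurrency(configured int) int {
        if rangeConcurrencyForTest != nil {
            return int(rangeConcurrencyForTest.Load())
        }
        return configured
    }

    func main() {
        fmt.Println(effectiveRangeConcurrency(16)) // 16: production path

        override := new(atomic.Int32) // atomic.Int32 needs Go 1.19+, as in the patch
        override.Store(2)
        rangeConcurrencyForTest = override         // what the test installs
        fmt.Println(effectiveRangeConcurrency(16)) // 2: test path
        rangeConcurrencyForTest = nil              // restored when the test finishes
    }

Note how the test in patch 3 both installs and clears the hook, so later
tests in the same process are unaffected; patch 4 merely adds the doc
comment that the linter requires on the exported variable.
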