ddl: support read generated columns with copr for adding index #39345

Merged 5 commits on Nov 24, 2022. (Diff shown: changes from 4 commits.)
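Summary (condensed from the diff below): previously, newCopContext returned nil whenever the target index touched a virtual generated column, so such ADD INDEX jobs could not read rows through the coprocessor. With this change, newCopContext returns (*copContext, error), expands the indexed columns through their generation-expression dependencies (fillUsedColumns), resolves output offsets for the index and handle columns, and fills virtual column values via table.FillVirtualColumnValue while fetching table-scan results. The scenario exercised by the new integration test, in short:

	create table t (a int, b int, c int as (b+10), d int as (b+c), primary key (a) clustered);
	insert into t (a, b) values (1, 1), (2, 2), (3, 3);
	alter table t add index idx3(d); -- d is virtual and depends on the virtual column c
	admin show ddl jobs 1;           -- the job type is expected to contain "ingest"
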
6 changes: 3 additions & 3 deletions ddl/backfilling.go
@@ -744,9 +744,9 @@ func (b *backfillScheduler) initCopReqSenderPool() {
 		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
 		return
 	}
-	copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
-	if copCtx == nil {
-		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender")
+	copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
 		return
 	}
 	ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
180 changes: 138 additions & 42 deletions ddl/index_cop.go
@@ -21,13 +21,14 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
@@ -221,34 +222,145 @@ type copContext struct {
 	colInfos []*model.ColumnInfo
 	fieldTps []*types.FieldType
 	sessCtx  sessionctx.Context
+
+	expColInfos         []*expression.Column
+	idxColOutputOffsets []int
+	handleOutputOffsets []int
+	virtualColOffsets   []int
+	virtualColFieldTps  []*types.FieldType
 }

-func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext {
+func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) (*copContext, error) {
+	var err error
+	usedColumnIDs := make(map[int64]struct{}, len(idxInfo.Columns))
+	usedColumnIDs, err = fillUsedColumns(usedColumnIDs, idxInfo, tblInfo)
+	var handleIDs []int64
+	if err != nil {
+		return nil, err
+	}
+	var primaryIdx *model.IndexInfo
+	if tblInfo.PKIsHandle {
+		pkCol := tblInfo.GetPkColInfo()
+		usedColumnIDs[pkCol.ID] = struct{}{}
+		handleIDs = []int64{pkCol.ID}
+	} else if tblInfo.IsCommonHandle {
+		primaryIdx = tables.FindPrimaryIndex(tblInfo)
+		handleIDs = make([]int64, 0, len(primaryIdx.Columns))
+		for _, pkCol := range primaryIdx.Columns {
+			col := tblInfo.Columns[pkCol.Offset]
+			handleIDs = append(handleIDs, col.ID)
+		}
+		usedColumnIDs, err = fillUsedColumns(usedColumnIDs, primaryIdx, tblInfo)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// Only collect the columns that are used by the index.
 	colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
 	fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns))
-	for _, idxCol := range idxInfo.Columns {
-		c := tblInfo.Columns[idxCol.Offset]
-		if c.IsGenerated() && !c.GeneratedStored {
-			// TODO(tangenta): support reading virtual generated columns.
-			return nil
-		}
-		colInfos = append(colInfos, c)
-		fieldTps = append(fieldTps, &c.FieldType)
+	for i := range tblInfo.Columns {
+		col := tblInfo.Columns[i]
+		if _, found := usedColumnIDs[col.ID]; found {
+			colInfos = append(colInfos, col)
+			fieldTps = append(fieldTps, &col.FieldType)
+		}
 	}

-	pkColInfos, pkFieldTps, pkInfo := buildHandleColInfoAndFieldTypes(tblInfo)
-	colInfos = append(colInfos, pkColInfos...)
-	fieldTps = append(fieldTps, pkFieldTps...)
+	// Append the extra handle column when _tidb_rowid is used.
+	if !tblInfo.HasClusteredIndex() {
+		extra := model.NewExtraHandleColInfo()
+		colInfos = append(colInfos, extra)
+		fieldTps = append(fieldTps, &extra.FieldType)
+		handleIDs = []int64{extra.ID}
+	}
+
+	expColInfos, _, err := expression.ColumnInfos2ColumnsAndNames(sessCtx,
+		model.CIStr{} /* unused */, tblInfo.Name, colInfos, tblInfo)
+	if err != nil {
+		return nil, err
+	}
+	idxOffsets := resolveIndicesForIndex(expColInfos, idxInfo, tblInfo)
+	hdColOffsets := resolveIndicesForHandle(expColInfos, handleIDs)
+	vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(expColInfos)

 	copCtx := &copContext{
 		tblInfo:  tblInfo,
 		idxInfo:  idxInfo,
-		pkInfo:   pkInfo,
+		pkInfo:   primaryIdx,
 		colInfos: colInfos,
 		fieldTps: fieldTps,
 		sessCtx:  sessCtx,
+
+		expColInfos:         expColInfos,
+		idxColOutputOffsets: idxOffsets,
+		handleOutputOffsets: hdColOffsets,
+		virtualColOffsets:   vColOffsets,
+		virtualColFieldTps:  vColFts,
 	}
-	return copCtx
+	return copCtx, nil
 }

+func fillUsedColumns(usedCols map[int64]struct{}, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) (map[int64]struct{}, error) {
+	colsToChecks := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
+	for _, idxCol := range idxInfo.Columns {
+		colsToChecks = append(colsToChecks, tblInfo.Columns[idxCol.Offset])
+	}
+	for len(colsToChecks) > 0 {
+		next := colsToChecks[0]
+		colsToChecks = colsToChecks[1:]
+		usedCols[next.ID] = struct{}{}
+		for depColName := range next.Dependences {
+			// Expand the virtual generated columns.
+			depCol := model.FindColumnInfo(tblInfo.Columns, depColName)
+			if depCol == nil {
+				return nil, errors.Trace(errors.Errorf("dependent column %s not found", depColName))
+			}
+			if _, ok := usedCols[depCol.ID]; !ok {
+				colsToChecks = append(colsToChecks, depCol)
+			}
+		}
+	}
+	return usedCols, nil
+}

+func resolveIndicesForIndex(outputCols []*expression.Column, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) []int {
+	offsets := make([]int, 0, len(idxInfo.Columns))
+	for _, idxCol := range idxInfo.Columns {
+		hid := tblInfo.Columns[idxCol.Offset].ID
+		for j, col := range outputCols {
+			if col.ID == hid {
+				offsets = append(offsets, j)
+				break
+			}
+		}
+	}
+	return offsets
+}
+
+func resolveIndicesForHandle(cols []*expression.Column, handleIDs []int64) []int {
+	offsets := make([]int, 0, len(handleIDs))
+	for _, hid := range handleIDs {
+		for j, col := range cols {
+			if col.ID == hid {
+				offsets = append(offsets, j)
+				break
+			}
+		}
+	}
+	return offsets
+}
+
+func collectVirtualColumnOffsetsAndTypes(cols []*expression.Column) ([]int, []*types.FieldType) {
+	var offsets []int
+	var fts []*types.FieldType
+	for i, col := range cols {
+		if col.VirtualExpr != nil {
+			offsets = append(offsets, i)
+			fts = append(fts, col.GetType())
+		}
+	}
+	return offsets, fts
+}

 func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {

@@ -284,8 +396,13 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se
 		return buf, true, nil
 	}
 	iter := chunk.NewIterator4Chunk(chk)
+	err = table.FillVirtualColumnValue(c.virtualColFieldTps, c.virtualColOffsets, c.expColInfos, c.colInfos, c.sessCtx, chk)
+	if err != nil {
+		return nil, false, errors.Trace(err)
+	}
 	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
-		idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps)
+		idxDt := extractDatumByOffsets(row, c.idxColOutputOffsets, c.expColInfos)
+		hdDt := extractDatumByOffsets(row, c.handleOutputOffsets, c.expColInfos)
 		handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx)
 		if err != nil {
 			return nil, false, errors.Trace(err)

@@ -321,34 +438,13 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col
 	return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
 }

-func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, *model.IndexInfo) {
-	if tbInfo.PKIsHandle {
-		for i := range tbInfo.Columns {
-			if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) {
-				return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType}, nil
-			}
-		}
-	} else if tbInfo.IsCommonHandle {
-		primaryIdx := tables.FindPrimaryIndex(tbInfo)
-		pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns))
-		pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns))
-		for _, pkCol := range primaryIdx.Columns {
-			pkCols = append(pkCols, tbInfo.Columns[pkCol.Offset])
-			pkFts = append(pkFts, &tbInfo.Columns[pkCol.Offset].FieldType)
-		}
-		return pkCols, pkFts, primaryIdx
-	}
-	extra := model.NewExtraHandleColInfo()
-	return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}, nil
-}
-
-func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) {
-	datumBuf := make([]types.Datum, 0, len(fieldTps))
-	idxColLen := len(idxInfo.Columns)
-	for i, ft := range fieldTps {
-		datumBuf = append(datumBuf, row.GetDatum(i, ft))
-	}
-	return datumBuf[:idxColLen], datumBuf[idxColLen:]
+func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.Column) []types.Datum {
+	datumBuf := make([]types.Datum, 0, len(offsets))
+	for _, offset := range offsets {
+		c := expCols[offset]
+		datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType()))
+	}
+	return datumBuf
 }
 
 func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
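
A note on the approach: fillUsedColumns is a plain worklist expansion over model.ColumnInfo.Dependences, so a chain of virtual columns (d depends on c, which depends on b) pulls every underlying column into the scan. A minimal standalone sketch of the same technique, using simplified stand-in types rather than the real TiDB ones:

	package main

	import "fmt"

	// col is a simplified stand-in for model.ColumnInfo: an ID plus the names
	// of the columns its generation expression depends on.
	type col struct {
		id   int64
		deps []string
	}

	// expandUsed mirrors fillUsedColumns: seed the worklist with the indexed
	// columns and keep pulling in dependencies until the set stops growing.
	func expandUsed(indexed []string, all map[string]col) map[int64]struct{} {
		used := make(map[int64]struct{})
		work := append([]string(nil), indexed...)
		for len(work) > 0 {
			next := all[work[0]]
			work = work[1:]
			used[next.id] = struct{}{}
			for _, dep := range next.deps {
				depCol, ok := all[dep]
				if !ok {
					continue // the real code returns an error for a missing column
				}
				if _, seen := used[depCol.id]; !seen {
					work = append(work, dep)
				}
			}
		}
		return used
	}

	func main() {
		all := map[string]col{
			"b": {id: 2},                           // plain stored column
			"c": {id: 3, deps: []string{"b"}},      // c int as (b+10)
			"d": {id: 4, deps: []string{"b", "c"}}, // d int as (b+c)
		}
		fmt.Println(len(expandUsed([]string{"d"}, all))) // 3: d, c, and b are all scanned
	}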
3 changes: 2 additions & 1 deletion ddl/index_cop_test.go
@@ -37,7 +37,8 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
 	require.NoError(t, err)
 	tblInfo := tbl.Meta()
 	idxInfo := tblInfo.FindIndexByName(idx)
-	copCtx := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
+	copCtx, err := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
+	require.NoError(t, err)
 	startKey := tbl.RecordPrefix()
 	endKey := startKey.PrefixNext()
 	txn, err := store.Begin()
7 changes: 7 additions & 0 deletions ddl/ingest/config.go
@@ -16,6 +16,7 @@ package ingest

 import (
 	"path/filepath"
+	"sync/atomic"
 
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"

@@ -26,12 +27,18 @@ import (
 	"go.uber.org/zap"
 )

+// ImporterRangeConcurrencyForTest is only used for test.
+var ImporterRangeConcurrencyForTest *atomic.Int32
+
 func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config.Config, error) {
 	tidbCfg := tidbconf.GetGlobalConfig()
 	cfg := config.NewConfig()
 	cfg.TikvImporter.Backend = config.BackendLocal
 	// Each backend will build a single dir in lightning dir.
 	cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, encodeBackendTag(jobID))
+	if ImporterRangeConcurrencyForTest != nil {
+		cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load())
+	}
 	_, err := cfg.AdjustCommon()
 	if err != nil {
 		logutil.BgLogger().Warn(LitWarnConfigError, zap.Error(err))
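
ImporterRangeConcurrencyForTest is a test-only override hook: nil in production, set by a test to pin lightning's range concurrency (the integration test below stores 2 and resets it to nil when done). A minimal sketch of the same pattern in isolation, with hypothetical names:

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	// concurrencyForTest stays nil in production; a test can point it at an
	// atomic.Int32 to force a specific value.
	var concurrencyForTest *atomic.Int32

	func effectiveRangeConcurrency(defaultVal int) int {
		if concurrencyForTest != nil {
			return int(concurrencyForTest.Load())
		}
		return defaultVal
	}

	func main() {
		fmt.Println(effectiveRangeConcurrency(16)) // 16: no override
		override := &atomic.Int32{}
		override.Store(2)
		concurrencyForTest = override
		fmt.Println(effectiveRangeConcurrency(16)) // 2: test override active
	}
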
50 changes: 50 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
@@ -187,6 +187,8 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
ingest.ImporterRangeConcurrencyForTest = &atomic.Int32{}
ingest.ImporterRangeConcurrencyForTest.Store(2)
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 20;")
tk.MustExec("create table t (a int primary key);")
var sb strings.Builder
Expand All @@ -205,4 +207,52 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;")
ingest.ImporterRangeConcurrencyForTest = nil
}

+func TestAddIndexIngestGeneratedColumns(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	assertLastNDDLUseIngest := func(n int) {
+		tk.MustExec("admin check table t;")
+		rows := tk.MustQuery(fmt.Sprintf("admin show ddl jobs %d;", n)).Rows()
+		require.Len(t, rows, n)
+		for i := 0; i < n; i++ {
+			jobTp := rows[i][3].(string)
+			require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
+		}
+	}
+	tk.MustExec("create table t (a int, b int, c int as (b+10), d int as (b+c), primary key (a) clustered);")
+	tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);")
+	tk.MustExec("alter table t add index idx(c);")
+	tk.MustExec("alter table t add index idx1(c, a);")
+	tk.MustExec("alter table t add index idx2(a);")
+	tk.MustExec("alter table t add index idx3(d);")
+	tk.MustExec("alter table t add index idx4(d, c);")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 11 12", "2 2 12 14", "3 3 13 16"))
+	assertLastNDDLUseIngest(5)
+
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t (a int, b char(10), c char(10) as (concat(b, 'x')), d int, e char(20) as (c));")
+	tk.MustExec("insert into t (a, b, d) values (1, '1', 1), (2, '2', 2), (3, '3', 3);")
+	tk.MustExec("alter table t add index idx(c);")
+	tk.MustExec("alter table t add index idx1(a, c);")
+	tk.MustExec("alter table t add index idx2(c(7));")
+	tk.MustExec("alter table t add index idx3(e(5));")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1x 1 1x", "2 2 2x 2 2x", "3 3 3x 3 3x"))
+	assertLastNDDLUseIngest(4)
+
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t (a int, b char(10), c tinyint, d int as (a + c), e bigint as (d - a), primary key(b, a) clustered);")
+	tk.MustExec("insert into t (a, b, c) values (1, '1', 1), (2, '2', 2), (3, '3', 3);")
+	tk.MustExec("alter table t add index idx(d);")
+	tk.MustExec("alter table t add index idx1(b(2), d);")
+	tk.MustExec("alter table t add index idx2(d, c);")
+	tk.MustExec("alter table t add index idx3(e);")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1 2 1", "2 2 2 4 2", "3 3 3 6 3"))
+	assertLastNDDLUseIngest(4)
+}