Merge branch 'master' into view-hint1
fzzf678 authored Nov 14, 2022
2 parents 4ef74cd + 86d3b46 commit cc09a5d
Showing 94 changed files with 12,619 additions and 10,619 deletions.
8 changes: 6 additions & 2 deletions bindinfo/bind_test.go
@@ -36,6 +36,7 @@ func TestPrepareCacheWithBinding(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`set tidb_enable_prepared_plan_cache=1`)
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a int, b int, c int, key idx_b(b), key idx_c(c))")
@@ -240,7 +241,7 @@ func TestPrepareCacheWithBinding(t *testing.T) {
ps = []*util.ProcessInfo{tkProcess}
tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps})
res = tk.MustQuery("explain for connection " + strconv.FormatUint(tkProcess.ID, 10))
- require.False(t, tk.HasPlan4ExplainFor(res, "IndexReader"))
+ require.True(t, tk.HasPlan4ExplainFor(res, "IndexReader"))
tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

@@ -297,6 +298,7 @@ func TestExplain(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int)")
@@ -313,7 +315,7 @@ func TestExplain(t *testing.T) {

// Add test for SetOprStmt
tk.MustExec("create index index_id on t1(id)")
- require.False(t, tk.HasPlan("SELECT * from t1 union SELECT * from t1", "IndexReader"))
+ require.True(t, tk.HasPlan("SELECT * from t1 union SELECT * from t1", "IndexReader"))
require.True(t, tk.HasPlan("SELECT * from t1 use index(index_id) union SELECT * from t1", "IndexReader"))

tk.MustExec("create global binding for SELECT * from t1 union SELECT * from t1 using SELECT * from t1 use index(index_id) union SELECT * from t1")
@@ -874,6 +876,7 @@ func TestNotEvolvePlanForReadStorageHint(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx_a(a), index idx_b(b))")
tk.MustExec("insert into t values (1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10)")
@@ -918,6 +921,7 @@ func TestBindingWithIsolationRead(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx_a(a), index idx_b(b))")
tk.MustExec("insert into t values (1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10)")
6 changes: 6 additions & 0 deletions br/pkg/lightning/mydump/parquet_parser.go
@@ -579,6 +579,12 @@ func (pp *ParquetParser) SetLogger(l log.Logger) {
pp.logger = l
}

// SetRowID sets the row ID of the parquet parser.
// It implements the Parser interface.
func (pp *ParquetParser) SetRowID(rowID int64) {
pp.lastRow.RowID = rowID
}

func jdToTime(jd int32, nsec int64) time.Time {
sec := int64(jd-jan011970) * secPerDay
// it's fine not to check the value of nsec
27 changes: 27 additions & 0 deletions br/pkg/lightning/mydump/parser.go
@@ -138,6 +138,8 @@ type Parser interface {
SetColumns([]string)

SetLogger(log.Logger)

SetRowID(rowID int64)
}

// NewChunkParser creates a new parser which can read chunks out of a file.
@@ -174,6 +176,7 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
}

// Pos returns the current file offset.
// Attention: for compressed sql/csv files, pos is the position in the uncompressed file
func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
return parser.pos, parser.lastRow.RowID
}
@@ -205,6 +208,11 @@ func (parser *blockParser) SetLogger(logger log.Logger) {
parser.Logger = logger
}

// SetRowID changes the reported row ID when we first read compressed files.
func (parser *blockParser) SetRowID(rowID int64) {
parser.lastRow.RowID = rowID
}

type token byte

const (
@@ -592,3 +600,22 @@ func ReadChunks(parser Parser, minSize int64) ([]Chunk, error) {
}
}
}

// ReadUntil reads rows from the parser until the parser's position reaches pos,
// or until the input is exhausted.
func ReadUntil(parser Parser, pos int64) error {
var curOffset int64
for curOffset < pos {
switch err := parser.ReadRow(); errors.Cause(err) {
case nil:
curOffset, _ = parser.Pos()

case io.EOF:
return nil

default:
return errors.Trace(err)
}
}
return nil
}
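The two additions above are meant to be used together when a checkpoint falls inside a compressed file: a compressed stream cannot be seeked to an arbitrary byte offset, so the caller re-reads rows until the recorded (uncompressed) offset is reached and then restores the row counter. A minimal sketch of that pattern, assuming a mydump.Parser has already been constructed for the file; the helper name resumeCompressedChunk is illustrative and not part of this commit:

package example // hypothetical package, for illustration only

import (
    "github.com/pingcap/errors"
    "github.com/pingcap/tidb/br/pkg/lightning/mydump"
)

// resumeCompressedChunk skips rows until the parser's reported (uncompressed)
// position reaches the checkpointed offset, then restores the row counter so
// later rows continue numbering where the previous chunk stopped.
func resumeCompressedChunk(parser mydump.Parser, offset, prevRowIDMax int64) error {
    if err := mydump.ReadUntil(parser, offset); err != nil {
        return errors.Trace(err)
    }
    parser.SetRowID(prevRowIDMax)
    return nil
}

This is essentially what newChunkRestore does for compressed chunks in br/pkg/lightning/restore/restore.go further down in this diff.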
17 changes: 17 additions & 0 deletions br/pkg/lightning/mydump/router.go
@@ -9,6 +9,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/filter"
"github.com/pingcap/tidb/util/slice"
"go.uber.org/zap"
@@ -71,6 +72,22 @@
CompressionSnappy
)

// ToStorageCompressType converts Compression to storage.CompressType.
func ToStorageCompressType(compression Compression) (storage.CompressType, error) {
switch compression {
case CompressionGZ:
return storage.Gzip, nil
case CompressionSnappy:
return storage.Snappy, nil
case CompressionZStd:
return storage.Zstd, nil
case CompressionNone:
return storage.NoCompression, nil
default:
return storage.NoCompression, errors.Errorf("compression %d doesn't have related storage compressType", compression)
}
}

func parseSourceType(t string) (SourceType, error) {
switch strings.ToLower(strings.TrimSpace(t)) {
case SchemaSchema:
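As a usage note for the new ToStorageCompressType converter above, here is a hedged sketch of how it is consumed when opening a data file. It mirrors the newChunkRestore change later in this diff; the helper name openCompressedOrPlain and its exact signature are assumptions for illustration, not code from this commit:

package example // hypothetical package, for illustration only

import (
    "context"

    "github.com/pingcap/errors"
    "github.com/pingcap/tidb/br/pkg/lightning/mydump"
    "github.com/pingcap/tidb/br/pkg/storage"
)

// openCompressedOrPlain maps the lightning-side Compression enum to the
// storage-side CompressType and, for compressed sources, opens the file
// through a wrapper whose reader transparently decompresses the data.
func openCompressedOrPlain(ctx context.Context, store storage.ExternalStorage, path string, c mydump.Compression) (storage.ReadSeekCloser, error) {
    var (
        reader storage.ReadSeekCloser
        err    error
    )
    if c == mydump.CompressionNone {
        reader, err = store.Open(ctx, path) // plain file, open directly
        return reader, errors.Trace(err)
    }
    compressType, err := mydump.ToStorageCompressType(c)
    if err != nil {
        return nil, errors.Trace(err)
    }
    reader, err = storage.WithCompression(store, compressType).Open(ctx, path)
    return reader, errors.Trace(err)
}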
126 changes: 126 additions & 0 deletions br/pkg/lightning/restore/chunk_restore_test.go
@@ -15,9 +15,13 @@
package restore

import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
"testing"

@@ -40,8 +44,10 @@ import (
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
tmock "github.com/pingcap/tidb/util/mock"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@@ -654,3 +660,123 @@ func (s *chunkRestoreSuite) TestRestore() {
require.NoError(s.T(), err)
require.Len(s.T(), saveCpCh, 2)
}

func TestCompressChunkRestore(t *testing.T) {
// Produce a mock table info
p := parser.New()
p.SetSQLMode(mysql.ModeANSIQuotes)
node, err := p.ParseOneStmt(`
CREATE TABLE "table" (
a INT,
b INT,
c INT,
KEY (b)
)
`, "", "")
require.NoError(t, err)
core, err := ddl.BuildTableInfoFromAST(node.(*ast.CreateTableStmt))
require.NoError(t, err)
core.State = model.StatePublic

// Write some sample CSV dump
fakeDataDir := t.TempDir()
store, err := storage.NewLocalStorage(fakeDataDir)
require.NoError(t, err)

fakeDataFiles := make([]mydump.FileInfo, 0)

csvName := "db.table.1.csv.gz"
file, err := os.Create(filepath.Join(fakeDataDir, csvName))
require.NoError(t, err)
gzWriter := gzip.NewWriter(file)

var totalBytes int64
for i := 0; i < 300; i += 3 {
n, err := gzWriter.Write([]byte(fmt.Sprintf("%d,%d,%d\r\n", i, i+1, i+2)))
require.NoError(t, err)
totalBytes += int64(n)
}

err = gzWriter.Close()
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)

fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{
TableName: filter.Table{Schema: "db", Name: "table"},
FileMeta: mydump.SourceFileMeta{
Path: csvName,
Type: mydump.SourceTypeCSV,
Compression: mydump.CompressionGZ,
SortKey: "99",
FileSize: totalBytes,
},
})

chunk := checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: 0},
FileMeta: fakeDataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: 0,
EndOffset: totalBytes,
PrevRowIDMax: 0,
RowIDMax: 100,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w := worker.NewPool(ctx, 5, "io")
cfg := config.NewConfig()
cfg.Mydumper.BatchSize = 111
cfg.App.TableConcurrency = 2
cfg.Mydumper.CSV.Header = false

cr, err := newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
require.NoError(t, err)
var (
id, lastID int
offset int64 = 0
rowID int64 = 0
)
for id < 100 {
offset, rowID = cr.parser.Pos()
err = cr.parser.ReadRow()
require.NoError(t, err)
rowData := cr.parser.LastRow().Row
require.Len(t, rowData, 3)
lastID = id
for i := 0; id < 100 && i < 3; i++ {
require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
id++
}
}
require.Equal(t, int64(33), rowID)

// test reading starting from the middle of a compressed file
chunk = checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: offset},
FileMeta: fakeDataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: offset,
EndOffset: totalBytes,
PrevRowIDMax: rowID,
RowIDMax: 100,
},
}
cr, err = newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
require.NoError(t, err)
for id = lastID; id < 300; {
err = cr.parser.ReadRow()
require.NoError(t, err)
rowData := cr.parser.LastRow().Row
require.Len(t, rowData, 3)
for i := 0; id < 300 && i < 3; i++ {
require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
id++
}
}
_, rowID = cr.parser.Pos()
require.Equal(t, int64(100), rowID)
err = cr.parser.ReadRow()
require.Equal(t, io.EOF, errors.Cause(err))
}
29 changes: 23 additions & 6 deletions br/pkg/lightning/restore/restore.go
@@ -2190,11 +2190,21 @@ func newChunkRestore(
) (*chunkRestore, error) {
blockBufSize := int64(cfg.Mydumper.ReadBlockSize)

- var reader storage.ReadSeekCloser
- var err error
- if chunk.FileMeta.Type == mydump.SourceTypeParquet {
+ var (
+ reader storage.ReadSeekCloser
+ compressType storage.CompressType
+ err error
+ )
+ switch {
+ case chunk.FileMeta.Type == mydump.SourceTypeParquet:
reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize)
- } else {
+ case chunk.FileMeta.Compression != mydump.CompressionNone:
+ compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression)
+ if err != nil {
+ break
+ }
+ reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path)
+ default:
reader, err = store.Open(ctx, chunk.FileMeta.Path)
}
if err != nil {
@@ -2225,8 +2235,15 @@ func newChunkRestore(
panic(fmt.Sprintf("file '%s' with unknown source type '%s'", chunk.Key.Path, chunk.FileMeta.Type.String()))
}

- if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
- return nil, errors.Trace(err)
+ if chunk.FileMeta.Compression == mydump.CompressionNone {
+ if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
+ return nil, errors.Trace(err)
+ }
+ } else {
+ if err = mydump.ReadUntil(parser, chunk.Chunk.Offset); err != nil {
+ return nil, errors.Trace(err)
+ }
+ parser.SetRowID(chunk.Chunk.PrevRowIDMax)
}
if len(chunk.ColumnPermutation) > 0 {
parser.SetColumns(getColumnNames(tableInfo.Core, chunk.ColumnPermutation))
20 changes: 11 additions & 9 deletions br/pkg/storage/writer.go
@@ -48,16 +48,18 @@ type interceptBuffer interface {
}

func createSuffixString(compressType CompressType) string {
- if compressType == Gzip {
- return ".txt.gz"
- }
- if compressType == Snappy {
- return ".txt.snappy"
- }
- if compressType == Zstd {
- return ".txt.zst"
+ txtSuffix := ".txt"
+ switch compressType {
+ case Gzip:
+ txtSuffix += ".gz"
+ case Snappy:
+ txtSuffix += ".snappy"
+ case Zstd:
+ txtSuffix += ".zst"
+ default:
+ return ""
}
- return ""
+ return txtSuffix
}

func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {
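For reference, a hypothetical table-driven check of the refactored createSuffixString mapping above. It would have to live inside the storage package because createSuffixString is unexported; the test name is an assumption and the test itself is not part of the commit:

package storage

import "testing"

// TestCreateSuffixStringSketch documents the suffix produced for each known
// CompressType; anything else falls through to the default branch and yields "".
func TestCreateSuffixStringSketch(t *testing.T) {
    cases := []struct {
        compressType CompressType
        want         string
    }{
        {Gzip, ".txt.gz"},
        {Snappy, ".txt.snappy"},
        {Zstd, ".txt.zst"},
        {NoCompression, ""},
    }
    for _, c := range cases {
        if got := createSuffixString(c.compressType); got != c.want {
            t.Errorf("createSuffixString(%v) = %q, want %q", c.compressType, got, c.want)
        }
    }
}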
4 changes: 3 additions & 1 deletion ddl/cluster_test.go
@@ -49,7 +49,8 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
// 3: (stats_extended)
// 4: (stats_fm_sketch)
// 5: (stats_history, stats_meta_history)
- require.Len(t, kvRanges, 6)
+ // 6: (stats_table_locked)
+ require.Len(t, kvRanges, 7)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE employees (" +
@@ -73,6 +74,7 @@
tk.MustExec("truncate table mysql.stats_fm_sketch")
tk.MustExec("truncate table mysql.stats_history")
tk.MustExec("truncate table mysql.stats_meta_history")
tk.MustExec("truncate table mysql.stats_table_locked")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 2)
5 changes: 1 addition & 4 deletions ddl/ddl_api.go
@@ -6542,10 +6542,7 @@ func CheckIsDropPrimaryKey(indexName model.CIStr, indexInfo *model.IndexInfo, t
if indexInfo == nil && !t.Meta().PKIsHandle {
return isPK, dbterror.ErrCantDropFieldOrKey.GenWithStackByArgs("PRIMARY")
}
- if t.Meta().PKIsHandle {
- return isPK, dbterror.ErrUnsupportedModifyPrimaryKey.GenWithStack("Unsupported drop primary key when the table's pkIsHandle is true")
- }
- if t.Meta().IsCommonHandle {
+ if t.Meta().IsCommonHandle || t.Meta().PKIsHandle {
return isPK, dbterror.ErrUnsupportedModifyPrimaryKey.GenWithStack("Unsupported drop primary key when the table is using clustered index")
}
}