Skip to content

Commit

Permalink
parser: move LOAD DATA REMOTE into LOAD DATA (#41091)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 authored Feb 7, 2023
1 parent 52e5a7b commit 391c551
Show file tree
Hide file tree
Showing 7 changed files with 9,174 additions and 9,220 deletions.
44 changes: 21 additions & 23 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -63,10 +64,6 @@ type LoadDataExec struct {
// Next implements the Executor Next interface.
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
// TODO: support load data without local field.
if e.FileLocRef == ast.FileLocServer {
return errors.New("Load Data: don't support load data without local field")
}
// TODO: support lines terminated is "".
if len(e.loadDataInfo.LinesInfo.Terminated) == 0 {
return errors.New("Load Data: don't support load data terminated is nil")
Expand All @@ -79,8 +76,21 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

switch e.FileLocRef {
case ast.FileLocServer:
panic("FileLocServer should be handled earlier")
case ast.FileLocServerOrRemote:
u, err := storage.ParseRawURL(e.loadDataInfo.Path)
if err != nil {
return err
}
var filename string
u.Path, filename = filepath.Split(u.Path)
b, err := storage.ParseBackendFromURL(u, nil)
if err != nil {
return err
}
if b.GetLocal() != nil {
return errors.Errorf("Load Data: don't support load data from tidb-server's disk")
}
return e.loadFromRemote(ctx, b, filename)
case ast.FileLocClient:
// let caller use handleQuerySpecial to read data in this connection
sctx := e.loadDataInfo.ctx
Expand All @@ -90,27 +100,15 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
return errors.New("Load Data: previous load data option wasn't closed normally")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
case ast.FileLocRemote:
return e.loadFromRemote(ctx)
}
return nil
}

func (e *LoadDataExec) loadFromRemote(ctx context.Context) error {
u, err := storage.ParseRawURL(e.loadDataInfo.Path)
if err != nil {
return err
}
var filename string
u.Path, filename = filepath.Split(u.Path)
b, err := storage.ParseBackendFromURL(u, nil)
if err != nil {
return err
}
if b.GetLocal() != nil {
return errors.Errorf("Load Data: don't support load data from tidb-server when set REMOTE, path %s", e.loadDataInfo.Path)
}

func (e *LoadDataExec) loadFromRemote(
ctx context.Context,
b *backup.StorageBackend,
filename string,
) error {
opt := &storage.ExternalStorageOptions{}
if InTest {
opt.NoCredentials = true
Expand Down
6 changes: 3 additions & 3 deletions executor/loadremotetest/one_csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s *mockGCSSuite) TestLoadCSV() {
s.tk.MustExec("CREATE TABLE load_csv.t (i INT, s varchar(32));")

// no-new-line-at-end
sql := fmt.Sprintf(`LOAD DATA REMOTE INFILE 'gcs://test-bucket/no-new-line-at-end.csv?endpoint=%s' INTO TABLE load_csv.t
sql := fmt.Sprintf(`LOAD DATA INFILE 'gcs://test-bucket/no-new-line-at-end.csv?endpoint=%s' INTO TABLE load_csv.t
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n' IGNORE 1 LINES;`, gcsEndpoint)
s.tk.MustExec(sql)
Expand All @@ -39,7 +39,7 @@ func (s *mockGCSSuite) TestLoadCSV() {
s.tk.MustExec("TRUNCATE TABLE load_csv.t;")

// new-line-at-end
sql = fmt.Sprintf(`LOAD DATA REMOTE INFILE 'gcs://test-bucket/new-line-at-end.csv?endpoint=%s' INTO TABLE load_csv.t
sql = fmt.Sprintf(`LOAD DATA INFILE 'gcs://test-bucket/new-line-at-end.csv?endpoint=%s' INTO TABLE load_csv.t
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n' IGNORE 1 LINES;`, gcsEndpoint)
s.tk.MustExec(sql)
Expand All @@ -52,6 +52,6 @@ func (s *mockGCSSuite) TestLoadCSV() {
s.tk.MustExec("TRUNCATE TABLE load_csv.t;")

// can't read file at tidb-server
sql = "LOAD DATA REMOTE INFILE '/etc/passwd' INTO TABLE load_csv.t;"
sql = "LOAD DATA INFILE '/etc/passwd' INTO TABLE load_csv.t;"
s.tk.MustContainErrMsg(sql, "don't support load data from tidb-server")
}
12 changes: 4 additions & 8 deletions parser/ast/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -1804,14 +1804,12 @@ func (n *ColumnNameOrUserVar) Accept(v Visitor) (node Node, ok bool) {
type FileLocRefTp int

const (
// FileLocServer is used when there's no keywords in SQL, which means the data file should be located on the tidb-server.
FileLocServer FileLocRefTp = iota
// FileLocServerOrRemote is used when there's no keywords in SQL, which means the data file should be located on the
// tidb-server or on remote storage (S3 for example).
FileLocServerOrRemote FileLocRefTp = iota
// FileLocClient is used when there's LOCAL keyword in SQL, which means the data file should be located on the MySQL
// client.
FileLocClient
// FileLocRemote is used when there's REMOTE keyword in SQL, which means the data file should be located on a remote
// server, such as a cloud storage.
FileLocRemote
)

// LoadDataStmt is a statement to load data from a specified file, then insert this rows into an existing table.
Expand All @@ -1837,11 +1835,9 @@ type LoadDataStmt struct {
func (n *LoadDataStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("LOAD DATA ")
switch n.FileLocRef {
case FileLocServer:
case FileLocServerOrRemote:
case FileLocClient:
ctx.WriteKeyWord("LOCAL ")
case FileLocRemote:
ctx.WriteKeyWord("REMOTE ")
}
ctx.WriteKeyWord("INFILE ")
ctx.WriteString(n.Path)
Expand Down
1 change: 0 additions & 1 deletion parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,6 @@ var tokenMap = map[string]int{
"RELEASE": release,
"RELOAD": reload,
"REMOVE": remove,
"REMOTE": remote,
"RENAME": rename,
"REORGANIZE": reorganize,
"REPAIR": repair,
Expand Down
Loading

0 comments on commit 391c551

Please sign in to comment.