From bce9977e455cd7ea3b11bf015dd2da7f5761ff4f Mon Sep 17 00:00:00 2001 From: glorv Date: Sun, 27 Sep 2020 13:59:01 +0800 Subject: [PATCH] mydump: support multi bytes csv delimiter and separator (#406) * more flexible csv * fix config and add unit test * remove useless code * fix unit test * use empty string for default quote * update comments in tidb-lightning.toml for separator and delimiter Co-authored-by: kennytm --- lightning/config/config.go | 12 +- lightning/config/config_test.go | 24 ++- lightning/lightning_test.go | 2 +- lightning/mydump/bytes.go | 32 +-- lightning/mydump/csv_parser.go | 303 ++++++++++++++++++++++------ lightning/mydump/csv_parser_test.go | 159 +++++++++++---- lightning/mydump/region_test.go | 14 -- tidb-lightning.toml | 6 +- 8 files changed, 393 insertions(+), 159 deletions(-) diff --git a/lightning/config/config.go b/lightning/config/config.go index ca46c3118..27559b232 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -384,16 +384,12 @@ func (cfg *Config) LoadFromTOML(data []byte) error { func (cfg *Config) Adjust() error { // Reject problematic CSV configurations. 
csv := &cfg.Mydumper.CSV - if len(csv.Separator) != 1 { - return errors.New("invalid config: `mydumper.csv.separator` must be exactly one byte long") + if len(csv.Separator) == 0 { + return errors.New("invalid config: `mydumper.csv.separator` must not be empty") } - if len(csv.Delimiter) > 1 { - return errors.New("invalid config: `mydumper.csv.delimiter` must be one byte long or empty") - } - - if csv.Separator == csv.Delimiter { - return errors.New("invalid config: cannot use the same character for both CSV delimiter and separator") + if len(csv.Delimiter) > 0 && (strings.HasPrefix(csv.Separator, csv.Delimiter) || strings.HasPrefix(csv.Delimiter, csv.Separator)) { + return errors.New("invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other") } if csv.BackslashEscape { diff --git a/lightning/config/config_test.go b/lightning/config/config_test.go index 1626cb136..d9e36df6e 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -275,14 +275,23 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { [mydumper.csv] separator = '' `, - err: "invalid config: `mydumper.csv.separator` must be exactly one byte long", + err: "invalid config: `mydumper.csv.separator` must not be empty", }, { input: ` [mydumper.csv] separator = 'hello' + delimiter = 'hel' `, - err: "invalid config: `mydumper.csv.separator` must be exactly one byte long", + err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other", + }, + { + input: ` + [mydumper.csv] + separator = 'hel' + delimiter = 'hello' + `, + err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other", }, { input: ` @@ -297,7 +306,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { [mydumper.csv] separator = ',' `, - err: "invalid config: `mydumper.csv.separator` must be exactly one byte long", + err: "", }, { input: ` @@ -311,7 +320,7 @@ func (s 
*configTestSuite) TestInvalidCSV(c *C) { [mydumper.csv] delimiter = 'hello' `, - err: "invalid config: `mydumper.csv.delimiter` must be one byte long or empty", + err: "", }, { input: ` @@ -324,9 +333,10 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { { input: ` [mydumper.csv] - delimiter = '“' + separator = '\s' + delimiter = '\d' `, - err: "invalid config: `mydumper.csv.delimiter` must be one byte long or empty", + err: "", }, { input: ` @@ -334,7 +344,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { separator = '|' delimiter = '|' `, - err: "invalid config: cannot use the same character for both CSV delimiter and separator", + err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other", }, { input: ` diff --git a/lightning/lightning_test.go b/lightning/lightning_test.go index e334d6b74..a45baf0e5 100644 --- a/lightning/lightning_test.go +++ b/lightning/lightning_test.go @@ -153,7 +153,7 @@ func (s *lightningServerSuite) TestRunServer(c *C) { c.Assert(data["error"], Matches, "cannot parse task.*") resp.Body.Close() - resp, err = http.Post(url, "application/toml", strings.NewReader("[mydumper.csv]\nseparator = 'fooo'")) + resp, err = http.Post(url, "application/toml", strings.NewReader("[mydumper.csv]\nseparator = 'fooo'\ndelimiter= 'foo'")) c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, http.StatusBadRequest) err = json.NewDecoder(resp.Body).Decode(&data) diff --git a/lightning/mydump/bytes.go b/lightning/mydump/bytes.go index 3ce5d3e77..d11027520 100644 --- a/lightning/mydump/bytes.go +++ b/lightning/mydump/bytes.go @@ -9,43 +9,31 @@ package mydump -import "unicode/utf8" +// byteSet is a 32-byte value, where each bit represents the presence of a +// given byte value in the set. +type byteSet [8]uint32 -// asciiSet is a 32-byte value, where each bit represents the presence of a -// given ASCII character in the set. 
The 128-bits of the lower 16 bytes, -// starting with the least-significant bit of the lowest word to the -// most-significant bit of the highest word, map to the full range of all -// 128 ASCII characters. The 128-bits of the upper 16 bytes will be zeroed, -// ensuring that any non-ASCII character will be reported as not in the set. -type asciiSet [8]uint32 - -// makeASCIISet creates a set of ASCII characters and reports whether all -// characters in chars are ASCII. -func makeASCIISet(chars string) (as asciiSet, ok bool) { +// makeByteSet creates a set of byte values. +func makeByteSet(chars []byte) (as byteSet) { for i := 0; i < len(chars); i++ { c := chars[i] - if c >= utf8.RuneSelf { - return as, false - } as[c>>5] |= 1 << uint(c&31) } - return as, true + return as } // contains reports whether c is inside the set. -func (as *asciiSet) contains(c byte) bool { +func (as *byteSet) contains(c byte) bool { return (as[c>>5] & (1 << uint(c&31))) != 0 } -// IndexAnyAscii returns the byte index of the first occurrence in s of any of the Unicode -// code points in chars. It returns -1 if there is no code -// point in common. -func IndexAnyAscii(s []byte, as *asciiSet) int { +// IndexAnyByte returns the byte index of the first occurrence in s of any of the byte +// values in chars. It returns -1 if there is no byte in common. 
+func IndexAnyByte(s []byte, as *byteSet) int { for i, c := range s { if as.contains(c) { return i } } return -1 - } diff --git a/lightning/mydump/csv_parser.go b/lightning/mydump/csv_parser.go index 3d368c54b..0cb5bffae 100644 --- a/lightning/mydump/csv_parser.go +++ b/lightning/mydump/csv_parser.go @@ -19,6 +19,8 @@ import ( "strings" "unicode" + "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/errors" "github.com/pingcap/tidb/types" @@ -38,8 +40,8 @@ type CSVParser struct { cfg *config.CSVConfig escFlavor backslashEscapeFlavor - comma byte - quote byte + comma []byte + quote []byte quoteIndexFunc func([]byte) int unquoteIndexFunc func([]byte) int @@ -66,18 +68,17 @@ func NewCSVParser( ioWorkers *worker.Pool, shouldParseHeader bool, ) *CSVParser { - quote := byte(0) + escFlavor := backslashEscapeFlavorNone + var quoteStopSet []byte + unquoteStopSet := []byte{'\r', '\n', cfg.Separator[0]} if len(cfg.Delimiter) > 0 { - quote = cfg.Delimiter[0] + quoteStopSet = []byte{cfg.Delimiter[0]} + unquoteStopSet = append(unquoteStopSet, cfg.Delimiter[0]) } - - escFlavor := backslashEscapeFlavorNone - quoteStopSet := cfg.Delimiter - unquoteStopSet := "\r\n" + cfg.Separator + cfg.Delimiter if cfg.BackslashEscape { escFlavor = backslashEscapeFlavorMySQL - quoteStopSet += `\` - unquoteStopSet += `\` + quoteStopSet = append(quoteStopSet, '\\') + unquoteStopSet = append(unquoteStopSet, '\\') // we need special treatment of the NULL value \N, used by MySQL. 
if !cfg.NotNull && cfg.Null == `\N` { escFlavor = backslashEscapeFlavorMySQLWithNull @@ -87,8 +88,8 @@ func NewCSVParser( return &CSVParser{ blockParser: makeBlockParser(reader, blockBufSize, ioWorkers), cfg: cfg, - comma: cfg.Separator[0], - quote: quote, + comma: []byte(cfg.Separator), + quote: []byte(cfg.Delimiter), escFlavor: escFlavor, quoteIndexFunc: makeBytesIndexFunc(quoteStopSet), unquoteIndexFunc: makeBytesIndexFunc(unquoteStopSet), @@ -96,11 +97,11 @@ func NewCSVParser( } } -func makeBytesIndexFunc(chars string) func([]byte) int { +func makeBytesIndexFunc(chars []byte) func([]byte) int { // chars are guaranteed to be ascii str, so this call will always success - as, _ := makeASCIISet(chars) + as := makeByteSet(chars) return func(s []byte) int { - return IndexAnyAscii(s, &as) + return IndexAnyByte(s, &as) } } @@ -130,21 +131,43 @@ func (parser *CSVParser) readByte() (byte, error) { return b, nil } -func (parser *CSVParser) peekByte() (byte, error) { - if len(parser.buf) == 0 { +func (parser *CSVParser) readBytes(buf []byte) (int, error) { + cnt := 0 + for cnt < len(buf) { + if len(parser.buf) == 0 { + if err := parser.readBlock(); err != nil { + return cnt, err + } + } + if len(parser.buf) == 0 { + parser.pos += int64(cnt) + return cnt, io.EOF + } + readCnt := utils.MinInt(len(buf)-cnt, len(parser.buf)) + copy(buf[cnt:], parser.buf[:readCnt]) + parser.buf = parser.buf[readCnt:] + cnt += readCnt + } + parser.pos += int64(cnt) + return cnt, nil +} + +func (parser *CSVParser) peekBytes(cnt int) ([]byte, error) { + if len(parser.buf) < cnt { if err := parser.readBlock(); err != nil { - return 0, err + return nil, err } } if len(parser.buf) == 0 { - return 0, io.EOF + return nil, io.EOF } - return parser.buf[0], nil + cnt = utils.MinInt(cnt, len(parser.buf)) + return parser.buf[:cnt], nil } -func (parser *CSVParser) skipByte() { - parser.buf = parser.buf[1:] - parser.pos++ +func (parser *CSVParser) skipBytes(n int) { + parser.buf = parser.buf[n:] + parser.pos 
+= int64(n) } // readUntil reads the buffer until any character from the `chars` set is found. @@ -186,6 +209,58 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) { isEmptyLine := true whitespaceLine := true + + processDefault := func(b byte) error { + if b == '\\' && parser.escFlavor != backslashEscapeFlavorNone { + if err := parser.readByteForBackslashEscape(); err != nil { + return err + } + } else { + parser.recordBuffer = append(parser.recordBuffer, b) + } + return parser.readUnquoteField() + } + + processQuote := func(b byte) error { + return parser.readQuotedField() + } + if len(parser.quote) > 1 { + processQuote = func(b byte) error { + pb, err := parser.peekBytes(len(parser.quote) - 1) + if err != nil && errors.Cause(err) != io.EOF { + return err + } + if bytes.Equal(pb, parser.quote[1:]) { + parser.skipBytes(len(parser.quote) - 1) + return parser.readQuotedField() + } + return processDefault(b) + } + } + + processComma := func(b byte) error { + parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) + return nil + } + if len(parser.comma) > 1 { + processNotComma := processDefault + if len(parser.quote) > 0 && parser.comma[0] == parser.quote[0] { + processNotComma = processQuote + } + processComma = func(b byte) error { + pb, err := parser.peekBytes(len(parser.comma) - 1) + if err != nil && errors.Cause(err) != io.EOF { + return err + } + if bytes.Equal(pb, parser.comma[1:]) { + parser.skipBytes(len(parser.comma) - 1) + parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) + return nil + } + return processNotComma(b) + } + } + outside: for { firstByte, err := parser.readByte() @@ -197,17 +272,19 @@ outside: firstByte = '\n' } - switch firstByte { - case parser.comma: - parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) + switch { + case firstByte == parser.comma[0]: whitespaceLine = false - case parser.quote: - if err := parser.readQuotedField(); err != nil { + if err = 
processComma(firstByte); err != nil { return nil, err } - whitespaceLine = false - case '\r', '\n': + case len(parser.quote) > 0 && firstByte == parser.quote[0]: + if err = processQuote(firstByte); err != nil { + return nil, err + } + whitespaceLine = false + case firstByte == '\r', firstByte == '\n': // new line = end of record (ignore empty lines) if isEmptyLine { continue @@ -219,22 +296,13 @@ outside: } parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) break outside - default: - if firstByte == '\\' && parser.escFlavor != backslashEscapeFlavorNone { - if err := parser.readByteForBackslashEscape(); err != nil { - return nil, err - } - } else { - parser.recordBuffer = append(parser.recordBuffer, firstByte) - } - if err := parser.readUnquoteField(); err != nil { + if err = processDefault(firstByte); err != nil { return nil, err } } isEmptyLine = false } - // Create a single string and create slices out of it. // This pins the memory of the fields together, but allocates once. str := string(parser.recordBuffer) // Convert to string once to batch allocations @@ -264,6 +332,25 @@ func (parser *CSVParser) readByteForBackslashEscape() error { } func (parser *CSVParser) readQuotedField() error { + processDefault := func() error { + // in all other cases, we've got a syntax error. + parser.logSyntaxError() + return errors.AddStack(errUnexpectedQuoteField) + } + + processComma := func() error { return nil } + if len(parser.comma) > 1 { + processComma = func() error { + b, err := parser.peekBytes(len(parser.comma)) + if err != nil && errors.Cause(err) != io.EOF { + return err + } + if !bytes.Equal(b, parser.comma) { + return processDefault() + } + return nil + } + } for { content, terminator, err := parser.readUntil(parser.quoteIndexFunc) err = parser.replaceEOF(err, errUnterminatedQuotedField) @@ -271,29 +358,56 @@ func (parser *CSVParser) readQuotedField() error { return err } parser.recordBuffer = append(parser.recordBuffer, content...) 
- parser.skipByte() - switch terminator { - case parser.quote: + parser.skipBytes(1) + switch { + case len(parser.quote) > 0 && terminator == parser.quote[0]: + if len(parser.quote) > 1 { + b, err := parser.peekBytes(len(parser.quote) - 1) + if err != nil && err != io.EOF { + return err + } + if !bytes.Equal(b, parser.quote[1:]) { + parser.recordBuffer = append(parser.recordBuffer, terminator) + continue + } + parser.skipBytes(len(parser.quote) - 1) + } // encountered '"' -> continue if we're seeing '""'. - b, err := parser.peekByte() - err = parser.replaceEOF(err, nil) + b, err := parser.peekBytes(1) if err != nil { + if err == io.EOF { + err = nil + } return err } - switch b { - case parser.quote: + switch b[0] { + case parser.quote[0]: // consume the double quotation mark and continue - parser.skipByte() + if len(parser.quote) > 1 { + b, err := parser.peekBytes(len(parser.quote)) + if err != nil && err != io.EOF { + return err + } + if !bytes.Equal(b, parser.quote) { + if parser.quote[0] == parser.comma[0] { + return processComma() + } else { + return processDefault() + } + } + } + parser.skipBytes(len(parser.quote)) parser.recordBuffer = append(parser.recordBuffer, '"') - case '\r', '\n', parser.comma, 0: + case '\r', '\n': // end the field if the next is a separator return nil + case parser.comma[0]: + return processComma() default: - // in all other cases, we've got a syntax error. 
- parser.logSyntaxError() - return errors.AddStack(errUnexpectedQuoteField) + return processDefault() } - case '\\': + + case terminator == '\\': if err := parser.readByteForBackslashEscape(); err != nil { return err } @@ -302,22 +416,70 @@ func (parser *CSVParser) readQuotedField() error { } func (parser *CSVParser) readUnquoteField() error { + addByte := func(b byte) { + // read the following byte + parser.recordBuffer = append(parser.recordBuffer, b) + parser.skipBytes(1) + } + parseQuote := func(b byte) error { + r, err := parser.checkBytes(parser.quote) + if err != nil { + return errors.Trace(err) + } + if r { + parser.logSyntaxError() + return errors.AddStack(errUnexpectedQuoteField) + } + addByte(b) + return nil + } + + parserNoComma := func(b byte) error { + addByte(b) + return nil + } + if len(parser.quote) > 0 && parser.comma[0] == parser.quote[0] { + parserNoComma = parseQuote + } for { content, terminator, err := parser.readUntil(parser.unquoteIndexFunc) parser.recordBuffer = append(parser.recordBuffer, content...) 
- err = parser.replaceEOF(err, nil) + finished := false if err != nil { - return err + if errors.Cause(err) == io.EOF { + finished = true + err = nil + } + if err != nil { + return err + } } - switch terminator { - case '\r', '\n', parser.comma, 0: + switch { + case terminator == '\r', terminator == '\n', finished: return nil - case parser.quote: - parser.logSyntaxError() - return errors.AddStack(errUnexpectedQuoteField) - case '\\': - parser.skipByte() + case terminator == parser.comma[0]: + r, err := parser.checkBytes(parser.comma) + if err != nil { + return errors.Trace(err) + } + if r { + return nil + } + if err = parserNoComma(terminator); err != nil { + return err + } + case len(parser.quote) > 0 && terminator == parser.quote[0]: + r, err := parser.checkBytes(parser.quote) + if err != nil { + return errors.Trace(err) + } + if r { + parser.logSyntaxError() + return errors.AddStack(errUnexpectedQuoteField) + } + case terminator == '\\': + parser.skipBytes(1) if err := parser.readByteForBackslashEscape(); err != nil { return err } @@ -325,6 +487,17 @@ func (parser *CSVParser) readUnquoteField() error { } } +func (parser *CSVParser) checkBytes(b []byte) (bool, error) { + if len(b) == 1 { + return true, nil + } + pb, err := parser.peekBytes(len(b)) + if err != nil { + return false, err + } + return bytes.Equal(pb, b), nil +} + func (parser *CSVParser) replaceEOF(err error, replaced error) error { if err == nil || errors.Cause(err) != io.EOF { return err @@ -394,16 +567,16 @@ func (parser *CSVParser) ReadColumns() error { return nil } -var newLineAsciiSet, _ = makeASCIISet("\r\n") +var newLineAsciiSet = makeByteSet([]byte{'\r', '\n'}) func indexOfNewLine(b []byte) int { - return IndexAnyAscii(b, &newLineAsciiSet) + return IndexAnyByte(b, &newLineAsciiSet) } func (parser *CSVParser) ReadUntilTokNewLine() (int64, error) { _, _, err := parser.readUntil(indexOfNewLine) if err != nil { return 0, err } - parser.skipByte() + parser.skipBytes(1) return parser.pos, nil } 
diff --git a/lightning/mydump/csv_parser_test.go b/lightning/mydump/csv_parser_test.go index 69dff198b..144a5facb 100644 --- a/lightning/mydump/csv_parser_test.go +++ b/lightning/mydump/csv_parser_test.go @@ -74,12 +74,65 @@ func (s *testMydumpCSVParserSuite) runFailingTestCases(c *C, cfg *config.CSVConf } } +func tpchDatums() [][]types.Datum { + datums := make([][]types.Datum, 0, 3) + datums = append(datums, []types.Datum{ + types.NewStringDatum("1"), + types.NewStringDatum("goldenrod lavender spring chocolate lace"), + types.NewStringDatum("Manufacturer#1"), + types.NewStringDatum("Brand#13"), + types.NewStringDatum("PROMO BURNISHED COPPER"), + types.NewStringDatum("7"), + types.NewStringDatum("JUMBO PKG"), + types.NewStringDatum("901.00"), + types.NewStringDatum("ly. slyly ironi"), + }) + datums = append(datums, []types.Datum{ + types.NewStringDatum("2"), + types.NewStringDatum("blush thistle blue yellow saddle"), + types.NewStringDatum("Manufacturer#1"), + types.NewStringDatum("Brand#13"), + types.NewStringDatum("LARGE BRUSHED BRASS"), + types.NewStringDatum("1"), + types.NewStringDatum("LG CASE"), + types.NewStringDatum("902.00"), + types.NewStringDatum("lar accounts amo"), + }) + datums = append(datums, []types.Datum{ + types.NewStringDatum("3"), + types.NewStringDatum("spring green yellow purple cornsilk"), + types.NewStringDatum("Manufacturer#4"), + types.NewStringDatum("Brand#42"), + types.NewStringDatum("STANDARD POLISHED BRASS"), + types.NewStringDatum("21"), + types.NewStringDatum("WRAP CASE"), + types.NewStringDatum("903.00"), + types.NewStringDatum("egular deposits hag"), + }) + + return datums +} + +func datumsToString(datums [][]types.Datum, delimitor string, quote string, lastSep bool) string { + var b strings.Builder + for _, ds := range datums { + for i, d := range ds { + b.WriteString(quote) + b.WriteString(d.GetString()) + b.WriteString(quote) + if lastSep || i < len(ds)-1 { + b.WriteString(delimitor) + } + } + b.WriteString("\r\n") + } + 
return b.String() +} + func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { - reader := mydump.NewStringReader( - `1|goldenrod lavender spring chocolate lace|Manufacturer#1|Brand#13|PROMO BURNISHED COPPER|7|JUMBO PKG|901.00|ly. slyly ironi| -2|blush thistle blue yellow saddle|Manufacturer#1|Brand#13|LARGE BRUSHED BRASS|1|LG CASE|902.00|lar accounts amo| -3|spring green yellow purple cornsilk|Manufacturer#4|Brand#42|STANDARD POLISHED BRASS|21|WRAP CASE|903.00|egular deposits hag| -`) + datums := tpchDatums() + input := datumsToString(datums, "|", "", true) + reader := mydump.NewStringReader(input) cfg := config.CSVConfig{ Separator: "|", @@ -92,57 +145,83 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 1, - Row: []types.Datum{ - types.NewStringDatum("1"), - types.NewStringDatum("goldenrod lavender spring chocolate lace"), - types.NewStringDatum("Manufacturer#1"), - types.NewStringDatum("Brand#13"), - types.NewStringDatum("PROMO BURNISHED COPPER"), - types.NewStringDatum("7"), - types.NewStringDatum("JUMBO PKG"), - types.NewStringDatum("901.00"), - types.NewStringDatum("ly. 
slyly ironi"), - }, + Row: datums[0], }) c.Assert(parser, posEq, 126, 1) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 2, - Row: []types.Datum{ - types.NewStringDatum("2"), - types.NewStringDatum("blush thistle blue yellow saddle"), - types.NewStringDatum("Manufacturer#1"), - types.NewStringDatum("Brand#13"), - types.NewStringDatum("LARGE BRUSHED BRASS"), - types.NewStringDatum("1"), - types.NewStringDatum("LG CASE"), - types.NewStringDatum("902.00"), - types.NewStringDatum("lar accounts amo"), - }, + Row: datums[1], }) - c.Assert(parser, posEq, 240, 2) + c.Assert(parser, posEq, 241, 2) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 3, - Row: []types.Datum{ - types.NewStringDatum("3"), - types.NewStringDatum("spring green yellow purple cornsilk"), - types.NewStringDatum("Manufacturer#4"), - types.NewStringDatum("Brand#42"), - types.NewStringDatum("STANDARD POLISHED BRASS"), - types.NewStringDatum("21"), - types.NewStringDatum("WRAP CASE"), - types.NewStringDatum("903.00"), - types.NewStringDatum("egular deposits hag"), - }, + Row: datums[2], }) - c.Assert(parser, posEq, 367, 3) + c.Assert(parser, posEq, 369, 3) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) } +func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { + datums := tpchDatums() + deliAndSeps := [][2]string{ + {",", ""}, + {"\000", ""}, + {",", ""}, + {"🤔", ""}, + {",", "。"}, + {"||", ""}, + {"|+|", ""}, + {"##", ""}, + {",", "'"}, + {",", `"`}, + {"🤔", `''`}, + {"🤔", `"'`}, + {"🤔", `"'`}, + {"🤔", "🌚"}, // this two emoji have same prefix bytes + {"##", "#-"}, + {"\\s", "\\q"}, + } + for _, SepAndQuote := range deliAndSeps { + inputStr := datumsToString(datums, SepAndQuote[0], SepAndQuote[1], false) + cfg := config.CSVConfig{ + Separator: SepAndQuote[0], + Delimiter: SepAndQuote[1], + TrimLastSep: false, + } + + reader := mydump.NewStringReader(inputStr) + parser := mydump.NewCSVParser(&cfg, reader, 
config.ReadBlockSize, s.ioWorkers, false) + c.Assert(parser.ReadRow(), IsNil) + c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ + RowID: 1, + Row: datums[0], + }) + + c.Assert(parser, posEq, 117+8*len(SepAndQuote[0])+18*len(SepAndQuote[1]), 1) + + c.Assert(parser.ReadRow(), IsNil) + c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ + RowID: 2, + Row: datums[1], + }) + c.Assert(parser, posEq, 223+8*2*len(SepAndQuote[0])+18*2*len(SepAndQuote[1]), 2) + + c.Assert(parser.ReadRow(), IsNil) + c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ + RowID: 3, + Row: datums[2], + }) + c.Assert(parser, posEq, 342+8*3*len(SepAndQuote[0])+18*3*len(SepAndQuote[1]), 3) + + c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) + } +} + func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { cfg := config.CSVConfig{ Separator: ",", diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go index 18fecf1ca..bfb41beaa 100644 --- a/lightning/mydump/region_test.go +++ b/lightning/mydump/region_test.go @@ -15,10 +15,8 @@ package mydump_test import ( "context" - "fmt" "log" "os" - "path/filepath" "github.com/pingcap/br/pkg/storage" @@ -71,18 +69,6 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) { regions, err := MakeTableRegions(context.Background(), meta, 1, cfg, ioWorkers, loader.GetStore()) c.Assert(err, IsNil) - table := meta.Name - fmt.Printf("[%s] region count ===============> %d\n", table, len(regions)) - for _, region := range regions { - fname := filepath.Base(region.FileMeta.Path) - fmt.Printf("[%s] rowID = %5d / rows = %5d / offset = %10d / size = %10d \n", - fname, - region.RowIDMin(), - region.Rows(), - region.Offset(), - region.Size()) - } - // check - region-size vs file-size var tolFileSize int64 = 0 for _, file := range meta.DataFiles { diff --git a/tidb-lightning.toml b/tidb-lightning.toml index bbff2a1d0..e6932d538 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -148,9 +148,11 @@ filter = ['*.*'] # CSV files are imported 
according to MySQL's LOAD DATA INFILE rules. [mydumper.csv] -# separator between fields, should be an ASCII character. +# separator between fields, can be one or more characters but must not be empty. The value can +# not be a prefix of `delimiter`. separator = ',' -# string delimiter, can either be an ASCII character or empty string. +# string delimiter, can either be one or more characters or empty string. If not empty, +# the value should not be a prefix of `separator` delimiter = '"' # whether the CSV files contain a header. If true, the first line will be skipped header = true