From ae8f90a5e6a6151d382230fbd5557f199d1c2b3c Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 18 Sep 2020 10:59:08 +0800 Subject: [PATCH 1/6] more flexible csv --- lightning/mydump/bytes.go | 11 +- lightning/mydump/csv_parser.go | 234 +++++++++++++++++++++++----- lightning/mydump/csv_parser_test.go | 146 +++++++++++++---- 3 files changed, 312 insertions(+), 79 deletions(-) diff --git a/lightning/mydump/bytes.go b/lightning/mydump/bytes.go index 3ce5d3e77..91b2c79de 100644 --- a/lightning/mydump/bytes.go +++ b/lightning/mydump/bytes.go @@ -9,8 +9,6 @@ package mydump -import "unicode/utf8" - // asciiSet is a 32-byte value, where each bit represents the presence of a // given ASCII character in the set. The 128-bits of the lower 16 bytes, // starting with the least-significant bit of the lowest word to the @@ -19,17 +17,14 @@ import "unicode/utf8" // ensuring that any non-ASCII character will be reported as not in the set. type asciiSet [8]uint32 -// makeASCIISet creates a set of ASCII characters and reports whether all +// makeByteSet creates a set of ASCII characters and reports whether all // characters in chars are ASCII. -func makeASCIISet(chars string) (as asciiSet, ok bool) { +func makeByteSet(chars []byte) (as asciiSet) { for i := 0; i < len(chars); i++ { c := chars[i] - if c >= utf8.RuneSelf { - return as, false - } as[c>>5] |= 1 << uint(c&31) } - return as, true + return as } // contains reports whether c is inside the set. 
diff --git a/lightning/mydump/csv_parser.go b/lightning/mydump/csv_parser.go index 3d368c54b..d2c9a2333 100644 --- a/lightning/mydump/csv_parser.go +++ b/lightning/mydump/csv_parser.go @@ -15,10 +15,13 @@ package mydump import ( "bytes" + "fmt" "io" "strings" "unicode" + "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/errors" "github.com/pingcap/tidb/types" @@ -38,8 +41,8 @@ type CSVParser struct { cfg *config.CSVConfig escFlavor backslashEscapeFlavor - comma byte - quote byte + comma []byte + quote []byte quoteIndexFunc func([]byte) int unquoteIndexFunc func([]byte) int @@ -66,18 +69,22 @@ func NewCSVParser( ioWorkers *worker.Pool, shouldParseHeader bool, ) *CSVParser { - quote := byte(0) + quote := []byte{0} if len(cfg.Delimiter) > 0 { - quote = cfg.Delimiter[0] + quote = []byte(cfg.Delimiter) } escFlavor := backslashEscapeFlavorNone - quoteStopSet := cfg.Delimiter - unquoteStopSet := "\r\n" + cfg.Separator + cfg.Delimiter + var quoteStopSet []byte + unquoteStopSet := []byte{'\r', '\n', cfg.Separator[0]} + if len(cfg.Delimiter) > 0 { + quoteStopSet = []byte{cfg.Delimiter[0]} + unquoteStopSet = append(unquoteStopSet, cfg.Delimiter[0]) + } if cfg.BackslashEscape { escFlavor = backslashEscapeFlavorMySQL - quoteStopSet += `\` - unquoteStopSet += `\` + quoteStopSet = append(quoteStopSet, '\\') + unquoteStopSet = append(unquoteStopSet, '\\') // we need special treatment of the NULL value \N, used by MySQL. 
if !cfg.NotNull && cfg.Null == `\N` { escFlavor = backslashEscapeFlavorMySQLWithNull @@ -87,7 +94,7 @@ func NewCSVParser( return &CSVParser{ blockParser: makeBlockParser(reader, blockBufSize, ioWorkers), cfg: cfg, - comma: cfg.Separator[0], + comma: []byte(cfg.Separator), quote: quote, escFlavor: escFlavor, quoteIndexFunc: makeBytesIndexFunc(quoteStopSet), @@ -96,9 +103,9 @@ func NewCSVParser( } } -func makeBytesIndexFunc(chars string) func([]byte) int { +func makeBytesIndexFunc(chars []byte) func([]byte) int { // chars are guaranteed to be ascii str, so this call will always success - as, _ := makeASCIISet(chars) + as := makeByteSet(chars) return func(s []byte) int { return IndexAnyAscii(s, &as) } @@ -130,6 +137,27 @@ func (parser *CSVParser) readByte() (byte, error) { return b, nil } +func (parser *CSVParser) readBytes(buf []byte) (int, error) { + cnt := 0 + for cnt < len(buf) { + if len(parser.buf) == 0 { + if err := parser.readBlock(); err != nil { + return cnt, err + } + } + if len(parser.buf) == 0 { + parser.pos += int64(cnt) + return cnt, io.EOF + } + readCnt := utils.MinInt(len(buf)-cnt, len(parser.buf)) + copy(buf[cnt:], parser.buf[:readCnt]) + parser.buf = parser.buf[readCnt:] + cnt += readCnt + } + parser.pos += int64(cnt) + return cnt, nil +} + func (parser *CSVParser) peekByte() (byte, error) { if len(parser.buf) == 0 { if err := parser.readBlock(); err != nil { @@ -142,11 +170,29 @@ func (parser *CSVParser) peekByte() (byte, error) { return parser.buf[0], nil } +func (parser *CSVParser) peekBytes(cnt int) ([]byte, error) { + if len(parser.buf) < cnt { + if err := parser.readBlock(); err != nil { + return []byte{}, err + } + } + if len(parser.buf) == 0 { + return []byte{}, io.EOF + } + cnt = utils.MinInt(cnt, len(parser.buf)) + return parser.buf[:cnt], nil +} + func (parser *CSVParser) skipByte() { parser.buf = parser.buf[1:] parser.pos++ } +func (parser *CSVParser) skipBytes(n int) { + parser.buf = parser.buf[n:] + parser.pos += int64(n) +} + // 
readUntil reads the buffer until any character from the `chars` set is found. // that character is excluded from the final buffer. func (parser *CSVParser) readUntil(findIndexFunc func([]byte) int) ([]byte, byte, error) { @@ -186,6 +232,33 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) { isEmptyLine := true whitespaceLine := true + + processDefault := func(b byte) error { + if b == '\\' && parser.escFlavor != backslashEscapeFlavorNone { + if err := parser.readByteForBackslashEscape(); err != nil { + return err + } + } else { + parser.recordBuffer = append(parser.recordBuffer, b) + } + return parser.readUnquoteField() + } + + processQuote := func(b byte) error { + if len(parser.quote) > 1 { + pb, err := parser.peekBytes(len(parser.quote) - 1) + if err != nil && err != io.EOF { + return err + } + if bytes.Equal(pb, parser.quote[1:]) { + parser.skipBytes(len(parser.quote) - 1) + } else { + return processDefault(b) + } + } + return parser.readQuotedField() + } + outside: for { firstByte, err := parser.readByte() @@ -198,15 +271,34 @@ outside: } switch firstByte { - case parser.comma: - parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) + case parser.comma[0]: whitespaceLine = false - case parser.quote: - if err := parser.readQuotedField(); err != nil { + isComma := true + if len(parser.comma) > 1 { + pb, err := parser.peekBytes(len(parser.comma) - 1) + if err != nil && err != io.EOF { + return nil, err + } + isComma = bytes.Equal(pb, parser.comma[1:]) + } + if isComma { + parser.skipBytes(len(parser.comma) - 1) + parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) + } else if parser.comma[0] == parser.quote[0] { + if err = processQuote(firstByte); err != nil { + return nil, err + } + } else { + if err = processDefault(firstByte); err != nil { + return nil, err + } + } + + case parser.quote[0]: + if err = processQuote(firstByte); err != nil { return nil, err } whitespaceLine = false - case '\r', 
'\n': // new line = end of record (ignore empty lines) if isEmptyLine { @@ -219,22 +311,13 @@ outside: } parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) break outside - default: - if firstByte == '\\' && parser.escFlavor != backslashEscapeFlavorNone { - if err := parser.readByteForBackslashEscape(); err != nil { - return nil, err - } - } else { - parser.recordBuffer = append(parser.recordBuffer, firstByte) - } - if err := parser.readUnquoteField(); err != nil { + if err = processDefault(firstByte); err != nil { return nil, err } } isEmptyLine = false } - // Create a single string and create slices out of it. // This pins the memory of the fields together, but allocates once. str := string(parser.recordBuffer) // Convert to string once to batch allocations @@ -264,16 +347,47 @@ func (parser *CSVParser) readByteForBackslashEscape() error { } func (parser *CSVParser) readQuotedField() error { + processDefault := func() error { + fmt.Printf(" parser pos: %d\n", parser.pos) + // in all other cases, we've got a syntax error. + parser.logSyntaxError() + return errors.AddStack(errUnexpectedQuoteField) + } + + processComma := func() error { + if len(parser.comma) > 1 { + b, err := parser.peekBytes(len(parser.comma)) + if err != nil && err != io.EOF { + return err + } + if !bytes.Equal(b, parser.comma) { + return processDefault() + } + } + return nil + } for { content, terminator, err := parser.readUntil(parser.quoteIndexFunc) err = parser.replaceEOF(err, errUnterminatedQuotedField) if err != nil { + fmt.Printf(" content: %s, len: %d, pos: %d\n", string(content), len(content), parser.pos) return err } parser.recordBuffer = append(parser.recordBuffer, content...) 
parser.skipByte() switch terminator { - case parser.quote: + case parser.quote[0]: + if len(parser.quote) > 1 { + b, err := parser.peekBytes(len(parser.quote) - 1) + if err != nil && err != io.EOF { + return err + } + if !bytes.Equal(b, parser.quote[1:]) { + parser.recordBuffer = append(parser.recordBuffer, terminator) + continue + } + parser.skipBytes(len(parser.quote) - 1) + } // encountered '"' -> continue if we're seeing '""'. b, err := parser.peekByte() err = parser.replaceEOF(err, nil) @@ -281,18 +395,30 @@ func (parser *CSVParser) readQuotedField() error { return err } switch b { - case parser.quote: + case parser.quote[0]: // consume the double quotation mark and continue - parser.skipByte() + if len(parser.quote) > 1 { + b, err := parser.peekBytes(len(parser.quote)) + if err != nil && err != io.EOF { + return err + } + if !bytes.Equal(b, parser.quote) && parser.quote[0] == parser.comma[0] { + return processComma() + } else { + return processDefault() + } + } + parser.skipBytes(len(parser.quote)) parser.recordBuffer = append(parser.recordBuffer, '"') - case '\r', '\n', parser.comma, 0: + case '\r', '\n', 0: // end the field if the next is a separator return nil + case parser.comma[0]: + return processComma() default: - // in all other cases, we've got a syntax error. 
- parser.logSyntaxError() - return errors.AddStack(errUnexpectedQuoteField) + return processDefault() } + case '\\': if err := parser.readByteForBackslashEscape(); err != nil { return err @@ -311,11 +437,26 @@ func (parser *CSVParser) readUnquoteField() error { } switch terminator { - case '\r', '\n', parser.comma, 0: + case '\r', '\n', 0: return nil - case parser.quote: - parser.logSyntaxError() - return errors.AddStack(errUnexpectedQuoteField) + case parser.comma[0]: + r, err := parser.checkOrReadBytes(parser.comma) + if err != nil { + return errors.Trace(err) + } + if r { + return nil + } + case parser.quote[0]: + r, err := parser.checkOrReadBytes(parser.quote) + if err != nil { + return errors.Trace(err) + } + if r { + fmt.Printf(" quote: %s, got: %c, pos: %d\n", parser.quote, terminator, parser.pos) + parser.logSyntaxError() + return errors.AddStack(errUnexpectedQuoteField) + } case '\\': parser.skipByte() if err := parser.readByteForBackslashEscape(); err != nil { @@ -325,6 +466,23 @@ func (parser *CSVParser) readUnquoteField() error { } } +func (parser *CSVParser) checkOrReadBytes(b []byte) (bool, error) { + if len(b) == 1 { + return true, nil + } + pb, err := parser.peekBytes(len(b)) + if err != nil { + return false, err + } + if bytes.Equal(pb, b) { + return true, nil + } + // read the following byte + parser.recordBuffer = append(parser.recordBuffer, pb[0]) + parser.skipByte() + return false, nil +} + func (parser *CSVParser) replaceEOF(err error, replaced error) error { if err == nil || errors.Cause(err) != io.EOF { return err @@ -394,7 +552,7 @@ func (parser *CSVParser) ReadColumns() error { return nil } -var newLineAsciiSet, _ = makeASCIISet("\r\n") +var newLineAsciiSet = makeByteSet([]byte{'\r', '\n'}) func indexOfNewLine(b []byte) int { return IndexAnyAscii(b, &newLineAsciiSet) diff --git a/lightning/mydump/csv_parser_test.go b/lightning/mydump/csv_parser_test.go index 69dff198b..c04efc56d 100644 --- a/lightning/mydump/csv_parser_test.go +++ 
b/lightning/mydump/csv_parser_test.go @@ -3,6 +3,7 @@ package mydump_test import ( "context" "encoding/csv" + "fmt" "io" "os" "path/filepath" @@ -74,6 +75,61 @@ func (s *testMydumpCSVParserSuite) runFailingTestCases(c *C, cfg *config.CSVConf } } +func tpchDatums() [][]types.Datum { + datums := make([][]types.Datum, 0, 3) + datums = append(datums, []types.Datum{ + types.NewStringDatum("1"), + types.NewStringDatum("goldenrod lavender spring chocolate lace"), + types.NewStringDatum("Manufacturer#1"), + types.NewStringDatum("Brand#13"), + types.NewStringDatum("PROMO BURNISHED COPPER"), + types.NewStringDatum("7"), + types.NewStringDatum("JUMBO PKG"), + types.NewStringDatum("901.00"), + types.NewStringDatum("ly. slyly ironi"), + }) + datums = append(datums, []types.Datum{ + types.NewStringDatum("2"), + types.NewStringDatum("blush thistle blue yellow saddle"), + types.NewStringDatum("Manufacturer#1"), + types.NewStringDatum("Brand#13"), + types.NewStringDatum("LARGE BRUSHED BRASS"), + types.NewStringDatum("1"), + types.NewStringDatum("LG CASE"), + types.NewStringDatum("902.00"), + types.NewStringDatum("lar accounts amo"), + }) + datums = append(datums, []types.Datum{ + types.NewStringDatum("3"), + types.NewStringDatum("spring green yellow purple cornsilk"), + types.NewStringDatum("Manufacturer#4"), + types.NewStringDatum("Brand#42"), + types.NewStringDatum("STANDARD POLISHED BRASS"), + types.NewStringDatum("21"), + types.NewStringDatum("WRAP CASE"), + types.NewStringDatum("903.00"), + types.NewStringDatum("egular deposits hag"), + }) + + return datums +} + +func datumsToString(datums [][]types.Datum, delimetor string, quote string) string { + var b strings.Builder + for _, ds := range datums { + for i, d := range ds { + b.WriteString(quote) + b.WriteString(d.GetString()) + b.WriteString(quote) + if i < len(ds)-1 { + b.WriteString(delimetor) + } + } + b.WriteString("\r\n") + } + return b.String() +} + func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { reader := 
mydump.NewStringReader( `1|goldenrod lavender spring chocolate lace|Manufacturer#1|Brand#13|PROMO BURNISHED COPPER|7|JUMBO PKG|901.00|ly. slyly ironi| @@ -81,6 +137,7 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { 3|spring green yellow purple cornsilk|Manufacturer#4|Brand#42|STANDARD POLISHED BRASS|21|WRAP CASE|903.00|egular deposits hag| `) + datums := tpchDatums() cfg := config.CSVConfig{ Separator: "|", Delimiter: "", @@ -92,57 +149,80 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 1, - Row: []types.Datum{ - types.NewStringDatum("1"), - types.NewStringDatum("goldenrod lavender spring chocolate lace"), - types.NewStringDatum("Manufacturer#1"), - types.NewStringDatum("Brand#13"), - types.NewStringDatum("PROMO BURNISHED COPPER"), - types.NewStringDatum("7"), - types.NewStringDatum("JUMBO PKG"), - types.NewStringDatum("901.00"), - types.NewStringDatum("ly. slyly ironi"), - }, + Row: datums[0], }) c.Assert(parser, posEq, 126, 1) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 2, - Row: []types.Datum{ - types.NewStringDatum("2"), - types.NewStringDatum("blush thistle blue yellow saddle"), - types.NewStringDatum("Manufacturer#1"), - types.NewStringDatum("Brand#13"), - types.NewStringDatum("LARGE BRUSHED BRASS"), - types.NewStringDatum("1"), - types.NewStringDatum("LG CASE"), - types.NewStringDatum("902.00"), - types.NewStringDatum("lar accounts amo"), - }, + Row: datums[1], }) c.Assert(parser, posEq, 240, 2) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 3, - Row: []types.Datum{ - types.NewStringDatum("3"), - types.NewStringDatum("spring green yellow purple cornsilk"), - types.NewStringDatum("Manufacturer#4"), - types.NewStringDatum("Brand#42"), - types.NewStringDatum("STANDARD POLISHED BRASS"), - types.NewStringDatum("21"), - types.NewStringDatum("WRAP CASE"), - 
types.NewStringDatum("903.00"), - types.NewStringDatum("egular deposits hag"), - }, + Row: datums[2], }) c.Assert(parser, posEq, 367, 3) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) } +func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { + datums := tpchDatums() + deliAndSeps := [][2]string{ + {",", ""}, + {"🤔", ""}, + {"||", ""}, + {"|+|", ""}, + {"##", ""}, + {",", "'"}, + {",", `"`}, + {"🤔", `''`}, + {"🤔", `"'`}, + {"🤔", `"'`}, + {"##", "#-"}, + } + for _, SepAndQuote := range deliAndSeps { + inputStr := datumsToString(datums, SepAndQuote[0], SepAndQuote[1]) + fmt.Printf("== input: ==\n%s\n", inputStr) + cfg := config.CSVConfig{ + Separator: SepAndQuote[0], + Delimiter: SepAndQuote[1], + TrimLastSep: false, + } + + reader := mydump.NewStringReader(inputStr) + parser := mydump.NewCSVParser(&cfg, reader, config.ReadBlockSize, s.ioWorkers, false) + c.Assert(parser.ReadRow(), IsNil) + c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ + RowID: 1, + Row: datums[0], + }) + + c.Assert(parser, posEq, 117+8*len(SepAndQuote[0])+18*len(SepAndQuote[1]), 1) + + c.Assert(parser.ReadRow(), IsNil) + c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ + RowID: 2, + Row: datums[1], + }) + c.Assert(parser, posEq, 223+8*2*len(SepAndQuote[0])+18*2*len(SepAndQuote[1]), 2) + + c.Assert(parser.ReadRow(), IsNil) + c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ + RowID: 3, + Row: datums[2], + }) + c.Assert(parser, posEq, 342+8*3*len(SepAndQuote[0])+18*3*len(SepAndQuote[1]), 3) + + c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) + fmt.Printf("sep: %s, quote: %s\n", SepAndQuote[0], SepAndQuote[1]) + } +} + func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { cfg := config.CSVConfig{ Separator: ",", From 8e67c9df12c3485d70c67f486386668801f3e5dc Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 23 Sep 2020 16:25:16 +0800 Subject: [PATCH 2/6] fix config and add unit test --- lightning/config/config.go | 8 +- lightning/mydump/csv_parser.go | 121 
++++++++++++++++++---------- lightning/mydump/csv_parser_test.go | 28 +++---- lightning/mydump/region_test.go | 14 ---- 4 files changed, 94 insertions(+), 77 deletions(-) diff --git a/lightning/config/config.go b/lightning/config/config.go index ca46c3118..12baedef4 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -384,12 +384,12 @@ func (cfg *Config) LoadFromTOML(data []byte) error { func (cfg *Config) Adjust() error { // Reject problematic CSV configurations. csv := &cfg.Mydumper.CSV - if len(csv.Separator) != 1 { - return errors.New("invalid config: `mydumper.csv.separator` must be exactly one byte long") + if len(csv.Separator) == 0 { + return errors.New("invalid config: `mydumper.csv.separator` must not be empty") } - if len(csv.Delimiter) > 1 { - return errors.New("invalid config: `mydumper.csv.delimiter` must be one byte long or empty") + if len(csv.Delimiter) > 0 && (strings.Index(csv.Separator, csv.Delimiter) == 0 || strings.Index(csv.Delimiter, csv.Separator) == 0) { + return errors.New("invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other") } if csv.Separator == csv.Delimiter { diff --git a/lightning/mydump/csv_parser.go b/lightning/mydump/csv_parser.go index d2c9a2333..0f5912276 100644 --- a/lightning/mydump/csv_parser.go +++ b/lightning/mydump/csv_parser.go @@ -15,7 +15,6 @@ package mydump import ( "bytes" - "fmt" "io" "strings" "unicode" @@ -245,18 +244,43 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) { } processQuote := func(b byte) error { - if len(parser.quote) > 1 { + return parser.readQuotedField() + } + if len(parser.quote) > 1 { + processQuote = func(b byte) error { pb, err := parser.peekBytes(len(parser.quote) - 1) if err != nil && err != io.EOF { return err } if bytes.Equal(pb, parser.quote[1:]) { parser.skipBytes(len(parser.quote) - 1) - } else { - return processDefault(b) + return parser.readQuotedField() } + return processDefault(b) + } + 
} + + processComma := func(b byte) error { + parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) + return nil + } + if len(parser.comma) > 1 { + processNotComma := processDefault + if parser.comma[0] == parser.quote[0] { + processNotComma = processQuote + } + processComma = func(b byte) error { + pb, err := parser.peekBytes(len(parser.comma) - 1) + if err != nil && err != io.EOF { + return err + } + if bytes.Equal(pb, parser.comma[1:]) { + parser.skipBytes(len(parser.comma) - 1) + parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) + return nil + } + return processNotComma(b) } - return parser.readQuotedField() } outside: @@ -273,25 +297,8 @@ outside: switch firstByte { case parser.comma[0]: whitespaceLine = false - isComma := true - if len(parser.comma) > 1 { - pb, err := parser.peekBytes(len(parser.comma) - 1) - if err != nil && err != io.EOF { - return nil, err - } - isComma = bytes.Equal(pb, parser.comma[1:]) - } - if isComma { - parser.skipBytes(len(parser.comma) - 1) - parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer)) - } else if parser.comma[0] == parser.quote[0] { - if err = processQuote(firstByte); err != nil { - return nil, err - } - } else { - if err = processDefault(firstByte); err != nil { - return nil, err - } + if err = processComma(firstByte); err != nil { + return nil, err } case parser.quote[0]: @@ -348,14 +355,15 @@ func (parser *CSVParser) readByteForBackslashEscape() error { func (parser *CSVParser) readQuotedField() error { processDefault := func() error { - fmt.Printf(" parser pos: %d\n", parser.pos) // in all other cases, we've got a syntax error. 
parser.logSyntaxError() return errors.AddStack(errUnexpectedQuoteField) } - processComma := func() error { - if len(parser.comma) > 1 { + processComma := func() error { return nil } + if len(parser.comma) > 1 { + processComma = func() error { + b, err := parser.peekBytes(len(parser.comma)) if err != nil && err != io.EOF { return err @@ -363,14 +371,13 @@ func (parser *CSVParser) readQuotedField() error { if !bytes.Equal(b, parser.comma) { return processDefault() } + return nil } - return nil } for { content, terminator, err := parser.readUntil(parser.quoteIndexFunc) err = parser.replaceEOF(err, errUnterminatedQuotedField) if err != nil { - fmt.Printf(" content: %s, len: %d, pos: %d\n", string(content), len(content), parser.pos) return err } parser.recordBuffer = append(parser.recordBuffer, content...) @@ -402,10 +409,12 @@ func (parser *CSVParser) readQuotedField() error { if err != nil && err != io.EOF { return err } - if !bytes.Equal(b, parser.quote) && parser.quote[0] == parser.comma[0] { - return processComma() - } else { - return processDefault() + if !bytes.Equal(b, parser.quote) { + if parser.quote[0] == parser.comma[0] { + return processComma() + } else { + return processDefault() + } } } parser.skipBytes(len(parser.quote)) @@ -428,6 +437,31 @@ func (parser *CSVParser) readQuotedField() error { } func (parser *CSVParser) readUnquoteField() error { + addByte := func(b byte) { + // read the following byte + parser.recordBuffer = append(parser.recordBuffer, b) + parser.skipByte() + } + parseQuote := func(b byte) error { + r, err := parser.checkBytes(parser.quote) + if err != nil { + return errors.Trace(err) + } + if r { + parser.logSyntaxError() + return errors.AddStack(errUnexpectedQuoteField) + } + addByte(b) + return nil + } + + parserNoComma := func(b byte) error { + addByte(b) + return nil + } + if parser.comma[0] == parser.quote[0] { + parserNoComma = parseQuote + } for { content, terminator, err := parser.readUntil(parser.unquoteIndexFunc) 
parser.recordBuffer = append(parser.recordBuffer, content...) @@ -440,20 +474,22 @@ func (parser *CSVParser) readUnquoteField() error { case '\r', '\n', 0: return nil case parser.comma[0]: - r, err := parser.checkOrReadBytes(parser.comma) + r, err := parser.checkBytes(parser.comma) if err != nil { return errors.Trace(err) } if r { return nil } + if err = parserNoComma(terminator); err != nil { + return err + } case parser.quote[0]: - r, err := parser.checkOrReadBytes(parser.quote) + r, err := parser.checkBytes(parser.quote) if err != nil { return errors.Trace(err) } if r { - fmt.Printf(" quote: %s, got: %c, pos: %d\n", parser.quote, terminator, parser.pos) parser.logSyntaxError() return errors.AddStack(errUnexpectedQuoteField) } @@ -466,7 +502,7 @@ func (parser *CSVParser) readUnquoteField() error { } } -func (parser *CSVParser) checkOrReadBytes(b []byte) (bool, error) { +func (parser *CSVParser) checkBytes(b []byte) (bool, error) { if len(b) == 1 { return true, nil } @@ -474,13 +510,7 @@ func (parser *CSVParser) checkOrReadBytes(b []byte) (bool, error) { if err != nil { return false, err } - if bytes.Equal(pb, b) { - return true, nil - } - // read the following byte - parser.recordBuffer = append(parser.recordBuffer, pb[0]) - parser.skipByte() - return false, nil + return bytes.Equal(pb, b), nil } func (parser *CSVParser) replaceEOF(err error, replaced error) error { @@ -565,3 +595,6 @@ func (parser *CSVParser) ReadUntilTokNewLine() (int64, error) { parser.skipByte() return parser.pos, nil } + +type node struct { +} diff --git a/lightning/mydump/csv_parser_test.go b/lightning/mydump/csv_parser_test.go index c04efc56d..63cef6ae8 100644 --- a/lightning/mydump/csv_parser_test.go +++ b/lightning/mydump/csv_parser_test.go @@ -3,7 +3,6 @@ package mydump_test import ( "context" "encoding/csv" - "fmt" "io" "os" "path/filepath" @@ -114,15 +113,15 @@ func tpchDatums() [][]types.Datum { return datums } -func datumsToString(datums [][]types.Datum, delimetor string, quote 
string) string { +func datumsToString(datums [][]types.Datum, delimitor string, quote string, lastSep bool) string { var b strings.Builder for _, ds := range datums { for i, d := range ds { b.WriteString(quote) b.WriteString(d.GetString()) b.WriteString(quote) - if i < len(ds)-1 { - b.WriteString(delimetor) + if lastSep || i < len(ds)-1 { + b.WriteString(delimitor) } } b.WriteString("\r\n") @@ -131,13 +130,10 @@ func datumsToString(datums [][]types.Datum, delimetor string, quote string) stri } func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { - reader := mydump.NewStringReader( - `1|goldenrod lavender spring chocolate lace|Manufacturer#1|Brand#13|PROMO BURNISHED COPPER|7|JUMBO PKG|901.00|ly. slyly ironi| -2|blush thistle blue yellow saddle|Manufacturer#1|Brand#13|LARGE BRUSHED BRASS|1|LG CASE|902.00|lar accounts amo| -3|spring green yellow purple cornsilk|Manufacturer#4|Brand#42|STANDARD POLISHED BRASS|21|WRAP CASE|903.00|egular deposits hag| -`) - datums := tpchDatums() + input := datumsToString(datums, "|", "", true) + reader := mydump.NewStringReader(input) + cfg := config.CSVConfig{ Separator: "|", Delimiter: "", @@ -158,14 +154,14 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { RowID: 2, Row: datums[1], }) - c.Assert(parser, posEq, 240, 2) + c.Assert(parser, posEq, 241, 2) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 3, Row: datums[2], }) - c.Assert(parser, posEq, 367, 3) + c.Assert(parser, posEq, 369, 3) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) } @@ -173,8 +169,10 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { datums := tpchDatums() deliAndSeps := [][2]string{ + {",", ""}, {",", ""}, {"🤔", ""}, + {",", "。"}, {"||", ""}, {"|+|", ""}, {"##", ""}, @@ -183,11 +181,12 @@ func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { {"🤔", `''`}, {"🤔", `"'`}, {"🤔", `"'`}, + {"🤔", "🌚"}, // this two emoji have same prefix bytes 
{"##", "#-"}, + {"\\s", "\\q"}, } for _, SepAndQuote := range deliAndSeps { - inputStr := datumsToString(datums, SepAndQuote[0], SepAndQuote[1]) - fmt.Printf("== input: ==\n%s\n", inputStr) + inputStr := datumsToString(datums, SepAndQuote[0], SepAndQuote[1], false) cfg := config.CSVConfig{ Separator: SepAndQuote[0], Delimiter: SepAndQuote[1], @@ -219,7 +218,6 @@ func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { c.Assert(parser, posEq, 342+8*3*len(SepAndQuote[0])+18*3*len(SepAndQuote[1]), 3) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) - fmt.Printf("sep: %s, quote: %s\n", SepAndQuote[0], SepAndQuote[1]) } } diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go index 18fecf1ca..bfb41beaa 100644 --- a/lightning/mydump/region_test.go +++ b/lightning/mydump/region_test.go @@ -15,10 +15,8 @@ package mydump_test import ( "context" - "fmt" "log" "os" - "path/filepath" "github.com/pingcap/br/pkg/storage" @@ -71,18 +69,6 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) { regions, err := MakeTableRegions(context.Background(), meta, 1, cfg, ioWorkers, loader.GetStore()) c.Assert(err, IsNil) - table := meta.Name - fmt.Printf("[%s] region count ===============> %d\n", table, len(regions)) - for _, region := range regions { - fname := filepath.Base(region.FileMeta.Path) - fmt.Printf("[%s] rowID = %5d / rows = %5d / offset = %10d / size = %10d \n", - fname, - region.RowIDMin(), - region.Rows(), - region.Offset(), - region.Size()) - } - // check - region-size vs file-size var tolFileSize int64 = 0 for _, file := range meta.DataFiles { From 665f02461f1b9aceff91bb30e34d61c8a9bd5041 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 23 Sep 2020 16:36:55 +0800 Subject: [PATCH 3/6] remove useless code --- lightning/mydump/csv_parser.go | 33 ++++++++------------------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/lightning/mydump/csv_parser.go b/lightning/mydump/csv_parser.go index 0f5912276..9130267f2 100644 
--- a/lightning/mydump/csv_parser.go +++ b/lightning/mydump/csv_parser.go @@ -157,36 +157,19 @@ func (parser *CSVParser) readBytes(buf []byte) (int, error) { return cnt, nil } -func (parser *CSVParser) peekByte() (byte, error) { - if len(parser.buf) == 0 { - if err := parser.readBlock(); err != nil { - return 0, err - } - } - if len(parser.buf) == 0 { - return 0, io.EOF - } - return parser.buf[0], nil -} - func (parser *CSVParser) peekBytes(cnt int) ([]byte, error) { if len(parser.buf) < cnt { if err := parser.readBlock(); err != nil { - return []byte{}, err + return []byte{0}, err } } if len(parser.buf) == 0 { - return []byte{}, io.EOF + return []byte{0}, io.EOF } cnt = utils.MinInt(cnt, len(parser.buf)) return parser.buf[:cnt], nil } -func (parser *CSVParser) skipByte() { - parser.buf = parser.buf[1:] - parser.pos++ -} - func (parser *CSVParser) skipBytes(n int) { parser.buf = parser.buf[n:] parser.pos += int64(n) @@ -381,7 +364,7 @@ func (parser *CSVParser) readQuotedField() error { return err } parser.recordBuffer = append(parser.recordBuffer, content...) - parser.skipByte() + parser.skipBytes(1) switch terminator { case parser.quote[0]: if len(parser.quote) > 1 { @@ -396,12 +379,12 @@ func (parser *CSVParser) readQuotedField() error { parser.skipBytes(len(parser.quote) - 1) } // encountered '"' -> continue if we're seeing '""'. 
- b, err := parser.peekByte() + b, err := parser.peekBytes(1) err = parser.replaceEOF(err, nil) if err != nil { return err } - switch b { + switch b[0] { case parser.quote[0]: // consume the double quotation mark and continue if len(parser.quote) > 1 { @@ -440,7 +423,7 @@ func (parser *CSVParser) readUnquoteField() error { addByte := func(b byte) { // read the following byte parser.recordBuffer = append(parser.recordBuffer, b) - parser.skipByte() + parser.skipBytes(1) } parseQuote := func(b byte) error { r, err := parser.checkBytes(parser.quote) @@ -494,7 +477,7 @@ func (parser *CSVParser) readUnquoteField() error { return errors.AddStack(errUnexpectedQuoteField) } case '\\': - parser.skipByte() + parser.skipBytes(1) if err := parser.readByteForBackslashEscape(); err != nil { return err } @@ -592,7 +575,7 @@ func (parser *CSVParser) ReadUntilTokNewLine() (int64, error) { if err != nil { return 0, err } - parser.skipByte() + parser.skipBytes(1) return parser.pos, nil } From e8e87b79df87d58cab7cf55048b0de3e395e8d74 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 23 Sep 2020 17:12:33 +0800 Subject: [PATCH 4/6] fix unit test --- lightning/config/config.go | 4 ---- lightning/config/config_test.go | 16 +++++++++------- lightning/lightning_test.go | 2 +- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/lightning/config/config.go b/lightning/config/config.go index 12baedef4..e232c4883 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -392,10 +392,6 @@ func (cfg *Config) Adjust() error { return errors.New("invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other") } - if csv.Separator == csv.Delimiter { - return errors.New("invalid config: cannot use the same character for both CSV delimiter and separator") - } - if csv.BackslashEscape { if csv.Separator == `\` { return errors.New("invalid config: cannot use '\\' as CSV separator when `mydumper.csv.backslash-escape` is true") diff --git 
a/lightning/config/config_test.go b/lightning/config/config_test.go index 1626cb136..425072939 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -275,14 +275,15 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { [mydumper.csv] separator = '' `, - err: "invalid config: `mydumper.csv.separator` must be exactly one byte long", + err: "invalid config: `mydumper.csv.separator` must not be empty", }, { input: ` [mydumper.csv] separator = 'hello' + delimiter = 'hel' `, - err: "invalid config: `mydumper.csv.separator` must be exactly one byte long", + err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other", }, { input: ` @@ -297,7 +298,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { [mydumper.csv] separator = ',' `, - err: "invalid config: `mydumper.csv.separator` must be exactly one byte long", + err: "", }, { input: ` @@ -311,7 +312,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { [mydumper.csv] delimiter = 'hello' `, - err: "invalid config: `mydumper.csv.delimiter` must be one byte long or empty", + err: "", }, { input: ` @@ -324,9 +325,10 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { { input: ` [mydumper.csv] - delimiter = '“' + separator = '\s' + delimiter = '\d' `, - err: "invalid config: `mydumper.csv.delimiter` must be one byte long or empty", + err: "", }, { input: ` @@ -334,7 +336,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { separator = '|' delimiter = '|' `, - err: "invalid config: cannot use the same character for both CSV delimiter and separator", + err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other", }, { input: ` diff --git a/lightning/lightning_test.go b/lightning/lightning_test.go index e334d6b74..a45baf0e5 100644 --- a/lightning/lightning_test.go +++ b/lightning/lightning_test.go @@ -153,7 +153,7 @@ func (s *lightningServerSuite) TestRunServer(c *C) { c.Assert(data["error"], Matches, 
"cannot parse task.*") resp.Body.Close() - resp, err = http.Post(url, "application/toml", strings.NewReader("[mydumper.csv]\nseparator = 'fooo'")) + resp, err = http.Post(url, "application/toml", strings.NewReader("[mydumper.csv]\nseparator = 'fooo'\ndelimiter= 'foo'")) c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, http.StatusBadRequest) err = json.NewDecoder(resp.Body).Decode(&data) From d0304255c814f12ddce0e9cfaf590134a35e46dc Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 24 Sep 2020 17:31:41 +0800 Subject: [PATCH 5/6] use empty string for default quote --- lightning/config/config.go | 2 +- lightning/config/config_test.go | 8 ++++ lightning/mydump/bytes.go | 25 ++++------- lightning/mydump/csv_parser.go | 69 ++++++++++++++--------------- lightning/mydump/csv_parser_test.go | 1 + 5 files changed, 53 insertions(+), 52 deletions(-) diff --git a/lightning/config/config.go b/lightning/config/config.go index e232c4883..27559b232 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -388,7 +388,7 @@ func (cfg *Config) Adjust() error { return errors.New("invalid config: `mydumper.csv.separator` must not be empty") } - if len(csv.Delimiter) > 0 && (strings.Index(csv.Separator, csv.Delimiter) == 0 || strings.Index(csv.Delimiter, csv.Separator) == 0) { + if len(csv.Delimiter) > 0 && (strings.HasPrefix(csv.Separator, csv.Delimiter) || strings.HasPrefix(csv.Delimiter, csv.Separator)) { return errors.New("invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other") } diff --git a/lightning/config/config_test.go b/lightning/config/config_test.go index 425072939..d9e36df6e 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -285,6 +285,14 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { `, err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other", }, + { + input: ` + [mydumper.csv] + separator = 'hel' + delimiter = 
'hello' + `, + err: "invalid config: `mydumper.csv.separator` and `mydumper.csv.delimiter` must not be prefix of each other", + }, { input: ` [mydumper.csv] diff --git a/lightning/mydump/bytes.go b/lightning/mydump/bytes.go index 91b2c79de..d11027520 100644 --- a/lightning/mydump/bytes.go +++ b/lightning/mydump/bytes.go @@ -9,17 +9,12 @@ package mydump -// asciiSet is a 32-byte value, where each bit represents the presence of a -// given ASCII character in the set. The 128-bits of the lower 16 bytes, -// starting with the least-significant bit of the lowest word to the -// most-significant bit of the highest word, map to the full range of all -// 128 ASCII characters. The 128-bits of the upper 16 bytes will be zeroed, -// ensuring that any non-ASCII character will be reported as not in the set. -type asciiSet [8]uint32 +// byteSet is a 32-byte value, where each bit represents the presence of a +// given byte value in the set. +type byteSet [8]uint32 -// makeByteSet creates a set of ASCII characters and reports whether all -// characters in chars are ASCII. -func makeByteSet(chars []byte) (as asciiSet) { +// makeByteSet creates a set of byte values. +func makeByteSet(chars []byte) (as byteSet) { for i := 0; i < len(chars); i++ { c := chars[i] as[c>>5] |= 1 << uint(c&31) @@ -28,19 +23,17 @@ func makeByteSet(chars []byte) (as asciiSet) { } // contains reports whether c is inside the set. -func (as *asciiSet) contains(c byte) bool { +func (as *byteSet) contains(c byte) bool { return (as[c>>5] & (1 << uint(c&31))) != 0 } -// IndexAnyAscii returns the byte index of the first occurrence in s of any of the Unicode -// code points in chars. It returns -1 if there is no code -// point in common. -func IndexAnyAscii(s []byte, as *asciiSet) int { +// IndexAnyByte returns the byte index of the first occurrence in s of any of the bytes +// in chars. It returns -1 if there is no byte in common.
+func IndexAnyByte(s []byte, as *byteSet) int { for i, c := range s { if as.contains(c) { return i } } return -1 - } diff --git a/lightning/mydump/csv_parser.go b/lightning/mydump/csv_parser.go index 9130267f2..0cb5bffae 100644 --- a/lightning/mydump/csv_parser.go +++ b/lightning/mydump/csv_parser.go @@ -68,11 +68,6 @@ func NewCSVParser( ioWorkers *worker.Pool, shouldParseHeader bool, ) *CSVParser { - quote := []byte{0} - if len(cfg.Delimiter) > 0 { - quote = []byte(cfg.Delimiter) - } - escFlavor := backslashEscapeFlavorNone var quoteStopSet []byte unquoteStopSet := []byte{'\r', '\n', cfg.Separator[0]} @@ -94,7 +89,7 @@ func NewCSVParser( blockParser: makeBlockParser(reader, blockBufSize, ioWorkers), cfg: cfg, comma: []byte(cfg.Separator), - quote: quote, + quote: []byte(cfg.Delimiter), escFlavor: escFlavor, quoteIndexFunc: makeBytesIndexFunc(quoteStopSet), unquoteIndexFunc: makeBytesIndexFunc(unquoteStopSet), @@ -106,7 +101,7 @@ func makeBytesIndexFunc(chars []byte) func([]byte) int { // chars are guaranteed to be ascii str, so this call will always success as := makeByteSet(chars) return func(s []byte) int { - return IndexAnyAscii(s, &as) + return IndexAnyByte(s, &as) } } @@ -160,11 +155,11 @@ func (parser *CSVParser) readBytes(buf []byte) (int, error) { func (parser *CSVParser) peekBytes(cnt int) ([]byte, error) { if len(parser.buf) < cnt { if err := parser.readBlock(); err != nil { - return []byte{0}, err + return nil, err } } if len(parser.buf) == 0 { - return []byte{0}, io.EOF + return nil, io.EOF } cnt = utils.MinInt(cnt, len(parser.buf)) return parser.buf[:cnt], nil @@ -232,7 +227,7 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) { if len(parser.quote) > 1 { processQuote = func(b byte) error { pb, err := parser.peekBytes(len(parser.quote) - 1) - if err != nil && err != io.EOF { + if err != nil && errors.Cause(err) != io.EOF { return err } if bytes.Equal(pb, parser.quote[1:]) { @@ -249,12 +244,12 @@ func (parser *CSVParser) 
readRecord(dst []string) ([]string, error) { } if len(parser.comma) > 1 { processNotComma := processDefault - if parser.comma[0] == parser.quote[0] { + if len(parser.quote) > 0 && parser.comma[0] == parser.quote[0] { processNotComma = processQuote } processComma = func(b byte) error { pb, err := parser.peekBytes(len(parser.comma) - 1) - if err != nil && err != io.EOF { + if err != nil && errors.Cause(err) != io.EOF { return err } if bytes.Equal(pb, parser.comma[1:]) { @@ -277,19 +272,19 @@ outside: firstByte = '\n' } - switch firstByte { - case parser.comma[0]: + switch { + case firstByte == parser.comma[0]: whitespaceLine = false if err = processComma(firstByte); err != nil { return nil, err } - case parser.quote[0]: + case len(parser.quote) > 0 && firstByte == parser.quote[0]: if err = processQuote(firstByte); err != nil { return nil, err } whitespaceLine = false - case '\r', '\n': + case firstByte == '\r', firstByte == '\n': // new line = end of record (ignore empty lines) if isEmptyLine { continue @@ -346,9 +341,8 @@ func (parser *CSVParser) readQuotedField() error { processComma := func() error { return nil } if len(parser.comma) > 1 { processComma = func() error { - b, err := parser.peekBytes(len(parser.comma)) - if err != nil && err != io.EOF { + if err != nil && errors.Cause(err) != io.EOF { return err } if !bytes.Equal(b, parser.comma) { @@ -365,8 +359,8 @@ func (parser *CSVParser) readQuotedField() error { } parser.recordBuffer = append(parser.recordBuffer, content...) parser.skipBytes(1) - switch terminator { - case parser.quote[0]: + switch { + case len(parser.quote) > 0 && terminator == parser.quote[0]: if len(parser.quote) > 1 { b, err := parser.peekBytes(len(parser.quote) - 1) if err != nil && err != io.EOF { @@ -380,8 +374,10 @@ func (parser *CSVParser) readQuotedField() error { } // encountered '"' -> continue if we're seeing '""'. 
b, err := parser.peekBytes(1) - err = parser.replaceEOF(err, nil) if err != nil { + if err == io.EOF { + err = nil + } return err } switch b[0] { @@ -402,7 +398,7 @@ func (parser *CSVParser) readQuotedField() error { } parser.skipBytes(len(parser.quote)) parser.recordBuffer = append(parser.recordBuffer, '"') - case '\r', '\n', 0: + case '\r', '\n': // end the field if the next is a separator return nil case parser.comma[0]: @@ -411,7 +407,7 @@ func (parser *CSVParser) readQuotedField() error { return processDefault() } - case '\\': + case terminator == '\\': if err := parser.readByteForBackslashEscape(); err != nil { return err } @@ -442,21 +438,27 @@ func (parser *CSVParser) readUnquoteField() error { addByte(b) return nil } - if parser.comma[0] == parser.quote[0] { + if len(parser.quote) > 0 && parser.comma[0] == parser.quote[0] { parserNoComma = parseQuote } for { content, terminator, err := parser.readUntil(parser.unquoteIndexFunc) parser.recordBuffer = append(parser.recordBuffer, content...) 
- err = parser.replaceEOF(err, nil) + finished := false if err != nil { - return err + if errors.Cause(err) == io.EOF { + finished = true + err = nil + } + if err != nil { + return err + } } - switch terminator { - case '\r', '\n', 0: + switch { + case terminator == '\r', terminator == '\n', finished: return nil - case parser.comma[0]: + case terminator == parser.comma[0]: r, err := parser.checkBytes(parser.comma) if err != nil { return errors.Trace(err) @@ -467,7 +469,7 @@ func (parser *CSVParser) readUnquoteField() error { if err = parserNoComma(terminator); err != nil { return err } - case parser.quote[0]: + case len(parser.quote) > 0 && terminator == parser.quote[0]: r, err := parser.checkBytes(parser.quote) if err != nil { return errors.Trace(err) @@ -476,7 +478,7 @@ func (parser *CSVParser) readUnquoteField() error { parser.logSyntaxError() return errors.AddStack(errUnexpectedQuoteField) } - case '\\': + case terminator == '\\': parser.skipBytes(1) if err := parser.readByteForBackslashEscape(); err != nil { return err @@ -568,7 +570,7 @@ func (parser *CSVParser) ReadColumns() error { var newLineAsciiSet = makeByteSet([]byte{'\r', '\n'}) func indexOfNewLine(b []byte) int { - return IndexAnyAscii(b, &newLineAsciiSet) + return IndexAnyByte(b, &newLineAsciiSet) } func (parser *CSVParser) ReadUntilTokNewLine() (int64, error) { _, _, err := parser.readUntil(indexOfNewLine) @@ -578,6 +580,3 @@ func (parser *CSVParser) ReadUntilTokNewLine() (int64, error) { parser.skipBytes(1) return parser.pos, nil } - -type node struct { -} diff --git a/lightning/mydump/csv_parser_test.go b/lightning/mydump/csv_parser_test.go index 63cef6ae8..144a5facb 100644 --- a/lightning/mydump/csv_parser_test.go +++ b/lightning/mydump/csv_parser_test.go @@ -170,6 +170,7 @@ func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { datums := tpchDatums() deliAndSeps := [][2]string{ {",", ""}, + {"\000", ""}, {",", ""}, {"🤔", ""}, {",", "。"}, From 02d71b4b8bbe81c630fd6a5d104d4e8c3f1186b5 Mon 
Sep 17 00:00:00 2001 From: glorv Date: Fri, 25 Sep 2020 17:13:16 +0800 Subject: [PATCH 6/6] update comments in tidb-lightning.toml for separator and delimiter --- tidb-lightning.toml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tidb-lightning.toml b/tidb-lightning.toml index bbff2a1d0..e6932d538 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -148,9 +148,11 @@ filter = ['*.*'] # CSV files are imported according to MySQL's LOAD DATA INFILE rules. [mydumper.csv] -# separator between fields, should be an ASCII character. +# separator between fields, can be one or more characters but must not be empty. The value must +# not be a prefix of `delimiter`. separator = ',' -# string delimiter, can either be an ASCII character or empty string. +# string delimiter, can either be one or more characters or an empty string. If not empty, +# the value must not be a prefix of `separator`. delimiter = '"' # whether the CSV files contain a header. If true, the first line will be skipped header = true