br: fix lightning split large csv file error and adjust s3 seek result #27769

Merged: 36 commits, merged on Sep 8, 2021
Showing changes from 1 commit. Commits in this PR (36):
- 050e33c: fix lightning populate chunks (glorv, Sep 2, 2021)
- c506864: fix comment (glorv, Sep 2, 2021)
- 6338289: Merge branch 'master' into fix-chunk (glorv, Sep 2, 2021)
- 3a7e535: Merge branch 'master' into fix-chunk (glorv, Sep 3, 2021)
- d87cdb8: resolve comments (glorv, Sep 6, 2021)
- 30f32d1: fix seek (glorv, Sep 6, 2021)
- b79e04e: add check for the seek position (glorv, Sep 6, 2021)
- 2dc464d: Merge branch 'master' into fix-chunk (glorv, Sep 6, 2021)
- 368ced5: Merge branch 'master' into fix-chunk (glorv, Sep 6, 2021)
- 429d2ac: Merge branch 'master' into fix-chunk (glorv, Sep 6, 2021)
- 3e76255: resolve comments (glorv, Sep 6, 2021)
- d631f2b: Merge branch 'fix-chunk' of ssh://github.com/glorv/tidb into fix-chunk (glorv, Sep 6, 2021)
- 77855eb: Merge branch 'master' into fix-chunk (glorv, Sep 6, 2021)
- 8441774: fmt (glorv, Sep 6, 2021)
- 9878f8f: Merge branch 'master' into fix-chunk (glorv, Sep 6, 2021)
- 01c4e53: fix typo (glorv, Sep 6, 2021)
- f78fba4: Merge branch 'master' into fix-chunk (kennytm, Sep 6, 2021)
- 22afcc9: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 6, 2021)
- 519ebec: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 6, 2021)
- 7d23699: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 6, 2021)
- 7943a3c: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- 88d493e: Merge branch 'master' into fix-chunk (glorv, Sep 7, 2021)
- 67b0813: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- 32ad3b6: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- cfb9255: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- 9db8405: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- 00f0fa3: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- 5d2392a: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- acb6d0d: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- 4e50b52: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- 25a157e: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- b247989: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 7, 2021)
- a1fbc70: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 8, 2021)
- 3360a5e: Merge branch 'master' into fix-chunk (ti-chi-bot, Sep 8, 2021)
- 689f411: fix unit test (glorv, Sep 8, 2021)
- 69a37a3: Merge branch 'fix-chunk' of ssh://github.com/glorv/tidb into fix-chunk (glorv, Sep 8, 2021)
Commit 050e33cd219bd6da0857e1bf592c4fa877fab7ff: fix lightning populate chunks (glorv, committed Sep 2, 2021)

br/pkg/lightning/mydump/region.go: 5 changes (4 additions, 1 deletion)
@@ -268,7 +268,7 @@ func makeSourceFileRegion(
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize)*11/10 {
Contributor:
What happens if the file size is slightly bigger than int64(cfg.Mydumper.MaxRegionSize)*11/10? 🤣

Contributor (Author):
Then the file will be split at cfg.Mydumper.MaxRegionSize, so the second chunk's size is about 1/10 * cfg.Mydumper.MaxRegionSize.

Contributor:
If the file size is slightly bigger than int64(cfg.Mydumper.MaxRegionSize)*2, the third chunk will be very small. Will this be a problem?

Contributor (Author):
Not a big problem. In the common case the data export tool (like Dumpling or mydumper) sets the exported file size to cfg.Mydumper.MaxRegionSize, but the actual output files may be slightly bigger or smaller, so with this threshold we avoid splitting off a lot of tiny chunks.
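
To make those sizes concrete, here is a small self-contained sketch; the 256 MiB MaxRegionSize and the fixed-stride loop are hypothetical and only approximate the real splitting logic:

```go
package main

import "fmt"

func main() {
	const maxRegionSize = int64(256 << 20)  // hypothetical MaxRegionSize of 256 MiB
	fileSize := maxRegionSize*2 + (4 << 20) // a file slightly bigger than 2 * MaxRegionSize

	// Chunks are cut roughly every MaxRegionSize bytes, so only the last
	// chunk is small (about 4 MiB here), which is acceptable in practice.
	for offset := int64(0); offset < fileSize; offset += maxRegionSize {
		end := offset + maxRegionSize
		if end > fileSize {
			end = fileSize
		}
		fmt.Printf("chunk [%d, %d): %d MiB\n", offset, end, (end-offset)>>20)
	}
}
```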

Contributor:
make that 11/10 a named constant...

Contributor (Author), glorv, Sep 6, 2021:
That would make the code a bit ugly because 11/10 is not an integer. I added a code comment to explain why the threshold needs to be increased 😅

Contributor:
you can make "10" a constant and set the upper limit to MaxRegionSize + MaxRegionSize/10.
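
A rough sketch of what that suggestion could look like (the constant and helper names are hypothetical; the PR itself keeps the inline 11/10 expression plus a comment):

```go
// regionSizeSlackDivisor is the hypothetical named constant suggested above:
// allow a file to exceed MaxRegionSize by up to 1/10 before splitting it.
const regionSizeSlackDivisor = 10

func needSplitCSV(dataFileSize, maxRegionSize int64) bool {
	// Roughly equivalent to dataFileSize > maxRegionSize*11/10, but written as
	// MaxRegionSize + MaxRegionSize/10 so the ratio stays an integer expression.
	return dataFileSize > maxRegionSize+maxRegionSize/regionSizeSlackDivisor
}
```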

_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
@@ -359,6 +359,9 @@ func SplitLargeFile(
columns = parser.Columns()
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
endOffset = dataFile.FileMeta.FileSize
}
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
br/pkg/lightning/mydump/region_test.go: 56 changes (56 additions, 0 deletions)
@@ -331,3 +331,59 @@ func (s *testMydumpRegionSuite) TestSplitLargeFileWithCustomTerminator(c *C) {
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
}
}

func (s *testMydumpRegionSuite) TestSplitLargeFileOnlyOneChunk(c *C) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
MaxRegionSize: 15,
},
}

dir := c.MkDir()

fileName := "test.csv"
filePath := filepath.Join(dir, fileName)

content := []byte("field1,field2\r\n123,456\r\n")
err := os.WriteFile(filePath, content, 0o644)
c.Assert(err, IsNil)

dataFileInfo, err := os.Stat(filePath)
c.Assert(err, IsNil)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: fileName, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := int64(2)
columns := []string{"field1", "field2"}
prevRowIdxMax := int64(0)
ioWorker := worker.NewPool(context.Background(), 4, "io")

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

offsets := [][]int64{{14, 24}}

_, regions, _, err := SplitLargeFile(context.Background(), meta, cfg, fileInfo, colCnt, prevRowIdxMax, ioWorker, store)
c.Assert(err, IsNil)
c.Assert(regions, HasLen, len(offsets))
for i := range offsets {
c.Assert(regions[i].Chunk.Offset, Equals, offsets[i][0])
c.Assert(regions[i].Chunk.EndOffset, Equals, offsets[i][1])
c.Assert(regions[i].Chunk.Columns, DeepEquals, columns)
}
}
br/pkg/storage/s3.go: 23 changes (23 additions, 0 deletions)
@@ -648,6 +648,17 @@ func (r *s3ObjectReader) Close() error {
return r.reader.Close()
}

// eofReader is an io.ReadCloser that always returns io.EOF
type eofReader struct{}
Contributor:
(optional) io.NopCloser(bytes.NewReader(nil))
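
For reference, a sketch of that standard-library alternative (io.NopCloser exists since Go 1.16; older code would use ioutil.NopCloser):

```go
// Instead of a dedicated eofReader type, wrap an empty bytes.Reader:
// bytes.NewReader(nil).Read always returns io.EOF, and io.NopCloser adds a
// no-op Close so the result satisfies io.ReadCloser.
r.reader = io.NopCloser(bytes.NewReader(nil))
```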


func (r eofReader) Read([]byte) (n int, err error) {
return 0, io.EOF
}

func (r eofReader) Close() error {
return nil
}

// Seek implements the io.Seeker interface.
//
// Currently, tidb-lightning depends on this method to read parquet file for s3 storage.
@@ -666,6 +677,18 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {

if realOffset == r.pos {
return realOffset, nil
} else if realOffset >= r.rangeInfo.Size {
// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// Because S3's GetObject interface doesn't allow a Range that selects zero-length data,
// once the position is out of range we must always return io.EOF after the seek operation.

// close the current reader and replace it with one that always returns io.EOF
if err := r.reader.Close(); err != nil {
log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
}

r.reader = eofReader{}
return r.rangeInfo.Size, nil
Contributor:
r.pos needs to be updated for subsequent seeks. Also, should we return realOffset rather than r.rangeInfo.Size? I didn't find any specification for this behavior, but returning realOffset is more consistent with the Linux filesystem.

Contributor (Author):
Fixed. I think we should return the real position, which is r.rangeInfo.Size, since the behavior isn't the same as a Linux fs: a filesystem implements both reading and writing and allows writing at a position beyond the current end, leaving a hole in the file, but here we don't. There isn't much practical difference, though.
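
For illustration, a sketch of how this branch might look with r.pos kept in sync (the actual fix lands in a later commit of this PR, so details may differ):

```go
} else if realOffset >= r.rangeInfo.Size {
	// S3's GetObject rejects a Range header that selects zero bytes, so once
	// the offset reaches the end of the object we stop issuing requests and
	// let every subsequent Read return io.EOF.
	if err := r.reader.Close(); err != nil {
		log.L().Warn("close s3 reader failed, will ignore this error", logutil.ShortError(err))
	}
	r.reader = eofReader{}
	r.pos = r.rangeInfo.Size // keep pos consistent for later relative seeks
	return r.rangeInfo.Size, nil
}
```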

}

// if seek ahead no more than 64k, we discard these data
br/pkg/storage/s3_test.go: 9 changes (9 additions, 0 deletions)
@@ -740,6 +740,15 @@ func (s *s3Suite) TestOpenSeek(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, 100)
c.Assert(slice, DeepEquals, someRandomBytes[990100:990200])

// test seeking to the end of the file and beyond
for _, p := range []int64{1000000, 1000001, 2000000} {
offset, err = reader.Seek(p, io.SeekStart)
c.Assert(offset, Equals, int64(1000000))
c.Assert(err, IsNil)
_, err = reader.Read(slice)
c.Assert(err, Equals, io.EOF)
}
}

type limitedBytesReader struct {