Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#44333
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Jun 30, 2023
1 parent bd94fb8 commit 56b2329
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 1 deletion.
89 changes: 89 additions & 0 deletions br/pkg/lightning/mydump/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the mydump package
# (import path github.com/pingcap/tidb/br/pkg/lightning/mydump).
# It is publicly visible, so any package in the workspace may depend on it.
go_library(
name = "mydump",
srcs = [
"bytes.go",
"charset_convertor.go",
"csv_parser.go",
"loader.go",
"parquet_parser.go",
"parser.go",
"parser_generated.go",
"reader.go",
"region.go",
"router.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/mydump",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/metric",
"//br/pkg/lightning/worker",
"//br/pkg/storage",
"//config",
"//parser/mysql",
"//types",
"//util/filter",
"//util/mathutil",
"//util/regexpr-router",
"//util/slice",
"//util/table-filter",
"@com_github_pingcap_errors//:errors",
"@com_github_spkg_bom//:bom",
"@com_github_xitongsys_parquet_go//parquet",
"@com_github_xitongsys_parquet_go//reader",
"@com_github_xitongsys_parquet_go//source",
"@org_golang_x_text//encoding",
"@org_golang_x_text//encoding/simplifiedchinese",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

# Unit tests for the :mydump library, which is embedded into the test binary.
# Files matching csv/*, examples/* and parquet/* are supplied as test data.
go_test(
name = "mydump_test",
timeout = "short",
srcs = [
"charset_convertor_test.go",
"csv_parser_test.go",
"loader_test.go",
"main_test.go",
"parquet_parser_test.go",
"parser_test.go",
"reader_test.go",
"region_test.go",
"router_test.go",
],
data = glob([
"csv/*",
"examples/*",
"parquet/*",
]),
embed = [":mydump"],
flaky = True,
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/worker",
"//br/pkg/mock/storage",
"//br/pkg/storage",
"//parser/mysql",
"//testkit/testsetup",
"//types",
"//util/filter",
"//util/table-filter",
"//util/table-router",
"@com_github_golang_mock//gomock",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//local",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
)
21 changes: 20 additions & 1 deletion br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mathutil"
)
Expand All @@ -30,8 +31,14 @@ var (
errUnterminatedQuotedField = errors.NewNoStackError("syntax error: unterminated quoted field")
errDanglingBackslash = errors.NewNoStackError("syntax error: no character after backslash")
errUnexpectedQuoteField = errors.NewNoStackError("syntax error: cannot have consecutive fields without separator")
// LargestEntryLimit is the maximum number of bytes readUntil may accumulate in its buffer before it fails with a row-too-large error.
LargestEntryLimit int
)

// init seeds LargestEntryLimit with tidbconfig.MaxTxnEntrySizeLimit, so the
// default cap used by the parser follows that TiDB config constant. The
// variable stays assignable afterwards (tests lower it).
func init() {
LargestEntryLimit = tidbconfig.MaxTxnEntrySizeLimit
}

// CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
type CSVParser struct {
blockParser
Expand Down Expand Up @@ -331,6 +338,9 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
var buf []byte
for {
buf = append(buf, parser.buf...)
if len(buf) > LargestEntryLimit {
return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
}
parser.buf = nil
if err := parser.readBlock(); err != nil || len(parser.buf) == 0 {
if err == nil {
Expand Down Expand Up @@ -442,9 +452,18 @@ outside:

func (parser *CSVParser) readQuotedField() error {
for {
prevPos := parser.pos
content, terminator, err := parser.readUntil(&parser.quoteByteSet)
err = parser.replaceEOF(err, errUnterminatedQuotedField)
if err != nil {
if errors.Cause(err) == io.EOF {
// return the position of quote to the caller.
// because we return an error here, the parser won't
// use the `pos` again, so it's safe to modify it here.
parser.pos = prevPos - 1
// set buf to parser.buf in order to print err log
parser.buf = content
err = parser.replaceEOF(err, errUnterminatedQuotedField)
}
return err
}
parser.recordBuffer = append(parser.recordBuffer, content...)
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mydump_test

import (
"bytes"
"context"
"encoding/csv"
"fmt"
Expand Down Expand Up @@ -680,6 +681,29 @@ func TestConsecutiveFields(t *testing.T) {
})
}

func TestTooLargeRow(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
},
}
var testCase bytes.Buffer
testCase.WriteString("a,b,c,d")
// WARN: will take up 10KB memory here.
mydump.LargestEntryLimit = 10 * 1024
for i := 0; i < mydump.LargestEntryLimit; i++ {
testCase.WriteByte('d')
}
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
require.NoError(t, err)
parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor)
require.NoError(t, err)
e := parser.ReadRow()
require.Error(t, e)
require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
}

func TestSpecialChars(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{Separator: ",", Delimiter: `"`},
Expand Down
1 change: 1 addition & 0 deletions br/tests/lightning_csv/errData/db-schema-create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create database if not exists db;
1 change: 1 addition & 0 deletions br/tests/lightning_csv/errData/db.test-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table test(a int primary key, b int, c int, d int);
3 changes: 3 additions & 0 deletions br/tests/lightning_csv/errData/db.test.1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,2,3,4
2,10,4,5
1111,",7,8
8 changes: 8 additions & 0 deletions br/tests/lightning_csv/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ for BACKEND in tidb local; do
check_not_contains 'id:'

done

# Importing errData is expected to fail: db.test.1.csv contains an unterminated
# quoted field. Suspend exit-on-error just for this run.
set +e
run_lightning --backend local -d "tests/$TEST_NAME/errData" --log-file "$TEST_DIR/lightning-err.log" 2>/dev/null
set -e
# err content presented
grep ",7,8" "$TEST_DIR/lightning-err.log"
# pos should not set to end
# Use -F so the text is matched literally: as a regex, each [...] would be a
# bracket expression matching a single character, making the check meaningless.
grep -F '["syntax error"] [pos=22]' "$TEST_DIR/lightning-err.log"
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
// Config number limitations
const (
MaxLogFileSize = 4096 // MB
// MaxTxnEntrySizeLimit is the max value of TxnEntrySizeLimit.
MaxTxnEntrySizeLimit = 120 * 1024 * 1024 // 120MB
// DefTxnEntrySizeLimit is the default value of TxnEntrySizeLimit.
DefTxnEntrySizeLimit = 6 * 1024 * 1024
// DefTxnTotalSizeLimit is the default value of TxnTotalSizeLimit.
Expand Down
10 changes: 10 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,18 @@ func setGlobalVars() {
}
plannercore.AllowCartesianProduct.Store(cfg.Performance.CrossJoin)
privileges.SkipWithGrant = cfg.Security.SkipGrantTable
<<<<<<< HEAD
kv.TxnTotalSizeLimit = cfg.Performance.TxnTotalSizeLimit
if cfg.Performance.TxnEntrySizeLimit > 120*1024*1024 {
=======
if cfg.Performance.TxnTotalSizeLimit == config.DefTxnTotalSizeLimit {
// practically deprecate the config, let the new session memory tracker take charge of it.
kv.TxnTotalSizeLimit = config.SuperLargeTxnSize
} else {
kv.TxnTotalSizeLimit = cfg.Performance.TxnTotalSizeLimit
}
if cfg.Performance.TxnEntrySizeLimit > config.MaxTxnEntrySizeLimit {
>>>>>>> bb2e845f712 (lightning: fix risk of OOM (#40443) (#44333))
log.Fatal("cannot set txn entry size limit larger than 120M")
}
kv.TxnEntrySizeLimit = cfg.Performance.TxnEntrySizeLimit
Expand Down

0 comments on commit 56b2329

Please sign in to comment.