diff --git a/.gitignore b/.gitignore index b5731b364feaf..258cc7205f0d9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .DS_Store bin/ + +.idea \ No newline at end of file diff --git a/Makefile b/Makefile index 9643dd3a5832b..e864aeaab00a6 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ endif build: bin/dumpling bin/%: cmd/%/main.go $(wildcard v4/**/*.go) - $(GO) build $(GOFLAGS) -o $@ $< + $(GO) build $(GOFLAGS) -tags codes -o $@ $< test: $(GO) list ./... | xargs $(GO) test $(GOFLAGS) diff --git a/cmd/dumpling/main.go b/cmd/dumpling/main.go index 246d2ad8dced0..f44b5d22b4887 100644 --- a/cmd/dumpling/main.go +++ b/cmd/dumpling/main.go @@ -14,9 +14,70 @@ package main import ( + "flag" + "fmt" + _ "net/http/pprof" + "os" + "github.com/pingcap/dumpling/v4/cli" + "github.com/pingcap/dumpling/v4/export" +) + +var ( + database string + host string + user string + port int + password string + threads int + outputDir string + fileSize uint64 ) +func init() { + flag.StringVar(&database, "database", "", "Database to dump") + flag.StringVar(&database, "B", "", "Database to dump") + + flag.StringVar(&host, "h", "127.0.0.1", "The host to connect to") + flag.StringVar(&host, "host", "127.0.0.1", "The host to connect to") + + flag.StringVar(&user, "user", "root", "Username with privileges to run the dump") + flag.StringVar(&user, "u", "root", "Username with privileges to run the dump") + + flag.IntVar(&port, "port", 4000, "TCP/IP port to connect to") + flag.IntVar(&port, "P", 4000, "TCP/IP port to connect to") + + flag.StringVar(&password, "password", "", "User password") + flag.StringVar(&password, "p", "", "User password") + + flag.IntVar(&threads, "threads", 4, "Number of goroutines to use, default 4") + flag.IntVar(&threads, "t", 4, "Number of goroutines to use, default 4") + + flag.Uint64Var(&fileSize, "F", export.UnspecifiedSize, "The approximate size of output file") + flag.Uint64Var(&fileSize, "filesize", export.UnspecifiedSize, "The approximate size of 
output file") + + flag.StringVar(&outputDir, "output", ".", "Output directory") + flag.StringVar(&outputDir, "o", ".", "Output directory") +} + func main() { + flag.Parse() println(cli.LongVersion()) + + conf := export.DefaultConfig() + conf.Database = database + conf.Host = host + conf.User = user + conf.Port = port + conf.Password = password + conf.Threads = threads + conf.FileSize = fileSize + conf.OutputDirPath = outputDir + + err := export.Dump(conf) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + return } diff --git a/go.mod b/go.mod index 47b16ed59bc62..0972af22464ab 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module github.com/pingcap/dumpling +require ( + github.com/go-sql-driver/mysql v1.4.1 + github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 +) + go 1.13 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000000000..7b4543ebc7059 --- /dev/null +++ b/go.sum @@ -0,0 +1,71 @@ +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 
h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= +github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 h1:iRtOAQ6FXkY/BGvst3CDfTva4nTqh6CL8WXvanLdbu0= +github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= +github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4= +github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= +github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY= +go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= 
+go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.4.0 h1:f3WCSC2KzAcBXGATIxAB1E2XuCpNU255wNKZ505qi3E= +go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.12.0 h1:dySoUQPFBGj6xwjmBzageVL8jGi8uxc6bEmJQjA06bw= +go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools 
v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191107010934-f79515f33823 h1:akkRBeitX2EZP59KdtKw310CI4WGPCNPyrLbE7WZA8Y= +golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/v4/export/config.go b/v4/export/config.go new file mode 100644 index 0000000000000..bc29ce7e7a6d8 --- /dev/null +++ b/v4/export/config.go @@ -0,0 +1,55 @@ +package export + +import ( + "fmt" +) + +type Config struct { + Database string + Host string + User string + Port int + Password string + Threads int + + Logger Logger + 
FileSize uint64 + OutputDirPath string +} + +func DefaultConfig() *Config { + return &Config{ + Database: "", + Host: "127.0.0.1", + User: "root", + Port: 3306, + Password: "", + Threads: 4, + Logger: &DummyLogger{}, + FileSize: UnspecifiedSize, + OutputDirPath: ".", + } +} + +func (conf *Config) getDSN(db string) string { + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", conf.User, conf.Password, conf.Host, conf.Port, db) +} + +func extractOutputConfig(conf *Config) *Config { + return &Config{ + Logger: conf.Logger, + FileSize: conf.FileSize, + OutputDirPath: conf.OutputDirPath, + } +} + +type WriteConfig struct { + // Logger is used to log the export routine. + Logger Logger + // Output size limit in bytes. + OutputSize int + // OutputDirPath is the directory to output. + OutputDirPath string +} + +const UnspecifiedSize = 0 diff --git a/v4/export/dump.go b/v4/export/dump.go new file mode 100644 index 0000000000000..9b430119ff15d --- /dev/null +++ b/v4/export/dump.go @@ -0,0 +1,208 @@ +package export + +import ( + "context" + "database/sql" + "fmt" + "strings" + "sync" + + _ "github.com/go-sql-driver/mysql" +) + +func Dump(conf *Config) error { + pool, err := sql.Open("mysql", conf.getDSN("")) + if err != nil { + return withStack(err) + } + + var databases []string + if conf.Database == "" { + var err error + databases, err = showDatabases(pool) + if err != nil { + pool.Close() + return err + } + } else { + databases = strings.Split(conf.Database, ",") + } + pool.Close() + + for _, database := range databases { + fsWriter, err := NewSimpleWriter(extractOutputConfig(conf)) + if err != nil { + return err + } + if err := dumpDatabase(context.Background(), conf, database, fsWriter); err != nil { + return err + } + } + return nil +} + +func simpleQuery(db *sql.DB, sql string, handleOneRow func(*sql.Rows) error) error { + rows, err := db.Query(sql) + if err != nil { + return withStack(err) + } + + for rows.Next() { + if err := handleOneRow(rows); err != nil { + rows.Close() + 
return withStack(err) + } + } + return nil +} + +type oneStrColumnTable struct { + data []string +} + +func (o *oneStrColumnTable) handleOneRow(rows *sql.Rows) error { + var str string + if err := rows.Scan(&str); err != nil { + return withStack(err) + } + o.data = append(o.data, str) + return nil +} + +func showDatabases(db *sql.DB) ([]string, error) { + var res oneStrColumnTable + if err := simpleQuery(db, "SHOW DATABASES", res.handleOneRow); err != nil { + return nil, err + } + return res.data, nil +} + +// showTables shows the tables of a database, the caller should use the correct database. +func showTables(db *sql.DB) ([]string, error) { + var res oneStrColumnTable + if err := simpleQuery(db, "SHOW TABLES", res.handleOneRow); err != nil { + return nil, err + } + return res.data, nil +} + +func showCreateDatabase(db *sql.DB, database string) (string, error) { + var oneRow [2]string + handleOneRow := func(rows *sql.Rows) error { + return rows.Scan(&oneRow[0], &oneRow[1]) + } + query := fmt.Sprintf("SHOW CREATE DATABASE %s", database) + err := simpleQuery(db, query, handleOneRow) + if err != nil { + return "", err + } + return oneRow[1], nil +} + +func showCreateTable(db *sql.DB, database, table string) (string, error) { + var oneRow [2]string + handleOneRow := func(rows *sql.Rows) error { + return rows.Scan(&oneRow[0], &oneRow[1]) + } + query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", database, table) + err := simpleQuery(db, query, handleOneRow) + if err != nil { + return "", err + } + return oneRow[1], nil +} + +func dumpDatabase(ctx context.Context, conf *Config, dbName string, writer Writer) error { + dsn := conf.getDSN(dbName) + db, err := sql.Open("mysql", dsn) + if err != nil { + return withStack(err) + } + defer db.Close() + + createDatabaseSQL, err := showCreateDatabase(db, dbName) + if err != nil { + return err + } + if err := writer.WriteDatabaseMeta(ctx, dbName, createDatabaseSQL); err != nil { + return err + } + + tables, err := showTables(db) + if 
// errWithStack decorates an error with the stack trace that was captured when
// the error was first wrapped by withStack.
type errWithStack struct {
	stack []byte
	raw   error
}

// Unwrap returns the wrapped error so errors.Is and errors.As can inspect it.
func (e errWithStack) Unwrap() error {
	return e.raw
}

// Error renders the wrapped error's message on the first line, prefixed with
// "err = ", followed by the captured stack trace.
func (e errWithStack) Error() string {
	var b strings.Builder
	b.WriteString("err = ")
	b.WriteString(e.raw.Error())
	b.WriteString("\n")
	b.Write(e.stack)
	return b.String()
}

// withStack attaches the current stack trace to err.
//
// A nil err yields nil. An error that already carries a stack anywhere in its
// Unwrap chain is returned unchanged, so the trace of the original failure
// site is preserved and traces do not pile up as the error propagates.
func withStack(err error) error {
	if err == nil {
		return nil
	}

	// errors.Is cannot be used for this check: errWithStack holds a slice and
	// is therefore not comparable, so errors.As is the way to detect it.
	var wrapped errWithStack
	if errors.As(err, &wrapped) {
		return err
	}

	return errWithStack{
		raw:   err,
		stack: debug.Stack(),
	}
}
+type Logger interface { + Debug(format string, args ...interface{}) + Info(format string, args ...interface{}) + Warn(format string, args ...interface{}) + Error(format string, args ...interface{}) +} diff --git a/v4/export/ir_impl.go b/v4/export/ir_impl.go new file mode 100644 index 0000000000000..0c627c77e7621 --- /dev/null +++ b/v4/export/ir_impl.go @@ -0,0 +1,142 @@ +package export + +import ( + "database/sql" +) + +// rowIter implements the SQLRowIter interface. +type rowIter struct { + rows *sql.Rows + args []interface{} +} + +func (iter *rowIter) Next(row RowReceiver) error { + return decodeFromRows(iter.rows, iter.args, row) +} + +func (iter *rowIter) HasNext() bool { + return iter.rows.Next() +} + +type sizedRowIter struct { + rowIter SQLRowIter + sizeLimit uint64 + currentSize uint64 +} + +func (s *sizedRowIter) Next(row RowReceiver) error { + err := s.rowIter.Next(row) + if err != nil { + return err + } + s.currentSize += row.ReportSize() + return nil +} + +func (s *sizedRowIter) HasNext() bool { + if s.currentSize > s.sizeLimit { + return false + } + return s.rowIter.HasNext() +} + +type stringIter struct { + idx int + ss []string +} + +func newStringIter(ss ...string) StringIter { + return &stringIter{ + idx: 0, + ss: ss, + } +} + +func (m *stringIter) Next() string { + if m.idx >= len(m.ss) { + return "" + } + ret := m.ss[m.idx] + m.idx += 1 + return ret +} + +func (m *stringIter) HasNext() bool { + return m.idx < len(m.ss) +} + +type tableData struct { + database string + table string + rows *sql.Rows + colTypes []*sql.ColumnType + specCmts []string +} + +func (td *tableData) ColumnTypes() []string { + colTypes := make([]string, len(td.colTypes)) + for i, ct := range td.colTypes { + colTypes[i] = ct.DatabaseTypeName() + } + return colTypes +} + +func (td *tableData) DatabaseName() string { + return td.database +} + +func (td *tableData) TableName() string { + return td.table +} + +func (td *tableData) ColumnCount() uint { + return 
uint(len(td.colTypes)) +} + +func (td *tableData) Rows() SQLRowIter { + return &rowIter{ + rows: td.rows, + args: make([]interface{}, len(td.colTypes)), + } +} + +func (td *tableData) SpecialComments() StringIter { + return newStringIter(td.specCmts...) +} + +type tableDataChunks struct { + TableDataIR + sizeLimit uint64 +} + +func (t *tableDataChunks) Rows() SQLRowIter { + return &sizedRowIter{ + rowIter: t.Rows(), + sizeLimit: t.sizeLimit, + } +} + +func splitTableDataIntoChunks(td TableDataIR, chunkSize uint64) *tableDataChunks { + return &tableDataChunks{ + TableDataIR: td, + sizeLimit: chunkSize, + } +} + +type metaData struct { + target string + metaSQL string + specCmts []string +} + +func (m *metaData) SpecialComments() StringIter { + return newStringIter(m.specCmts...) +} + +func (m *metaData) TargetName() string { + return m.target +} + +func (m *metaData) MetaSQL() string { + return m.metaSQL +} diff --git a/v4/export/ratelimit.go b/v4/export/ratelimit.go new file mode 100644 index 0000000000000..aafb56957783f --- /dev/null +++ b/v4/export/ratelimit.go @@ -0,0 +1,23 @@ +package export + +type rateLimit struct { + token chan struct{} +} + +func newRateLimit(n int) *rateLimit { + return &rateLimit{ + token: make(chan struct{}, n), + } +} + +func (r *rateLimit) getToken() { + r.token <- struct{}{} +} + +func (r *rateLimit) putToken() { + select { + case <-r.token: + default: + panic("put a redundant token") + } +} diff --git a/v4/export/test_util.go b/v4/export/test_util.go new file mode 100644 index 0000000000000..7f8cc8b6cc597 --- /dev/null +++ b/v4/export/test_util.go @@ -0,0 +1,156 @@ +package export + +import ( + "database/sql" + "fmt" +) + +type mockStringWriter struct { + buf string +} + +func (m *mockStringWriter) WriteString(s string) (int, error) { + if s == "poison" { + return 0, fmt.Errorf("poison_error") + } + m.buf = s + return len(s), nil +} + +type mockStringCollector struct { + buf string +} + +func (m *mockStringCollector) WriteString(s 
string) (int, error) { + m.buf += s + return len(s), nil +} + +type mockSQLRowIterator struct { + idx int + data [][]sql.NullString +} + +func (m *mockSQLRowIterator) Next(sp RowReceiver) error { + args := make([]interface{}, len(m.data[m.idx])) + sp.BindAddress(args) + for i := range args { + *(args[i]).(*sql.NullString) = m.data[m.idx][i] + } + m.idx += 1 + return nil +} + +func (m *mockSQLRowIterator) HasNext() bool { + return m.idx < len(m.data) +} + +type mockMetaIR struct { + tarName string + meta string + specCmt []string +} + +func (m *mockMetaIR) SpecialComments() StringIter { + return newStringIter(m.specCmt...) +} + +func (m *mockMetaIR) TargetName() string { + return m.tarName +} + +func (m *mockMetaIR) MetaSQL() string { + return m.meta +} + +func newMockMetaIR(targetName string, meta string, specialComments []string) MetaIR { + return &mockMetaIR{ + tarName: targetName, + meta: meta, + specCmt: specialComments, + } +} + +func makeNullString(ss []string) []sql.NullString { + var ns []sql.NullString + for _, s := range ss { + if len(s) != 0 { + ns = append(ns, sql.NullString{String: s, Valid: true}) + } else { + ns = append(ns, sql.NullString{Valid: false}) + } + } + return ns +} + +type mockTableDataIR struct { + dbName string + tblName string + data [][]sql.NullString + specCmt []string + colTypes []string +} + +func (m *mockTableDataIR) ColumnTypes() []string { + return m.colTypes +} + +func newMockTableDataIR(databaseName, tableName string, data [][]string, specialComments []string, colTypes []string) TableDataIR { + var nData [][]sql.NullString + for _, ss := range data { + nData = append(nData, makeNullString(ss)) + } + + return &mockTableDataIR{ + dbName: databaseName, + tblName: tableName, + data: nData, + specCmt: specialComments, + colTypes: colTypes, + } +} + +func (m *mockTableDataIR) DatabaseName() string { + return m.dbName +} + +func (m *mockTableDataIR) TableName() string { + return "employee" +} + +func (m *mockTableDataIR) 
// DummyLogger is a Logger that prints every message to standard output,
// whatever its level, each followed by a newline.
type DummyLogger struct{}

// printLogLine formats the message and writes it to standard output with a
// trailing newline.
func printLogLine(format string, args []interface{}) {
	fmt.Print(fmt.Sprintf(format, args...) + "\n")
}

// Debug prints the formatted message on its own line.
func (*DummyLogger) Debug(format string, args ...interface{}) {
	printLogLine(format, args)
}

// Info prints the formatted message on its own line.
func (*DummyLogger) Info(format string, args ...interface{}) {
	printLogLine(format, args)
}

// Warn prints the formatted message on its own line.
func (*DummyLogger) Warn(format string, args ...interface{}) {
	printLogLine(format, args)
}

// Error prints the formatted message on its own line.
func (*DummyLogger) Error(format string, args ...interface{}) {
	printLogLine(format, args)
}
// FileSystemStringWriter writes strings to a file on the local file system.
// The file is opened at most once, either at construction time or on the
// first WriteString call.
type FileSystemStringWriter struct {
	path string

	file *os.File
	once sync.Once
	err  error
}

// initFileHandle opens the target file for writing and records the outcome.
// The file is created when missing and truncated when present, so content
// left by an earlier run never survives past the end of the new output.
func (w *FileSystemStringWriter) initFileHandle() {
	// 0644: the output is a plain data file and needs no execute bit.
	w.file, w.err = os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}

// WriteString appends str to the file, opening the file first if that has not
// happened yet. If opening failed, that error is returned by this call and by
// every later one.
func (w *FileSystemStringWriter) WriteString(str string) (int, error) {
	// Open before looking at w.err; otherwise the first lazy write would miss
	// the open error and attempt to write through a nil file handle.
	w.once.Do(w.initFileHandle)
	if w.err != nil {
		return 0, w.err
	}
	return w.file.WriteString(str)
}

// Close releases the file handle if one was opened. Callers should invoke it
// once they are done writing so handles are not held until process exit.
func (w *FileSystemStringWriter) Close() error {
	if w.file == nil {
		return nil
	}
	return w.file.Close()
}

// NewFileSystemStringWriter returns a writer for the file at path. With
// lazyHandleCreation set, the file is not touched until the first WriteString
// call; otherwise it is opened immediately and any open error is reported by
// the first WriteString call.
func NewFileSystemStringWriter(path string, lazyHandleCreation bool) *FileSystemStringWriter {
	w := &FileSystemStringWriter{path: path}
	if !lazyHandleCreation {
		w.once.Do(w.initFileHandle)
	}
	return w
}
// dumplingRow is a RowReceiver that stores every column of one row as a
// nullable string.
type dumplingRow []sql.NullString

// BindAddress stores, for each column of the row, a pointer to that column's
// slot in args so a subsequent scan fills the row in place. args must be at
// least as long as the row.
func (d dumplingRow) BindAddress(args []interface{}) {
	for i := range d {
		args[i] = &d[i]
	}
}

// ReportSize returns the approximate number of bytes the row's values occupy
// in the output: the length of the string for a non-NULL column and 4 (the
// length of the literal NULL) for a NULL column.
func (d dumplingRow) ReportSize() uint64 {
	var totalSize uint64
	for _, ns := range d {
		if ns.Valid {
			totalSize += uint64(len(ns.String))
		} else {
			totalSize += 4 // len("NULL")
		}
	}
	return totalSize
}
+ log.Error("scanning from sql.Row failed, error: %s", err.Error()) + return err + } + + row := convert(dumplingRow, tblIR.ColumnTypes()) + + if err := write(w, fmt.Sprintf("(%s)", strings.Join(row, ", ")), log); err != nil { + return err + } + + var splitter string + if rowIter.HasNext() { + splitter = "," + } else { + splitter = ";" + } + if err := write(w, fmt.Sprintf("%s\n", splitter), log); err != nil { + return err + } + } + log.Debug("finish dumping for table %s", tblIR.TableName()) + return nil +} + +func write(writer io.StringWriter, str string, logger Logger) error { + _, err := writer.WriteString(str) + if err != nil && logger != nil { + logger.Error("writing failed, string: `%s`, error: %s", str, err.Error()) + } + return err +} + +func wrapStringWith(str string, wrapper string) string { + return fmt.Sprintf("%s%s%s", wrapper, str, wrapper) +} + +func convert(origin []sql.NullString, colTypes []string) []string { + ret := make([]string, len(origin)) + for i, s := range origin { + if !s.Valid { + ret[i] = "NULL" + continue + } + + if isCharTypes(colTypes[i]) { + ret[i] = wrapStringWith(s.String, "'") + } else { + ret[i] = s.String + } + } + return ret +} + +var charTypes = map[string]struct{}{ + "CHAR": {}, + "NCHAR": {}, + "VARCHAR": {}, + "NVARCHAR": {}, + "BINARY": {}, + "VARBINARY": {}, + "BLOB": {}, + "TEXT": {}, + "ENUM": {}, + "SET": {}, +} + +func isCharTypes(colType string) bool { + _, ok := charTypes[colType] + return ok +} diff --git a/v4/export/writer_util_test.go b/v4/export/writer_util_test.go new file mode 100644 index 0000000000000..66efb54dc2605 --- /dev/null +++ b/v4/export/writer_util_test.go @@ -0,0 +1,92 @@ +package export + +import ( + . 
"github.com/pingcap/check" + "testing" +) + +func TestT(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testUtilSuite{}) + +type testUtilSuite struct { + mockCfg *Config +} + +func (s *testUtilSuite) SetUpSuite(c *C) { + s.mockCfg = &Config{ + Logger: &DummyLogger{}, + FileSize: UnspecifiedSize, + } +} + +func (s *testUtilSuite) TestWriteMeta(c *C) { + createTableStmt := "CREATE TABLE `t1` (\n" + + " `a` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;" + specCmts := []string{"/*!40103 SET TIME_ZONE='+00:00' */;"} + meta := newMockMetaIR("t1", createTableStmt, specCmts) + strCollector := &mockStringCollector{} + + err := WriteMeta(meta, strCollector, s.mockCfg) + c.Assert(err, IsNil) + expected := "/*!40103 SET TIME_ZONE='+00:00' */;\n" + + "CREATE TABLE `t1` (\n" + + " `a` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;\n" + c.Assert(strCollector.buf, Equals, expected) +} + +func (s *testUtilSuite) TestWriteInsert(c *C) { + data := [][]string{ + {"1", "male", "bob@mail.com", "020-1234", ""}, + {"2", "female", "sarah@mail.com", "020-1253", "healthy"}, + {"3", "male", "john@mail.com", "020-1256", "healthy"}, + {"4", "female", "sarah@mail.com", "020-1235", "healthy"}, + } + colTypes := []string{"INT", "SET", "VARCHAR", "VARCHAR", "TEXT"} + specCmts := []string{ + "/*!40101 SET NAMES binary*/;", + "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;", + } + tableIR := newMockTableDataIR("test", "employee", data, specCmts, colTypes) + strCollector := &mockStringCollector{} + + err := WriteInsert(tableIR, strCollector, s.mockCfg) + c.Assert(err, IsNil) + expected := "/*!40101 SET NAMES binary*/;\n" + + "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n" + + "INSERT INTO `employee` VALUES \n" + + "(1, 'male', 'bob@mail.com', '020-1234', NULL),\n" + + "(2, 'female', 'sarah@mail.com', '020-1253', 'healthy'),\n" + + "(3, 'male', 'john@mail.com', '020-1256', 'healthy'),\n" + + "(4, 'female', 
'sarah@mail.com', '020-1235', 'healthy');\n" + c.Assert(strCollector.buf, Equals, expected) +} + +func (s *testUtilSuite) TestWrite(c *C) { + mocksw := &mockStringWriter{} + src := []string{"test", "loooooooooooooooooooong", "poison"} + exp := []string{"test", "loooooooooooooooooooong", "poison_error"} + + for i, s := range src { + err := write(mocksw, s, nil) + if err != nil { + c.Assert(err.Error(), Equals, exp[i]) + } else { + c.Assert(s, Equals, mocksw.buf) + c.Assert(mocksw.buf, Equals, exp[i]) + } + } + err := write(mocksw, "test", nil) + c.Assert(err, IsNil) +} + +func (s *testUtilSuite) TestConvert(c *C) { + srcColTypes := []string{"INT", "CHAR", "BIGINT", "VARCHAR", "SET"} + src := makeNullString([]string{"255", "", "25535", "computer_science", "male"}) + exp := []string{"255", "NULL", "25535", "'computer_science'", "'male'"} + c.Assert(convert(src, srcColTypes), DeepEquals, exp) +}