Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Write directly to S3 #155

Merged
merged 12 commits into from
Sep 28, 2020
Merged
16 changes: 13 additions & 3 deletions cmd/dumpling/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"time"

"github.com/docker/go-units"
"github.com/pingcap/dumpling/v4/cli"
"github.com/pingcap/dumpling/v4/export"
"github.com/pingcap/dumpling/v4/log"
"github.com/pingcap/br/pkg/storage"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/spf13/pflag"
"go.uber.org/zap"

"github.com/pingcap/dumpling/v4/cli"
"github.com/pingcap/dumpling/v4/export"
"github.com/pingcap/dumpling/v4/log"
)

var (
Expand Down Expand Up @@ -124,6 +126,8 @@ func main() {
pflag.StringVar(&outputFilenameFormat, "output-filename-template", "", "The output filename template (without file extension)")
pflag.BoolVar(&completeInsert, "complete-insert", false, "Use complete INSERT statements that include column names")

storage.DefineFlags(pflag.CommandLine)

printVersion := pflag.BoolP("version", "V", false, "Print Dumpling version")

pflag.Parse()
Expand Down Expand Up @@ -207,6 +211,12 @@ func main() {
conf.OutputFileTemplate = tmpl
conf.CompleteInsert = completeInsert

err = conf.ParseFromFlags(pflag.CommandLine)
if err != nil {
fmt.Println(err.Error())
os.Exit(2)
}

err = export.Dump(context.Background(), conf)
if err != nil {
log.Error("dump failed error stack info", zap.Error(err))
Expand Down
17 changes: 7 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,22 @@ require (
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/coreos/go-semver v0.3.0
github.com/docker/go-units v0.4.0
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/gorilla/websocket v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/json-iterator/go v1.1.9 // indirect
github.com/pingcap/br v0.0.0-20200925095602-bf9cc603382e
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/tidb-tools v4.0.5-0.20200817064459-ba61a7376547+incompatible
github.com/pingcap/kvproto v0.0.0-20200910095337-6b893f12be43
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible
github.com/pkg/errors v0.9.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.3
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200825070655-6b09f3acbb1f
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.15.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8 // indirect
Expand All @@ -32,5 +30,4 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
)
502 changes: 502 additions & 0 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions tests/_utils/run_dumpling
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
set -e

echo "[$(date)] Executing bin/dumpling..."
echo "$DUMPLING_OUTPUT_DIR"

bin/dumpling -u "$DUMPLING_TEST_USER" -h 127.0.0.1 \
-P "$DUMPLING_TEST_PORT" -B "$DUMPLING_TEST_DATABASE" \
Expand Down
52 changes: 52 additions & 0 deletions tests/s3/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/bin/sh

set -eux

echo "starting localstack writing to ${DUMPLING_OUTPUT_DIR}"
mkdir -p "${DUMPLING_OUTPUT_DIR}"
ls "${DUMPLING_OUTPUT_DIR}"
# cleanup force-removes the motoserver container started below. It is installed
# as the EXIT trap so it runs whether the script succeeds or fails.
cleanup() {
    echo "Stopping motoserver"
    docker rm -f dumpling_test_s3
}

docker run --name dumpling_test_s3 -d \
    -p 5000:5000 \
    motoserver/moto
# Install the trap immediately after the container exists so that an interrupt
# during the readiness wait still removes it.
trap cleanup EXIT

# Wait for motoserver to accept HTTP requests instead of relying on a fixed
# sleep; give up after 30 attempts so a broken container fails the test quickly.
attempts=0
until curl -s -o /dev/null http://localhost:5000/; do
    attempts=$((attempts + 1))
    if [ "$attempts" -ge 30 ]; then
        echo "motoserver did not start up in time"
        exit 1
    fi
    sleep 1
done

# awslocal runs the AWS CLI from the amazon/aws-cli image against the local
# endpoint on port 5000, forwarding all of its arguments to the CLI.
# No TTY is requested (-i only, not -it): allocating one makes docker fail when
# the script runs without a terminal, and set -e would then abort the test.
awslocal() {
    docker run --rm --net=host -i -e PAGER=cat -e AWS_ACCESS_KEY_ID=foo -e AWS_SECRET_ACCESS_KEY=foo amazon/aws-cli --endpoint http://localhost:5000 "$@"
}
awslocal s3api create-bucket --bucket mybucket

DB_NAME="s3"
TABLE_NAME="t"

# drop database on mysql
run_sql "drop database if exists \`$DB_NAME\`;"

# build data on mysql
run_sql "create database $DB_NAME;"
run_sql "create table $DB_NAME.$TABLE_NAME (a int(255));"

# insert 100 records
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

# run dumpling!
# Remember the local output directory first: DUMPLING_OUTPUT_DIR is redirected
# to the S3 bucket below, while HOST_DIR is still used afterwards for local files.
HOST_DIR=${DUMPLING_OUTPUT_DIR}
export DUMPLING_OUTPUT_DIR=s3://mybucket/dump
export DUMPLING_TEST_DATABASE=$DB_NAME
export AWS_REGION=us-east-1
export AWS_ACCESS_KEY_ID=testid
export AWS_SECRET_ACCESS_KEY=testkey
# Point dumpling at the local server on port 5000 rather than a real S3 endpoint.
run_dumpling --s3.endpoint=http://localhost:5000
ls "${HOST_DIR}"

# Download every expected dump file from the bucket and check that it arrived.
# curl runs with -f so that an HTTP error (e.g. a missing object) makes curl
# fail, and with set -e the script, instead of saving the error body under the
# expected name and letting file_should_exist pass on it.
for dump_file in s3-schema-create.sql s3.t-schema.sql s3.t.0.sql; do
    curl -f -o "${HOST_DIR}/${dump_file}" "http://localhost:5000/mybucket/dump/${dump_file}"
    file_should_exist "$HOST_DIR/${dump_file}"
done
18 changes: 17 additions & 1 deletion v4/export/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package export

import (
"context"
"encoding/json"
"fmt"
"regexp"
Expand All @@ -9,12 +10,17 @@ import (
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/dumpling/v4/log"
"github.com/pingcap/br/pkg/storage"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"go.uber.org/zap"

"github.com/pingcap/dumpling/v4/log"
)

type Config struct {
// TODO support passing in options via flags as well as toml
tirsen marked this conversation as resolved.
Show resolved Hide resolved
storage.BackendOptions
tirsen marked this conversation as resolved.
Show resolved Hide resolved

Databases []string
Host string
User string
Expand Down Expand Up @@ -62,6 +68,8 @@ type Config struct {
SessionParams map[string]interface{}

PosAfterConnect bool

ExternalStorage storage.ExternalStorage `json:"-"`
}

func DefaultConfig() *Config {
Expand Down Expand Up @@ -117,6 +125,14 @@ func (conf *Config) GetDSN(db string) string {
return dsn
}

// createExternalStorage builds the storage.ExternalStorage for this
// configuration. conf.OutputDirPath is parsed by storage.ParseBackend together
// with the embedded BackendOptions, and the resulting backend is handed to
// storage.Create.
//
// A ParseBackend error is returned unchanged with a nil storage; otherwise the
// result of storage.Create is returned as is.
func (conf *Config) createExternalStorage(ctx context.Context) (storage.ExternalStorage, error) {
	backend, err := storage.ParseBackend(conf.OutputDirPath, &conf.BackendOptions)
	if err != nil {
		return nil, err
	}
	// NOTE(review): the trailing false is presumably BR's "send credentials"
	// flag — confirm against the pinned br version.
	return storage.Create(ctx, backend, false)
}

const (
UnspecifiedSize = 0
DefaultTiDBMemQuotaQuery = 32 * (1 << 30)
Expand Down
18 changes: 18 additions & 0 deletions v4/export/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package export

import (
"context"

. "github.com/pingcap/check"
)

var _ = Suite(&testConfigSuite{})

type testConfigSuite struct{}

// TestCreateExternalStorage verifies that the default configuration produces
// an external storage without error and that its URI matches "file:.*".
func (s *testConfigSuite) TestCreateExternalStorage(c *C) {
	ctx := context.Background()
	conf := DefaultConfig()

	store, err := conf.createExternalStorage(ctx)
	c.Assert(err, IsNil)
	c.Assert(store.URI(), Matches, "file:.*")
}
6 changes: 3 additions & 3 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func Dump(pCtx context.Context, conf *Config) (err error) {
if err = adjustConfig(conf); err != nil {
if err = adjustConfig(pCtx, conf); err != nil {
return withStack(err)
}

Expand Down Expand Up @@ -114,9 +114,9 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
defer newPool.Close()
}

m := newGlobalMetadata(conf.OutputDirPath)
m := newGlobalMetadata(conf.ExternalStorage)
// write metadata even if dump failed
defer m.writeGlobalMetaData()
defer m.writeGlobalMetaData(ctx)

// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
Expand Down
19 changes: 10 additions & 9 deletions v4/export/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"database/sql"
"errors"
"fmt"
"path"
"strings"
"time"

"github.com/pingcap/br/pkg/storage"
)

type globalMetadata struct {
buffer bytes.Buffer

filePath string
storage storage.ExternalStorage
}

const (
Expand All @@ -28,10 +29,10 @@ const (
mariadbShowMasterStatusFieldNum = 4
)

func newGlobalMetadata(outputDir string) *globalMetadata {
func newGlobalMetadata(s storage.ExternalStorage) *globalMetadata {
return &globalMetadata{
filePath: path.Join(outputDir, metadataPath),
buffer: bytes.Buffer{},
storage: s,
buffer: bytes.Buffer{},
}
}

Expand Down Expand Up @@ -189,12 +190,12 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp
})
}

func (m *globalMetadata) writeGlobalMetaData() error {
fileWriter, tearDown, err := buildFileWriter(m.filePath)
func (m *globalMetadata) writeGlobalMetaData(ctx context.Context) error {
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath)
if err != nil {
return err
}
defer tearDown()
defer tearDown(ctx)

return write(fileWriter, m.String())
return write(ctx, fileWriter, m.String())
}
33 changes: 14 additions & 19 deletions v4/export/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package export
import (
"context"
"fmt"
"path"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/br/pkg/storage"
. "github.com/pingcap/check"
)

Expand All @@ -30,10 +30,8 @@ func (s *testMetaDataSuite) TestMysqlMetaData(c *C) {
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(
sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"}))

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand All @@ -42,6 +40,13 @@ func (s *testMetaDataSuite) TestMysqlMetaData(c *C) {
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

// createStorage returns an ExternalStorage backed by a "file:///" URL that
// points at a directory obtained from c.MkDir.
//
// Both parsing the backend and creating the storage are asserted to succeed,
// so a broken setup fails the calling test here instead of handing a nil
// storage to newGlobalMetadata.
func (s *testMetaDataSuite) createStorage(c *C) storage.ExternalStorage {
	backend, err := storage.ParseBackend("file:///"+c.MkDir(), nil)
	c.Assert(err, IsNil)
	testLoc, err := storage.Create(context.Background(), backend, true)
	c.Assert(err, IsNil)
	return testLoc
}

func (s *testMetaDataSuite) TestMetaDataAfterConn(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
Expand All @@ -63,11 +68,9 @@ func (s *testMetaDataSuite) TestMetaDataAfterConn(c *C) {
sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"}))
mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows2)

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, true), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand Down Expand Up @@ -98,10 +101,8 @@ func (s *testMetaDataSuite) TestMysqlWithFollowersMetaData(c *C) {
mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error"))
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(followerRows)

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand Down Expand Up @@ -131,10 +132,8 @@ func (s *testMetaDataSuite) TestMysqlWithNullFollowersMetaData(c *C) {
mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error"))
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(sqlmock.NewRows([]string{"SQL_Remaining_Delay"}).AddRow(nil))

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand All @@ -160,10 +159,8 @@ func (s *testMetaDataSuite) TestMariaDBMetaData(c *C) {
AddRow(gtidSet)
mock.ExpectQuery("SELECT @@global.gtid_binlog_pos").WillReturnRows(rows)
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(rows)
testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMariaDB, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(mock.ExpectationsWereMet(), IsNil)
}
Expand All @@ -189,10 +186,8 @@ func (s *testMetaDataSuite) TestMariaDBWithFollowersMetaData(c *C) {
AddRow("connection_1"))
mock.ExpectQuery("SHOW ALL SLAVES STATUS").WillReturnRows(followerRows)

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand Down
Loading