From 7c95f65409b687683f38e1b40162231eae90c75e Mon Sep 17 00:00:00 2001
From: leoppro
Date: Tue, 24 Nov 2020 13:07:10 +0800
Subject: [PATCH] test: implement the old value checker (#1058)

---
 .github/workflows/integration.yml             | 100 +++++++
 Dockerfile.development                        |   2 +
 Makefile                                      |  11 +-
 cdc/kv/store_op.go                            |   3 +-
 cdc/sink/mysql.go                             |   2 +-
 cdc/sink/simple_mysql_tester.go               | 262 ++++++++++++++++++
 cdc/sink/sink.go                              |  65 ++++-
 docker-compose-avro.yml                       |   4 +-
 docker-compose-canal.yml                      |   2 +-
 ...er-compose.yml => docker-compose-mysql.yml |  50 ++--
 docker/config/enable-oldvalue-config.toml     |   1 +
 .../framework/avro/kafka_docker_env.go        |  75 ++++-
 .../framework/avro/kafka_single_table.go      |  58 ++--
 .../framework/canal/kafka_docker_env.go       |   4 +-
 integration/framework/docker_compose_op.go    |  20 +-
 .../{kafka_docker_env.go => docker_env.go}    |  69 ++++-
 integration/framework/mysql/docker_env.go     |  78 ++++++
 .../framework/mysql/docker_env_test.go        |  87 ++++++
 integration/framework/mysql/single_table.go   |  85 ++++++
 .../framework/mysql/single_table_test.go      |  45 +++
 integration/integration.go                    |  36 +++
 integration/tests/case_many_types.go          |  44 ++-
 22 files changed, 1010 insertions(+), 93 deletions(-)
 create mode 100644 cdc/sink/simple_mysql_tester.go
 rename docker-compose.yml => docker-compose-mysql.yml (88%)
 create mode 100644 docker/config/enable-oldvalue-config.toml
 rename integration/framework/{kafka_docker_env.go => docker_env.go} (59%)
 create mode 100644 integration/framework/mysql/docker_env.go
 create mode 100644 integration/framework/mysql/docker_env_test.go
 create mode 100644 integration/framework/mysql/single_table.go
 create mode 100644 integration/framework/mysql/single_table_test.go

diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 396c6646168..1d7dca17571 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -9,7 +9,107 @@ on:
     branches: [ master ]
 
 jobs:
+  MySQL-integration:
+    runs-on: ubuntu-latest
+
+    steps:
+      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
+      - uses: actions/checkout@v2
+
+      - uses: actions/setup-go@v2
+        with:
+          go-version: '^1.14.0'
+
+      - name: Cache Vendor
+        id: cache-vendor
+        uses: actions/cache@v2
+        with:
+          path: vendor
+          key: ${{ runner.os }}-cdc-integration-vendor-${{ hashFiles('go.sum') }}
+
+      - name: Update Vendor
+        run: go mod vendor
+
+      - name: Pull images
+        run: docker-compose -f docker-compose-avro.yml pull --ignore-pull-failures
+
+      - name: TiKV version
+        run: docker run pingcap/tikv:release-4.0-nightly -V
+
+      - name: Build Integration Framework
+        run: |
+          cd $GITHUB_WORKSPACE/integration
+          go build
+
+      - name: Run Integration Framework
+        timeout-minutes: 45
+        run: |
+          cd $GITHUB_WORKSPACE/integration
+          ./integration -protocol=mysql
+
+      - uses: actions/upload-artifact@v2
+        if: ${{ always() }}
+        with:
+          name: logs
+          path: ${{ github.workspace }}/docker/logs/*
+
+      - name: Clean Up
+        if: ${{ always() }}
+        run: |
+          $GITHUB_WORKSPACE/scripts/avro-local-test.sh down
+
+  OldValue-integration:
+    needs: [MySQL-integration]
+    runs-on: ubuntu-latest
+
+    steps:
+      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
+      - uses: actions/checkout@v2
+
+      - uses: actions/setup-go@v2
+        with:
+          go-version: '^1.14.0'
+
+      - name: Cache Vendor
+        id: cache-vendor
+        uses: actions/cache@v2
+        with:
+          path: vendor
+          key: ${{ runner.os }}-cdc-integration-vendor-${{ hashFiles('go.sum') }}
+
+      - name: Update Vendor
+        run: go mod vendor
+
+      - name: Pull images
+        run: docker-compose -f docker-compose-avro.yml pull --ignore-pull-failures
+
+      - name: TiKV version
+        run: docker run pingcap/tikv:release-4.0-nightly -V
+
+      - name: Build Integration Framework
+        run: |
+          cd $GITHUB_WORKSPACE/integration
+          go build
+
+      - name: Run Integration Framework
+        timeout-minutes: 45
+        run: |
+          cd $GITHUB_WORKSPACE/integration
+          ./integration -protocol=simple-mysql-checking-old-value
+
+      - uses: actions/upload-artifact@v2
+        if: ${{ always() }}
+        with:
+          name: logs
+          path: ${{ github.workspace }}/docker/logs/*
+
+      - name: Clean Up
+        if: ${{ always() }}
+        run: |
+          $GITHUB_WORKSPACE/scripts/avro-local-test.sh down
+
   CanalJson-integration:
+    needs: [OldValue-integration]
     runs-on: ubuntu-latest
 
     steps:
diff --git a/Dockerfile.development b/Dockerfile.development
index b7f09e5f932..56c1cb0859f 100644
--- a/Dockerfile.development
+++ b/Dockerfile.development
@@ -4,7 +4,9 @@ WORKDIR /go/src/github.com/pingcap/ticdc
 COPY . .
 ENV CDC_ENABLE_VENDOR=1
 RUN go mod vendor
+RUN make failpoint-enable
 RUN make
+RUN make failpoint-disable
 
 FROM alpine:3.12
 RUN apk add --no-cache tzdata bash curl socat
diff --git a/Makefile b/Makefile
index 46d313f1b10..46ac48b374e 100644
--- a/Makefile
+++ b/Makefile
@@ -19,6 +19,7 @@ GOVENDORFLAG := -mod=vendor
 endif
 
 GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
+GOBUILDNOVENDOR := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath
 ifeq ($(GOVERSION114), 1)
 GOTEST := CGO_ENABLED=1 $(GO) test -p 3 --race -gcflags=all=-d=checkptr=0
 else
@@ -83,7 +84,7 @@ leak_test: check_failpoint_ctl
 	$(FAILPOINT_DISABLE)
 
 check_failpoint_ctl:
-	which $(FAILPOINT) >/dev/null 2>&1 || $(GOBUILD) -o $(FAILPOINT) github.com/pingcap/failpoint/failpoint-ctl
+	which $(FAILPOINT) >/dev/null 2>&1 || $(GOBUILDNOVENDOR) -o $(FAILPOINT) github.com/pingcap/failpoint/failpoint-ctl
 
 check_third_party_binary:
 	@which bin/tidb-server
@@ -142,8 +143,8 @@ check: check-copyright fmt lint check-static tidy errdoc
 
 coverage:
 	GO111MODULE=off go get github.com/wadey/gocovmerge
-	gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
-	grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
+	gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
+	grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
 ifeq ("$(JenkinsCI)", "1")
 	GO111MODULE=off go get github.com/mattn/goveralls
 	@goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
@@ -173,8 +174,8 @@ tools/bin/golangci-lint: tools/check/go.mod
 	cd tools/check; test -e ../bin/golangci-lint || \
 	$(GO) build -o ../bin/golangci-lint github.com/golangci/golangci-lint/cmd/golangci-lint
 
-failpoint-enable:
+failpoint-enable: check_failpoint_ctl
 	$(FAILPOINT_ENABLE)
 
-failpoint-disable:
+failpoint-disable: check_failpoint_ctl
 	$(FAILPOINT_DISABLE)
"go.uber.org/zap" ) const ( @@ -61,7 +61,6 @@ func newStorageWithCurVersionCache(storage tidbkv.Storage, cacheKey string) tidb curVersionCache[cacheKey] = &curVersionCacheEntry{ ts: 0, lastUpdated: time.Unix(0, 0), - mu: sync.Mutex{}, } } diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 24d6fc292e4..677fc603e74 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -999,7 +999,7 @@ func reduceReplace(replaces map[string][][]interface{}, batchSize int) ([]string cacheArgs := make([]interface{}, 0) last := false for i, val := range vals { - cacheCount += 1 + cacheCount++ if i == len(vals)-1 || cacheCount >= batchSize { last = true } diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go new file mode 100644 index 00000000000..d1959056119 --- /dev/null +++ b/cdc/sink/simple_mysql_tester.go @@ -0,0 +1,262 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "database/sql" + "fmt" + "net/url" + "strings" + "sync" + + dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/filter" + "github.com/pingcap/ticdc/pkg/quotes" + "go.uber.org/zap" +) + +func init() { + failpoint.Inject("SimpleMySQLSinkTester", func() { + sinkIniterMap["simple-mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + return newSimpleMySQLSink(ctx, sinkURI, config) + } + }) +} + +type simpleMySQLSink struct { + enableOldValue bool + enableCheckOldValue bool + db *sql.DB + rowsBuffer []*model.RowChangedEvent + rowsBufferLock sync.Mutex +} + +func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.ReplicaConfig) (*simpleMySQLSink, error) { + var db *sql.DB + + // dsn format of the driver: + // [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] + username := sinkURI.User.Username() + password, _ := sinkURI.User.Password() + port := sinkURI.Port() + if username == "" { + username = "root" + } + if port == "" { + port = "4000" + } + + dsnStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/?multiStatements=true", username, password, sinkURI.Hostname(), port) + dsn, err := dmysql.ParseDSN(dsnStr) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + + // create test db used for parameter detection + if dsn.Params == nil { + dsn.Params = make(map[string]string, 1) + } + testDB, err := sql.Open("mysql", dsn.FormatDSN()) + if err != nil { + return nil, errors.Annotate( + cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection when configuring sink") + } + defer testDB.Close() + + db, err = sql.Open("mysql", dsnStr) + if err != nil { + return nil, errors.Annotate( + cerror.WrapError(cerror.ErrMySQLConnectionError, err), 
"Open database connection failed") + } + err = db.PingContext(ctx) + if err != nil { + return nil, errors.Annotate( + cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") + } + + sink := &simpleMySQLSink{ + db: db, + enableOldValue: config.EnableOldValue, + } + if strings.ToLower(sinkURI.Query().Get("check-old-value")) == "true" { + sink.enableCheckOldValue = true + log.Info("the old value checker is enabled") + } + return sink, nil +} + +func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { + // do nothing + return nil +} + +// EmitRowChangedEvents sends Row Changed Event to Sink +// EmitRowChangedEvents may write rows to downstream directly; +func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + s.rowsBufferLock.Lock() + defer s.rowsBufferLock.Unlock() + s.rowsBuffer = append(s.rowsBuffer, rows...) + return nil +} + +func (s *simpleMySQLSink) executeRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + var sql string + var args []interface{} + if s.enableOldValue { + for _, row := range rows { + if len(row.PreColumns) != 0 && len(row.Columns) != 0 { + // update + if s.enableCheckOldValue { + err := s.checkOldValue(ctx, row) + if err != nil { + return errors.Trace(err) + } + } + sql, args = prepareUpdate(row.Table.QuoteString(), row.PreColumns, row.Columns, true) + } else if len(row.PreColumns) == 0 { + // insert + sql, args = prepareReplace(row.Table.QuoteString(), row.Columns, true, true) + } else if len(row.Columns) == 0 { + // delete + if s.enableCheckOldValue { + err := s.checkOldValue(ctx, row) + if err != nil { + return errors.Trace(err) + } + } + sql, args = prepareDelete(row.Table.QuoteString(), row.PreColumns, true) + } + _, err := s.db.ExecContext(ctx, sql, args...) + if err != nil { + return errors.Trace(err) + } + } + } else { + for _, row := range rows { + if row.IsDelete() { + sql, args = prepareDelete(row.Table.QuoteString(), row.PreColumns, true) + } else { + sql, args = prepareReplace(row.Table.QuoteString(), row.Columns, true, false) + } + _, err := s.db.ExecContext(ctx, sql, args...) + if err != nil { + return errors.Trace(err) + } + } + } + return nil +} + +// EmitDDLEvent sends DDL Event to Sink +// EmitDDLEvent should execute DDL to downstream synchronously +func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + var sql string + if len(ddl.TableInfo.Table) == 0 { + sql = ddl.Query + } else { + sql = fmt.Sprintf("use %s;%s", ddl.TableInfo.Schema, ddl.Query) + } + _, err := s.db.ExecContext(ctx, sql) + if err != nil && isIgnorableDDLError(err) { + log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) + return nil + } + return err +} + +// FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. 
+
+// FlushRowChangedEvents flushes each row whose commitTs is less than or equal to `resolvedTs` to the downstream.
+// TiCDC guarantees that all events whose commitTs is less than or equal to `resolvedTs` have been sent to the Sink through `EmitRowChangedEvents`.
+func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
+	s.rowsBufferLock.Lock()
+	defer s.rowsBufferLock.Unlock()
+	newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer))
+	for _, row := range s.rowsBuffer {
+		if row.CommitTs <= resolvedTs {
+			err := s.executeRowChangedEvents(ctx, row)
+			if err != nil {
+				return 0, err
+			}
+		} else {
+			newBuffer = append(newBuffer, row)
+		}
+	}
+	s.rowsBuffer = newBuffer
+	return resolvedTs, nil
+}
+
+// EmitCheckpointTs sends the checkpointTs to the Sink.
+// TiCDC guarantees that all events **in the cluster** whose commitTs is less than or equal to `checkpointTs` have been sent to the downstream successfully.
+func (s *simpleMySQLSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
+	// do nothing
+	return nil
+}
+
+// Close closes the Sink
+func (s *simpleMySQLSink) Close() error {
+	return s.db.Close()
+}
+
+func prepareCheckSQL(quoteTable string, cols []*model.Column) (string, []interface{}) {
+	var builder strings.Builder
+	builder.WriteString("SELECT count(1) FROM " + quoteTable + " WHERE ")
+
+	colNames, wargs := whereSlice(cols, true)
+	if len(wargs) == 0 {
+		return "", nil
+	}
+	args := make([]interface{}, 0, len(wargs))
+	for i := 0; i < len(colNames); i++ {
+		if i > 0 {
+			builder.WriteString(" AND ")
+		}
+		if wargs[i] == nil {
+			builder.WriteString(quotes.QuoteName(colNames[i]) + " IS NULL")
+		} else {
+			builder.WriteString(quotes.QuoteName(colNames[i]) + " = ?")
+			args = append(args, wargs[i])
+		}
+	}
+	builder.WriteString(" LIMIT 1;")
+	sql := builder.String()
+	return sql, args
+}
+
+func (s *simpleMySQLSink) checkOldValue(ctx context.Context, row *model.RowChangedEvent) error {
+	sql, args := prepareCheckSQL(row.Table.QuoteString(), row.PreColumns)
+	result, err := s.db.QueryContext(ctx, sql, args...)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	var count int
+	if result.Next() {
+		err := result.Scan(&count)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	if count == 0 {
+		log.Error("check failed: the old value of this row does not exist in the downstream", zap.Any("row", row))
+		return errors.New("check failed")
+	}
+	log.Debug("pass the old value check", zap.String("sql", sql), zap.Any("args", args), zap.Int("count", count))
+	return nil
+}
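The checker above queries the downstream for the pre-image of every update and delete before applying it. For illustration only, here is a simplified, self-contained sketch of the WHERE-clause construction that prepareCheckSQL performs; the column type and helper name are hypothetical stand-ins for ticdc's model.Column and whereSlice. The point it demonstrates: a NULL old value must be matched with IS NULL, because `col = NULL` never matches any row in SQL, and a count of 0 means the downstream is missing the expected old row.

package main

import (
    "fmt"
    "strings"
)

// column is a hypothetical stand-in for ticdc's model.Column.
type column struct {
    name  string
    value interface{}
}

// buildCheckSQL mirrors prepareCheckSQL: non-NULL old values become
// `col` = ? predicates, NULL old values become `col` IS NULL.
func buildCheckSQL(quotedTable string, cols []column) (string, []interface{}) {
    var b strings.Builder
    b.WriteString("SELECT count(1) FROM " + quotedTable + " WHERE ")
    args := make([]interface{}, 0, len(cols))
    for i, col := range cols {
        if i > 0 {
            b.WriteString(" AND ")
        }
        if col.value == nil {
            b.WriteString("`" + col.name + "` IS NULL")
        } else {
            b.WriteString("`" + col.name + "` = ?")
            args = append(args, col.value)
        }
    }
    b.WriteString(" LIMIT 1;")
    return b.String(), args
}

func main() {
    sql, args := buildCheckSQL("`testdb`.`test`", []column{{"id", 1}, {"name", nil}})
    // Prints: SELECT count(1) FROM `testdb`.`test` WHERE `id` = ? AND `name` IS NULL LIMIT 1; [1]
    fmt.Println(sql, args)
}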
"tidb", "mysql+ssl", "tidb+ssl": - return newMySQLSink(ctx, changefeedID, sinkURI, filter, config, opts) - case "kafka", "kafka+ssl": - return newKafkaSaramaSink(ctx, sinkURI, filter, config, opts, errCh) - case "pulsar", "pulsar+ssl": - return newPulsarSink(ctx, sinkURI, filter, config, opts, errCh) - case "local": - return cdclog.NewLocalFileSink(ctx, sinkURI, errCh) - case "s3": - return cdclog.NewS3Sink(ctx, sinkURI, errCh) - default: - return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", sinkURI.Scheme) + if newSink, ok := sinkIniterMap[strings.ToLower(sinkURI.Scheme)]; ok { + return newSink(ctx, changefeedID, sinkURI, filter, config, opts, errCh) } + return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", sinkURI.Scheme) } diff --git a/docker-compose-avro.yml b/docker-compose-avro.yml index 84781bf3e9b..0462232efcf 100644 --- a/docker-compose-avro.yml +++ b/docker-compose-avro.yml @@ -8,7 +8,7 @@ services: context: . dockerfile: ./Dockerfile.development volumes: - - ./docker/data:/data + - /data - ./docker/logs:/logs - ./docker/config:/config command: @@ -134,7 +134,7 @@ services: image: pingcap/tikv:release-4.0-nightly volumes: - ./docker/config/tikv.toml:/tikv.toml:ro - - ./docker/data:/data + - /data - ./docker/logs:/logs command: - --addr=0.0.0.0:20160 diff --git a/docker-compose-canal.yml b/docker-compose-canal.yml index 703801aa463..3f69b259120 100644 --- a/docker-compose-canal.yml +++ b/docker-compose-canal.yml @@ -8,7 +8,7 @@ services: context: . dockerfile: ./Dockerfile volumes: - - ./docker/data:/data + - /data - ./docker/logs:/logs - ./docker/config:/config command: diff --git a/docker-compose.yml b/docker-compose-mysql.yml similarity index 88% rename from docker-compose.yml rename to docker-compose-mysql.yml index 3ad9e639a80..29b51952938 100644 --- a/docker-compose.yml +++ b/docker-compose-mysql.yml @@ -6,10 +6,13 @@ image: ticdc:latest build: context: . - dockerfile: ./Dockerfile + dockerfile: ./Dockerfile.development volumes: - - ./docker/data:/data + - /data - ./docker/logs:/logs + - ./docker/config:/config + environment: + GO_FAILPOINTS: ${GO_FAILPOINTS} command: -c "/usr/bin/tail -f /dev/null" entrypoint: "/bin/sh" depends_on: @@ -21,15 +24,20 @@ image: ticdc:latest build: context: . - dockerfile: ./Dockerfile + dockerfile: ./Dockerfile.development volumes: - - ./docker/data:/data + - /data - ./docker/logs:/logs + ports: + - "8300:8300" + environment: + GO_FAILPOINTS: ${GO_FAILPOINTS} entrypoint: "/cdc server" command: - --addr=0.0.0.0:8300 - --pd=http://upstream-pd:2379 - --log-file=/logs/capturer0.log + - --log-level=debug - --advertise-addr=capturer0:8300 depends_on: - "upstream-tidb" @@ -40,15 +48,20 @@ image: ticdc:latest build: context: . - dockerfile: ./Dockerfile + dockerfile: ./Dockerfile.development volumes: - - ./docker/data:/data + - /data - ./docker/logs:/logs + ports: + - "8301:8300" + environment: + GO_FAILPOINTS: ${GO_FAILPOINTS} entrypoint: "/cdc server" command: - --addr=0.0.0.0:8300 - --pd=http://upstream-pd:2379 - --log-file=/logs/capturer1.log + - --log-level=debug - --advertise-addr=capturer1:8300 depends_on: - "upstream-tidb" @@ -59,15 +72,20 @@ image: ticdc:latest build: context: . 
diff --git a/docker-compose-avro.yml b/docker-compose-avro.yml
index 84781bf3e9b..0462232efcf 100644
--- a/docker-compose-avro.yml
+++ b/docker-compose-avro.yml
@@ -8,7 +8,7 @@ services:
       context: .
       dockerfile: ./Dockerfile.development
     volumes:
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
       - ./docker/config:/config
     command:
@@ -134,7 +134,7 @@ services:
     image: pingcap/tikv:release-4.0-nightly
     volumes:
       - ./docker/config/tikv.toml:/tikv.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --addr=0.0.0.0:20160
diff --git a/docker-compose-canal.yml b/docker-compose-canal.yml
index 703801aa463..3f69b259120 100644
--- a/docker-compose-canal.yml
+++ b/docker-compose-canal.yml
@@ -8,7 +8,7 @@ services:
       context: .
       dockerfile: ./Dockerfile
     volumes:
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
       - ./docker/config:/config
     command:
diff --git a/docker-compose.yml b/docker-compose-mysql.yml
similarity index 88%
rename from docker-compose.yml
rename to docker-compose-mysql.yml
index 3ad9e639a80..29b51952938 100644
--- a/docker-compose.yml
+++ b/docker-compose-mysql.yml
@@ -6,10 +6,13 @@
     image: ticdc:latest
     build:
       context: .
-      dockerfile: ./Dockerfile
+      dockerfile: ./Dockerfile.development
     volumes:
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
+      - ./docker/config:/config
+    environment:
+      GO_FAILPOINTS: ${GO_FAILPOINTS}
     command: -c "/usr/bin/tail -f /dev/null"
     entrypoint: "/bin/sh"
     depends_on:
@@ -21,15 +24,20 @@
     image: ticdc:latest
     build:
       context: .
-      dockerfile: ./Dockerfile
+      dockerfile: ./Dockerfile.development
     volumes:
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
+    ports:
+      - "8300:8300"
+    environment:
+      GO_FAILPOINTS: ${GO_FAILPOINTS}
     entrypoint: "/cdc server"
     command:
       - --addr=0.0.0.0:8300
       - --pd=http://upstream-pd:2379
       - --log-file=/logs/capturer0.log
+      - --log-level=debug
       - --advertise-addr=capturer0:8300
     depends_on:
       - "upstream-tidb"
@@ -40,15 +48,20 @@
     image: ticdc:latest
     build:
       context: .
-      dockerfile: ./Dockerfile
+      dockerfile: ./Dockerfile.development
     volumes:
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
+    ports:
+      - "8301:8300"
+    environment:
+      GO_FAILPOINTS: ${GO_FAILPOINTS}
     entrypoint: "/cdc server"
     command:
       - --addr=0.0.0.0:8300
       - --pd=http://upstream-pd:2379
       - --log-file=/logs/capturer1.log
+      - --log-level=debug
       - --advertise-addr=capturer1:8300
     depends_on:
       - "upstream-tidb"
@@ -59,15 +72,20 @@
     image: ticdc:latest
     build:
       context: .
-      dockerfile: ./Dockerfile
+      dockerfile: ./Dockerfile.development
     volumes:
-      - ./docker/data:/data
+      - /data
      - ./docker/logs:/logs
+    ports:
+      - "8302:8300"
+    environment:
+      GO_FAILPOINTS: ${GO_FAILPOINTS}
     entrypoint: "/cdc server"
     command:
       - --addr=0.0.0.0:8300
       - --pd=http://upstream-pd:2379
       - --log-file=/logs/capturer2.log
+      - --log-level=debug
       - --advertise-addr=capturer2:8300
     depends_on:
       - "upstream-tidb"
@@ -80,7 +98,7 @@
       - "2379:2379"
     volumes:
       - ./docker/config/pd.toml:/pd.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --name=upstream-pd
@@ -98,7 +116,7 @@
     image: pingcap/tikv:nightly
     volumes:
       - ./docker/config/tikv.toml:/tikv.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --addr=0.0.0.0:20160
@@ -115,7 +133,7 @@
     image: pingcap/tikv:nightly
     volumes:
       - ./docker/config/tikv.toml:/tikv.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --addr=0.0.0.0:20160
@@ -132,7 +150,7 @@
     image: pingcap/tikv:nightly
     volumes:
       - ./docker/config/tikv.toml:/tikv.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --addr=0.0.0.0:20160
@@ -171,7 +189,7 @@
       - "3379:2379"
     volumes:
       - ./docker/config/pd.toml:/pd.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --name=downstream-pd
@@ -189,7 +207,7 @@
     image: pingcap/tikv:nightly
     volumes:
       - ./docker/config/tikv.toml:/tikv.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --addr=0.0.0.0:20160
@@ -206,7 +224,7 @@
     image: pingcap/tikv:nightly
     volumes:
       - ./docker/config/tikv.toml:/tikv.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --addr=0.0.0.0:20160
@@ -223,7 +241,7 @@
     image: pingcap/tikv:nightly
     volumes:
       - ./docker/config/tikv.toml:/tikv.toml:ro
-      - ./docker/data:/data
+      - /data
       - ./docker/logs:/logs
     command:
       - --addr=0.0.0.0:20160
diff --git a/docker/config/enable-oldvalue-config.toml b/docker/config/enable-oldvalue-config.toml
new file mode 100644
index 00000000000..4293a79e451
--- /dev/null
+++ b/docker/config/enable-oldvalue-config.toml
@@ -0,0 +1 @@
+enable-old-value = true
diff --git a/integration/framework/avro/kafka_docker_env.go b/integration/framework/avro/kafka_docker_env.go
index 93b4158a0a3..86b83d85c6f 100644
--- a/integration/framework/avro/kafka_docker_env.go
+++ b/integration/framework/avro/kafka_docker_env.go
@@ -15,12 +15,12 @@ package avro
 
 import (
 	"encoding/json"
-	"errors"
 	"io/ioutil"
 	"net/http"
 	"path"
 
 	"github.com/integralist/go-findroot/find"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/integration/framework"
 	"go.uber.org/zap"
@@ -34,7 +34,7 @@ const (
 
 // KafkaDockerEnv represents the docker-compose service defined in docker-compose-avro.yml
 type KafkaDockerEnv struct {
-	framework.KafkaDockerEnv
+	framework.DockerEnv
 }
 
 // NewKafkaDockerEnv creates a new KafkaDockerEnv
@@ -84,7 +84,7 @@ func NewKafkaDockerEnv(dockerComposeFile string) *KafkaDockerEnv {
 		file = dockerComposeFile
 	}
 
-	return &KafkaDockerEnv{KafkaDockerEnv: framework.KafkaDockerEnv{
+	return &KafkaDockerEnv{DockerEnv: framework.DockerEnv{
 		DockerComposeOperator: framework.DockerComposeOperator{
 			FileName:   file,
 			Controller: controllerContainerName,
@@ -92,3 +92,72 @@ func NewKafkaDockerEnv(dockerComposeFile string) *KafkaDockerEnv {
 		},
 	}}
 }
+
+// Setup brings up a docker-compose service
+func (d *KafkaDockerEnv) Setup() {
+	d.DockerEnv.Setup()
+	if err := createConnector(); err != nil {
+		log.Fatal("failed to create connector", zap.Error(err))
+	}
+}
+
+// Reset implements Environment
+func (d *KafkaDockerEnv) Reset() {
+	d.DockerEnv.Reset()
+	if err := d.resetSchemaRegistry(); err != nil {
+		log.Fatal("failed to reset schema registry", zap.Error(err))
+	}
+	if err := d.resetKafkaConnector(); err != nil {
+		log.Fatal("failed to reset kafka connector", zap.Error(err))
+	}
+}
+
+func (d *KafkaDockerEnv) resetSchemaRegistry() error {
+	resp, err := http.Get("http://127.0.0.1:8081/subjects")
+	if err != nil {
+		return err
+	}
+	if resp.Body == nil {
+		return errors.New("get schema registry subjects returns empty body")
+	}
+	defer resp.Body.Close()
+
+	bytes, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+
+	subs := []string{}
+	err = json.Unmarshal(bytes, &subs)
+	if err != nil {
+		return err
+	}
+	for _, sub := range subs {
+		url := "http://127.0.0.1:8081/subjects/" + sub
+		req, err := http.NewRequest(http.MethodDelete, url, nil)
+		if err != nil {
+			return err
+		}
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return err
+		}
+		defer res.Body.Close()
+	}
+	log.Info("Deleted the schema registry subjects", zap.Any("subjects", subs))
+	return nil
+}
+
+func (d *KafkaDockerEnv) resetKafkaConnector() error {
+	url := "http://127.0.0.1:8083/connectors/jdbc-sink-connector/"
+	req, err := http.NewRequest(http.MethodDelete, url, nil)
+	if err != nil {
+		return err
+	}
+	res, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+	return createConnector()
+}
diff --git a/integration/framework/avro/kafka_single_table.go b/integration/framework/avro/kafka_single_table.go
index bad70fe0c18..5395d674195 100644
--- a/integration/framework/avro/kafka_single_table.go
+++ b/integration/framework/avro/kafka_single_table.go
@@ -67,25 +67,39 @@ func (a *SingleTableTask) Prepare(taskContext *framework.TaskContext) error {
 		return err
 	}
 	taskContext.Downstream.SetConnMaxLifetime(5 * time.Second)
+	if taskContext.WaitForReady != nil {
+		log.Info("Waiting for env to be ready")
+		return taskContext.WaitForReady()
+	}
+
+	return nil
+}
+
+// Run implements Task
+func (a *SingleTableTask) Run(taskContext *framework.TaskContext) error {
+	log.Warn("SingleTableTask has been run")
+	return nil
+}
 
+func createConnector() error {
 	// TODO better way to generate JSON
 	connectorConfigFmt := `{
-		"name": "jdbc-sink-connector",
-		"config": {
-			"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
-			"tasks.max": "1",
-			"topics": "testdb_%s",
-			"connection.url": "jdbc:mysql://root@downstream-tidb:4000/testdb",
-			"connection.ds.pool.size": 5,
-			"table.name.format": "%s",
-			"insert.mode": "upsert",
-			"delete.enabled": true,
-			"pk.mode": "record_key",
-			"auto.create": true,
-			"auto.evolve": true
-		}
-	}`
-	connectorConfig := fmt.Sprintf(connectorConfigFmt, a.TableName, a.TableName)
+	"name": "jdbc-sink-connector",
+	"config": {
+		"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
+		"tasks.max": "1",
+		"topics": "testdb_%s",
+		"connection.url": "jdbc:mysql://root@downstream-tidb:4000/testdb",
+		"connection.ds.pool.size": 5,
+		"table.name.format": "%s",
+		"insert.mode": "upsert",
+		"delete.enabled": true,
+		"pk.mode": "record_key",
+		"auto.create": true,
+		"auto.evolve": true
+	}
+	}`
+	connectorConfig := fmt.Sprintf(connectorConfigFmt, "test", "test")
 	log.Debug("Creating Kafka sink connector", zap.String("config", connectorConfig))
 
 	resp, err := http.Post(
@@ -112,17 +126,5 @@ func (a *SingleTableTask) Prepare(taskContext *framework.TaskContext) error {
 			zap.ByteString("body", str))
 		return errors.Errorf("Kafka Connect Rest API returned status code %d", resp.StatusCode)
 	}
-
-	if taskContext.WaitForReady != nil {
-		log.Info("Waiting for env to be ready")
-		return taskContext.WaitForReady()
-	}
-
-	return nil
-}
-
-// Run implements Task
-func (a *SingleTableTask) Run(taskContext *framework.TaskContext) error {
-	log.Warn("SingleTableTask has been run")
 	return nil
 }
diff --git a/integration/framework/canal/kafka_docker_env.go b/integration/framework/canal/kafka_docker_env.go
index f5e4e15387d..9ee233fc28b 100644
--- a/integration/framework/canal/kafka_docker_env.go
+++ b/integration/framework/canal/kafka_docker_env.go
@@ -32,7 +32,7 @@ const (
 
 // KafkaDockerEnv represents the docker-compose service defined in docker-compose-canal.yml
 type KafkaDockerEnv struct {
-	framework.KafkaDockerEnv
+	framework.DockerEnv
 }
 
 // NewKafkaDockerEnv creates a new KafkaDockerEnv
@@ -57,7 +57,7 @@ func NewKafkaDockerEnv(dockerComposeFile string) *KafkaDockerEnv {
 		file = dockerComposeFile
 	}
 
-	return &KafkaDockerEnv{KafkaDockerEnv: framework.KafkaDockerEnv{
+	return &KafkaDockerEnv{DockerEnv: framework.DockerEnv{
 		DockerComposeOperator: framework.DockerComposeOperator{
 			FileName:   file,
 			Controller: controllerContainerName,
diff --git a/integration/framework/docker_compose_op.go b/integration/framework/docker_compose_op.go
index e8eddff47ec..ef5ed9f3b3f 100644
--- a/integration/framework/docker_compose_op.go
+++ b/integration/framework/docker_compose_op.go
@@ -47,7 +47,11 @@ func (d *DockerComposeOperator) Setup() {
 	if err != nil {
 		log.Fatal("ping downstream database but not receive a pong", zap.Error(err))
 	}
+	d.WaitClusterStarted()
+}
 
+// WaitClusterStarted waits until the cluster is started and ready
+func (d *DockerComposeOperator) WaitClusterStarted() {
 	if d.HealthChecker != nil {
 		err := retry.Run(time.Second, 120, d.HealthChecker)
 		if err != nil {
@@ -56,6 +60,20 @@ func (d *DockerComposeOperator) Setup() {
 	}
 }
 
+// RestartComponents restarts the given docker-compose services
+func (d *DockerComposeOperator) RestartComponents(names ...string) {
+	for _, name := range names {
+		cmd := exec.Command("docker-compose", "-f", d.FileName, "rm", "-sf", name)
+		cmd.Env = os.Environ()
+		cmd.Env = append(cmd.Env, d.ExecEnv...)
+		runCmdHandleError(cmd)
+	}
+	cmd := exec.Command("docker-compose", "-f", d.FileName, "up", "-d")
+	cmd.Env = os.Environ()
+	cmd.Env = append(cmd.Env, d.ExecEnv...)
+	runCmdHandleError(cmd)
+}
+
 func waitTiDBStarted(dsn string) error {
 	return retry.Run(time.Second, 60, func() error {
 		upstream, err := sql.Open("mysql", dsn)
@@ -85,7 +103,7 @@ func runCmdHandleError(cmd *exec.Cmd) []byte {
 			zap.ByteString("output", bytes))
 	}
 
-	log.Info("Finished executing command", zap.String("cmd", cmd.String()))
+	log.Info("Finished executing command", zap.String("cmd", cmd.String()), zap.ByteString("output", bytes))
 	return bytes
 }
diff --git a/integration/framework/kafka_docker_env.go b/integration/framework/docker_env.go
similarity index 59%
rename from integration/framework/kafka_docker_env.go
rename to integration/framework/docker_env.go
index aa2528970ab..87dd20ee3a4 100644
--- a/integration/framework/kafka_docker_env.go
+++ b/integration/framework/docker_env.go
@@ -16,6 +16,7 @@ package framework
 import (
 	"context"
 	"database/sql"
+	"fmt"
 	"os/exec"
 	"time"
 
@@ -31,19 +32,40 @@ const (
 	DownstreamDSN = "root@tcp(127.0.0.1:5000)/"
 )
 
-// KafkaDockerEnv represents the docker-compose service
-type KafkaDockerEnv struct {
+// DockerEnv represents the docker-compose service
+type DockerEnv struct {
 	DockerComposeOperator
 }
 
 // Reset implements Environment
-func (e *KafkaDockerEnv) Reset() {
-	e.TearDown()
-	e.Setup()
+func (e *DockerEnv) Reset() {
+	stdout, err := e.ExecInController(`/cdc cli unsafe reset --no-confirm --pd="http://upstream-pd:2379"`)
+	if err != nil {
+		log.Fatal("ResetEnv: cannot reset the cdc cluster", zap.ByteString("stdout", stdout), zap.Error(err))
+	}
+	log.Info("ResetEnv: reset the cdc cluster", zap.ByteString("stdout", stdout))
+
+	upstream, err := sql.Open("mysql", UpstreamDSN)
+	if err != nil {
+		log.Fatal("ResetEnv: cannot connect to upstream database", zap.Error(err))
+	}
+	defer upstream.Close()
+	if err := dropAllSchemas(upstream); err != nil {
+		log.Fatal("ResetEnv: error found when dropping all schemas", zap.Error(err))
+	}
+
+	downstream, err := sql.Open("mysql", DownstreamDSN)
+	if err != nil {
+		log.Fatal("ResetEnv: cannot connect to downstream database", zap.Error(err))
+	}
+	defer downstream.Close()
+	if err := dropAllSchemas(downstream); err != nil {
+		log.Fatal("ResetEnv: error found when dropping all schemas", zap.Error(err))
+	}
 }
 
 // RunTest implements Environment
-func (e *KafkaDockerEnv) RunTest(task Task) {
+func (e *DockerEnv) RunTest(task Task) {
 	cmdLine := "/cdc " + task.GetCDCProfile().String()
 	bytes, err := e.ExecInController(cmdLine)
 	if err != nil {
@@ -100,6 +122,39 @@ func (e *KafkaDockerEnv) RunTest(task Task) {
 }
 
 // SetListener implements Environment. Currently unfinished, will be used to monitor Kafka output
-func (e *KafkaDockerEnv) SetListener(states interface{}, listener MqListener) {
+func (e *DockerEnv) SetListener(states interface{}, listener MqListener) {
 	// TODO
 }
+
+var systemSchema = map[string]struct{}{
+	"INFORMATION_SCHEMA": {},
+	"METRICS_SCHEMA":     {},
+	"PERFORMANCE_SCHEMA": {},
+	"mysql":              {},
+}
+
+func dropAllSchemas(db *sql.DB) error {
+	result, err := db.Query("show databases;")
+	if err != nil {
+		return err
+	}
+	var schemaNames []string
+	var schema string
+	for result.Next() {
+		err := result.Scan(&schema)
+		if err != nil {
+			return err
+		}
+		schemaNames = append(schemaNames, schema)
+	}
+	for _, schema := range schemaNames {
+		if _, ok := systemSchema[schema]; ok {
+			continue
+		}
+		_, err := db.Exec(fmt.Sprintf("drop database %s;", schema))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/integration/framework/mysql/docker_env.go b/integration/framework/mysql/docker_env.go
new file mode 100644
index 00000000000..63c6a7065a7
--- /dev/null
+++ b/integration/framework/mysql/docker_env.go
@@ -0,0 +1,78 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mysql
+
+import (
+	"database/sql"
+
+	"github.com/integralist/go-findroot/find"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/integration/framework"
+	"go.uber.org/zap"
+)
+
+const (
+	dockerComposeFilePath   = "/docker-compose-mysql.yml"
+	controllerContainerName = "ticdc_controller_1"
+)
+
+// DockerEnv represents the docker-compose service defined in docker-compose-mysql.yml
+type DockerEnv struct {
+	framework.DockerEnv
+}
+
+// NewDockerEnv creates a new DockerEnv
+func NewDockerEnv(dockerComposeFile string) *DockerEnv {
+	healthChecker := func() error {
+		if err := checkDbConn(framework.UpstreamDSN); err != nil {
+			return err
+		}
+		return checkDbConn(framework.DownstreamDSN)
+	}
+	var file string
+	if dockerComposeFile == "" {
+		st, err := find.Repo()
+		if err != nil {
+			log.Fatal("Could not find git repo root", zap.Error(err))
+		}
+		file = st.Path + dockerComposeFilePath
+	} else {
+		file = dockerComposeFile
+	}
+
+	return &DockerEnv{DockerEnv: framework.DockerEnv{
+		DockerComposeOperator: framework.DockerComposeOperator{
+			FileName:      file,
+			Controller:    controllerContainerName,
+			HealthChecker: healthChecker,
+		},
+	}}
+}
+
+func checkDbConn(dsn string) error {
+	db, err := sql.Open("mysql", dsn)
+	if err != nil {
+		return err
+	}
+	if db == nil {
+		return errors.New("cannot connect to " + dsn)
+	}
+	defer db.Close()
+	err = db.Ping()
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/integration/framework/mysql/docker_env_test.go b/integration/framework/mysql/docker_env_test.go
new file mode 100644
index 00000000000..3ae307821c3
--- /dev/null
+++ b/integration/framework/mysql/docker_env_test.go
@@ -0,0 +1,87 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mysql
+
+import (
+	"os/exec"
+	"testing"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/integration/framework"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDockerEnv_Basic(t *testing.T) {
+	env := NewDockerEnv("")
+	require.NotNil(t, env)
+
+	env.Setup()
+
+	bytes, err := env.ExecInController("echo test")
+	require.NoErrorf(t, err, "Execution returned error", func() string {
+		switch err := err.(type) {
+		case *exec.ExitError:
+			return string(err.Stderr)
+		default:
+			return ""
+		}
+	}())
+	require.Equal(t, "test\n", string(bytes))
+
+	env.TearDown()
+}
+
+type dummyTask struct {
+	test *testing.T
+}
+
+func (t *dummyTask) Prepare(taskContext *framework.TaskContext) error {
+	return nil
+}
+
+func (t *dummyTask) GetCDCProfile() *framework.CDCProfile {
+	return &framework.CDCProfile{
+		PDUri:      "http://upstream-pd:2379",
+		SinkURI:    "mysql://downstream-tidb:4000/testdb",
+		Opts:       map[string]string{},
+		ConfigFile: "",
+	}
+}
+
+func (t *dummyTask) Name() string {
+	return "Dummy"
+}
+
+func (t *dummyTask) Run(taskContext *framework.TaskContext) error {
+	err := taskContext.Upstream.Ping()
+	require.NoError(t.test, err, "Pinging upstream failed")
+
+	err = taskContext.Downstream.Ping()
+	require.NoError(t.test, err, "Pinging downstream failed")
+
+	err = taskContext.CreateDB("testdb")
+	require.NoError(t.test, err)
+
+	log.Info("Running task")
+	return nil
+}
+
+func TestMySQLDockerEnv_RunTest(t *testing.T) {
+	env := NewDockerEnv("")
+	require.NotNil(t, env)
+
+	env.Setup()
+	env.RunTest(&dummyTask{test: t})
+	env.TearDown()
+}
diff --git a/integration/framework/mysql/single_table.go b/integration/framework/mysql/single_table.go
new file mode 100644
index 00000000000..3f6addc0b95
--- /dev/null
+++ b/integration/framework/mysql/single_table.go
@@ -0,0 +1,85 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mysql
+
+import (
+	"database/sql"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/integration/framework"
+)
+
+const (
+	testDbName = "testdb"
+)
+
+// SingleTableTask provides a basic implementation for a MySQL test case
+type SingleTableTask struct {
+	TableName     string
+	CheckOldValue bool
+}
+
+// Name implements Task
+func (c *SingleTableTask) Name() string {
+	log.Warn("SingleTableTask should be embedded in another Task")
+	return "SingleTableTask-" + c.TableName
+}
+
+// GetCDCProfile implements Task
+func (c *SingleTableTask) GetCDCProfile() *framework.CDCProfile {
+	sinkURI := "mysql://downstream-tidb:4000/" + testDbName
+	if c.CheckOldValue {
+		sinkURI = "simple-mysql://downstream-tidb:4000/" + testDbName + "?check-old-value=true"
+	}
+	return &framework.CDCProfile{
+		PDUri:      "http://upstream-pd:2379",
+		SinkURI:    sinkURI,
+		Opts:       map[string]string{},
+		ConfigFile: "/config/enable-oldvalue-config.toml",
+	}
+}
+
+// Prepare implements Task
+func (c *SingleTableTask) Prepare(taskContext *framework.TaskContext) error {
+	err := taskContext.CreateDB(testDbName)
+	if err != nil {
+		return err
+	}
+
+	_ = taskContext.Upstream.Close()
+	taskContext.Upstream, err = sql.Open("mysql", framework.UpstreamDSN+testDbName)
+	if err != nil {
+		return err
+	}
+
+	_ = taskContext.Downstream.Close()
+	taskContext.Downstream, err = sql.Open("mysql", framework.DownstreamDSN+testDbName)
+	if err != nil {
+		return err
+	}
+	taskContext.Downstream.SetConnMaxLifetime(5 * time.Second)
+
+	if taskContext.WaitForReady != nil {
+		log.Info("Waiting for env to be ready")
+		return taskContext.WaitForReady()
+	}
+	return nil
+}
+
+// Run implements Task
+func (c *SingleTableTask) Run(taskContext *framework.TaskContext) error {
+	log.Warn("SingleTableTask has been run")
+	return nil
+}
diff --git a/integration/framework/mysql/single_table_test.go b/integration/framework/mysql/single_table_test.go
new file mode 100644
index 00000000000..ef1f8b8c19d
--- /dev/null
+++ b/integration/framework/mysql/single_table_test.go
@@ -0,0 +1,45 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mysql
+
+import (
+	"database/sql"
+	"testing"
+
+	"github.com/pingcap/ticdc/integration/framework"
+	"github.com/stretchr/testify/require"
+)
+
+type emptySingleTableTask struct {
+	SingleTableTask
+}
+
+func TestMySQLSingleTableTest_Prepare(t *testing.T) {
+	env := NewDockerEnv("")
+	require.NotNil(t, env)
+
+	env.Setup()
+	env.RunTest(&emptySingleTableTask{SingleTableTask{TableName: "test"}})
+
+	_, err := sql.Open("mysql", framework.UpstreamDSN+"testdb")
+	require.NoError(t, err)
+
+	_, err = sql.Open("mysql", framework.DownstreamDSN+"testdb")
+	require.NoError(t, err)
+
+	err = env.HealthChecker()
+	require.NoError(t, err)
+
+	env.TearDown()
+}
diff --git a/integration/integration.go b/integration/integration.go
index a8ea5c04e15..99b896cd231 100644
--- a/integration/integration.go
+++ b/integration/integration.go
@@ -20,6 +20,7 @@ import (
 	"github.com/pingcap/ticdc/integration/framework"
 	"github.com/pingcap/ticdc/integration/framework/avro"
 	"github.com/pingcap/ticdc/integration/framework/canal"
+	"github.com/pingcap/ticdc/integration/framework/mysql"
 	"github.com/pingcap/ticdc/integration/tests"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -75,6 +76,37 @@ func testCanalJSON() {
 	runTests(testCases, env)
 }
 
+func testMySQL() {
+	env := mysql.NewDockerEnv(*dockerComposeFile)
+	task := &mysql.SingleTableTask{TableName: "test"}
+	testCases := []framework.Task{
+		tests.NewSimpleCase(task),
+		tests.NewDeleteCase(task),
+		tests.NewManyTypesCase(task),
+		tests.NewUnsignedCase(task),
+		tests.NewCompositePKeyCase(task),
+		tests.NewAlterCase(task),
+	}
+
+	runTests(testCases, env)
+}
+
+func testMySQLWithCheckingOldValue() {
+	env := mysql.NewDockerEnv(*dockerComposeFile)
+	env.DockerComposeOperator.ExecEnv = []string{"GO_FAILPOINTS=github.com/pingcap/ticdc/cdc/sink/SimpleMySQLSinkTester=return(true)"}
+	task := &mysql.SingleTableTask{TableName: "test", CheckOldValue: true}
+	testCases := []framework.Task{
+		tests.NewSimpleCase(task),
+		tests.NewDeleteCase(task),
+		tests.NewManyTypesCase(task),
+		tests.NewUnsignedCase(task),
+		tests.NewCompositePKeyCase(task),
+		tests.NewAlterCase(task),
+	}
+
+	runTests(testCases, env)
+}
+
 func runTests(cases []framework.Task, env framework.Environment) {
 	log.SetLevel(zapcore.DebugLevel)
 	env.Setup()
@@ -97,6 +129,10 @@ func main() {
 		testCanal()
 	} else if *testProtocol == "canalJson" {
 		testCanalJSON()
+	} else if *testProtocol == "mysql" {
+		testMySQL()
+	} else if *testProtocol == "simple-mysql-checking-old-value" {
+		testMySQLWithCheckingOldValue()
 	} else {
 		log.Fatal("Unknown sink protocol", zap.String("protocol", *testProtocol))
 	}
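Note how the old-value tester is wired in end to end: the simple-mysql scheme exists only in failpoint-enabled builds (the failpoint-enable Make targets and Dockerfile.development changes above), and testMySQLWithCheckingOldValue switches it on through the GO_FAILPOINTS environment variable passed to the capture containers. A minimal, self-contained sketch of the same activation pattern follows; the failpoint name is illustrative.

package main

import (
    "fmt"

    "github.com/pingcap/failpoint"
)

func main() {
    // failpoint.Inject is a no-op marker until the source is rewritten by
    // failpoint-ctl (the `make failpoint-enable` step). In a rewritten build,
    // running with something like
    //   GO_FAILPOINTS="main/DemoTester=return(true)"
    // triggers the callback, which is how SimpleMySQLSinkTester gates the
    // registration of the simple-mysql sink.
    enabled := false
    failpoint.Inject("DemoTester", func() {
        enabled = true
    })
    fmt.Println("tester enabled:", enabled)
}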
diff --git a/integration/tests/case_many_types.go b/integration/tests/case_many_types.go
index 8e4ba855fb4..d1ecaeeac65 100644
--- a/integration/tests/case_many_types.go
+++ b/integration/tests/case_many_types.go
@@ -15,12 +15,13 @@ package tests
 
 import (
 	"errors"
-	"github.com/pingcap/ticdc/integration/framework/avro"
-	"github.com/pingcap/ticdc/integration/framework/canal"
 	"math"
 	"time"
 
 	"github.com/pingcap/ticdc/integration/framework"
+	"github.com/pingcap/ticdc/integration/framework/avro"
+	"github.com/pingcap/ticdc/integration/framework/canal"
+	"github.com/pingcap/ticdc/integration/framework/mysql"
 )
 
 // ManyTypesCase is base impl of test case for different types data
@@ -86,6 +87,28 @@ func (s *ManyTypesCase) Run(ctx *framework.TaskContext) error {
 			t_json JSON,
 			PRIMARY KEY (id)
 		)`
+	case *mysql.SingleTableTask:
+		createDBQuery = `create table test (
+			id          INT,
+			t_boolean   BOOLEAN,
+			t_bigint    BIGINT,
+			t_double    DOUBLE,
+			t_decimal   DECIMAL(38, 19),
+			t_bit       BIT(64),
+			t_date      DATE,
+			t_datetime  DATETIME,
+			t_timestamp TIMESTAMP NULL,
+			t_time      TIME,
+			t_year      YEAR,
+			t_char      CHAR,
+			t_varchar   VARCHAR(10),
+			t_blob      BLOB,
+			t_text      TEXT,
+			t_enum      ENUM ('enum1', 'enum2', 'enum3'),
+			t_set       SET ('a', 'b', 'c'),
+			t_json      JSON,
+			PRIMARY KEY (id)
+		)`
 	default:
 		return errors.New("unknown test case type")
 	}
@@ -94,15 +117,16 @@ func (s *ManyTypesCase) Run(ctx *framework.TaskContext) error {
 	if err != nil {
 		return err
 	}
+	if _, ok := s.Task.(*avro.SingleTableTask); ok {
+		_, err = ctx.Downstream.ExecContext(ctx.Ctx, "drop table if exists test")
+		if err != nil {
+			return err
+		}
 
-	_, err = ctx.Downstream.ExecContext(ctx.Ctx, "drop table if exists test")
-	if err != nil {
-		return err
-	}
-
-	_, err = ctx.Downstream.ExecContext(ctx.Ctx, createDBQuery)
-	if err != nil {
-		return err
+		_, err = ctx.Downstream.ExecContext(ctx.Ctx, createDBQuery)
+		if err != nil {
+			return err
+		}
 	}
 
 	// Get a handle of an existing table
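As a closing illustration of the sink semantics these test cases exercise: simpleMySQLSink.FlushRowChangedEvents applies only the buffered rows at or below the resolved timestamp and keeps the rest for a later flush. A simplified, self-contained sketch of that partitioning step (hypothetical row type, printing instead of executing SQL):

package main

import "fmt"

// row is a hypothetical stand-in for model.RowChangedEvent.
type row struct {
    commitTs uint64
    data     string
}

// flushUpTo mirrors simpleMySQLSink.FlushRowChangedEvents: rows with
// commitTs <= resolvedTs are applied (printed here), everything newer
// stays buffered for the next flush.
func flushUpTo(buffer []row, resolvedTs uint64) []row {
    kept := make([]row, 0, len(buffer))
    for _, r := range buffer {
        if r.commitTs <= resolvedTs {
            fmt.Printf("apply %q at commitTs %d\n", r.data, r.commitTs)
        } else {
            kept = append(kept, r)
        }
    }
    return kept
}

func main() {
    buffer := []row{{100, "a"}, {105, "b"}, {120, "c"}}
    buffer = flushUpTo(buffer, 110) // applies "a" and "b"
    fmt.Println("still buffered:", buffer)
}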