Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cyclic, cmd: qoute cyclic table name, fatal if loopback and fix a data race #701

Merged
merged 8 commits into from
Jul 3, 2020
Merged
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newMqSink(ctx context.Context, mqProducer mqProducer.Producer, filter *filt
func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
continue
}
partition := k.dispatcher.Dispatch(row)
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row
defer s.unresolvedTxnsMu.Unlock()
for _, row := range rows {
if s.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
continue
}
key := *row.Table
Expand Down
9 changes: 3 additions & 6 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ func newCliCommand() *cobra.Command {
Use: "cli",
Short: "Manage replication task and TiCDC cluster",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
err := util.InitLogger(&util.Config{Level: "warn"})
if err != nil {
fmt.Printf("init logger error %v", errors.ErrorStack(err))
os.Exit(1)
}
initCmd(cmd, &util.Config{Level: "warn"})

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cliPdAddr},
Expand All @@ -139,7 +135,8 @@ func newCliCommand() *cobra.Command {
return errors.Annotate(err, "fail to open PD client")
}
cdcEtcdCli = kv.NewCDCEtcdClient(etcdCli)
pdCli, err = pd.NewClient([]string{cliPdAddr}, pd.SecurityOption{},
pdCli, err = pd.NewClientWithContext(
defaultContext, []string{cliPdAddr}, pd.SecurityOption{},
pd.WithGRPCDialOptions(
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand Down
24 changes: 1 addition & 23 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/pingcap/log"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

var rootCmd = &cobra.Command{
Expand All @@ -33,24 +27,8 @@ var rootCmd = &cobra.Command{

// Execute runs the root command
func Execute() {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defaultContext = ctx
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()

if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
rootCmd.Println(err)
os.Exit(1)
}
}
25 changes: 5 additions & 20 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package cmd

import (
"context"
"fmt"
"os"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -56,10 +54,11 @@ func init() {
}

func runEServer(cmd *cobra.Command, args []string) error {
err := initLog()
if err != nil {
return errors.Annotate(err, "init log failed")
}
cancel := initCmd(cmd, &util.Config{
File: logFile,
Level: logLevel,
})
defer cancel()
tz, err := util.GetTimezone(timezone)
if err != nil {
return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`")
Expand All @@ -86,17 +85,3 @@ func runEServer(cmd *cobra.Command, args []string) error {

return nil
}

func initLog() error {
// Init log.
err := util.InitLogger(&util.Config{
File: logFile,
Level: logLevel,
})
if err != nil {
fmt.Printf("init logger error %v", errors.ErrorStack(err))
os.Exit(1)
}
log.Info("init log", zap.String("file", logFile), zap.String("level", logLevel))
return nil
}
36 changes: 34 additions & 2 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,56 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"

"github.com/pingcap/ticdc/pkg/config"
"syscall"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv"
"github.com/spf13/cobra"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
)

// initCmd initializes the logger, the default context and returns its cancel function.
func initCmd(cmd *cobra.Command, logCfg *util.Config) context.CancelFunc {
// Init log.
err := util.InitLogger(logCfg)
if err != nil {
cmd.Printf("init logger error %v\n", errors.ErrorStack(err))
os.Exit(1)
}
log.Info("init log", zap.String("file", logFile), zap.String("level", logLevel))

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()
defaultContext = ctx
return cancel
}

func getAllCaptures(ctx context.Context) ([]*capture, error) {
_, raw, err := cdcEtcdCli.GetCaptures(ctx)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions pkg/cyclic/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ type TxnMap map[uint64]map[model.TableName][]*model.RowChangedEvent
// There is at most one mark table row that is modified for each transaction.
type MarkMap map[uint64]*model.RowChangedEvent

func (m MarkMap) shouldFilterTxn(startTs uint64, filterReplicaIDs []uint64) (*model.RowChangedEvent, bool) {
func (m MarkMap) shouldFilterTxn(startTs uint64, filterReplicaIDs []uint64, replicaID uint64) (*model.RowChangedEvent, bool) {
markRow, markFound := m[startTs]
if !markFound {
return nil, false
}
replicaID := ExtractReplicaID(markRow)
from := ExtractReplicaID(markRow)
if from == replicaID {
log.Fatal("cyclic replication loopback detected",
zap.Any("markRow", markRow),
zap.Uint64("replicaID", replicaID))
}
for i := range filterReplicaIDs {
if filterReplicaIDs[i] == replicaID {
if filterReplicaIDs[i] == from {
return markRow, true
}
}
Expand Down Expand Up @@ -85,7 +90,7 @@ func FilterAndReduceTxns(txnsMap map[model.TableName][]*model.Txn, filterReplica
filteredTxns := make([]*model.Txn, 0, len(txns))
for _, txn := range txns {
// Check if we should skip this event
markRow, needSkip := markMap.shouldFilterTxn(txn.StartTs, filterReplicaIDs)
markRow, needSkip := markMap.shouldFilterTxn(txn.StartTs, filterReplicaIDs, replicaID)
if needSkip {
// Found cyclic mark, skip this event as it originly created from
// downstream.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cyclic/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ type Cyclic struct {
func (*Cyclic) UdpateSourceTableCyclicMark(sourceSchema, sourceTable string, bucket, replicaID uint64) string {
schema, table := mark.GetMarkTableName(sourceSchema, sourceTable)
return fmt.Sprintf(
`INSERT INTO %s.%s VALUES (%d, %d, 0) ON DUPLICATE KEY UPDATE val = val + 1;`,
schema, table, bucket, replicaID)
`INSERT INTO %s VALUES (%d, %d, 0) ON DUPLICATE KEY UPDATE val = val + 1;`,
model.QuoteSchema(schema, table), bucket, replicaID)
}

// FilterReplicaID return a slice of replica IDs needs to be filtered.
Expand Down
7 changes: 7 additions & 0 deletions tests/cyclic_ab/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ function run() {
run_cdc_cli changefeed cyclic create-marktables \
--cyclic-upstream-dsn="root@tcp(${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT})/"

# make sure create-marktables does not create mark table for mark table.
for c in $(seq 1 10); do {
# must not cause an error table name too long.
run_cdc_cli changefeed cyclic create-marktables \
--cyclic-upstream-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/"
} done

# record tso after we create tables to not block on waiting mark tables DDLs.
start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT)

Expand Down
66 changes: 0 additions & 66 deletions tests/cyclic_loopback/run.sh

This file was deleted.