Skip to content

Commit

Permalink
cyclic, cmd: qoute cyclic table name, fatal if loopback and fix a dat…
Browse files Browse the repository at this point in the history
…a race (#701)

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored Jul 3, 2020
1 parent 91ef563 commit 0057782
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 125 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newMqSink(ctx context.Context, mqProducer mqProducer.Producer, filter *filt
func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
continue
}
partition := k.dispatcher.Dispatch(row)
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row
defer s.unresolvedTxnsMu.Unlock()
for _, row := range rows {
if s.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
continue
}
key := *row.Table
Expand Down
9 changes: 3 additions & 6 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ func newCliCommand() *cobra.Command {
Use: "cli",
Short: "Manage replication task and TiCDC cluster",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
err := util.InitLogger(&util.Config{Level: "warn"})
if err != nil {
fmt.Printf("init logger error %v", errors.ErrorStack(err))
os.Exit(1)
}
initCmd(cmd, &util.Config{Level: "warn"})

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cliPdAddr},
Expand All @@ -139,7 +135,8 @@ func newCliCommand() *cobra.Command {
return errors.Annotate(err, "fail to open PD client")
}
cdcEtcdCli = kv.NewCDCEtcdClient(etcdCli)
pdCli, err = pd.NewClient([]string{cliPdAddr}, pd.SecurityOption{},
pdCli, err = pd.NewClientWithContext(
defaultContext, []string{cliPdAddr}, pd.SecurityOption{},
pd.WithGRPCDialOptions(
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand Down
24 changes: 1 addition & 23 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/pingcap/log"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

var rootCmd = &cobra.Command{
Expand All @@ -33,24 +27,8 @@ var rootCmd = &cobra.Command{

// Execute runs the root command
func Execute() {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defaultContext = ctx
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()

if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
rootCmd.Println(err)
os.Exit(1)
}
}
25 changes: 5 additions & 20 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package cmd

import (
"context"
"fmt"
"os"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -56,10 +54,11 @@ func init() {
}

func runEServer(cmd *cobra.Command, args []string) error {
err := initLog()
if err != nil {
return errors.Annotate(err, "init log failed")
}
cancel := initCmd(cmd, &util.Config{
File: logFile,
Level: logLevel,
})
defer cancel()
tz, err := util.GetTimezone(timezone)
if err != nil {
return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`")
Expand All @@ -86,17 +85,3 @@ func runEServer(cmd *cobra.Command, args []string) error {

return nil
}

func initLog() error {
// Init log.
err := util.InitLogger(&util.Config{
File: logFile,
Level: logLevel,
})
if err != nil {
fmt.Printf("init logger error %v", errors.ErrorStack(err))
os.Exit(1)
}
log.Info("init log", zap.String("file", logFile), zap.String("level", logLevel))
return nil
}
36 changes: 34 additions & 2 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,56 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"

"github.com/pingcap/ticdc/pkg/config"
"syscall"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv"
"github.com/spf13/cobra"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
)

// initCmd initializes the logger, the default context and returns its cancel function.
func initCmd(cmd *cobra.Command, logCfg *util.Config) context.CancelFunc {
// Init log.
err := util.InitLogger(logCfg)
if err != nil {
cmd.Printf("init logger error %v\n", errors.ErrorStack(err))
os.Exit(1)
}
log.Info("init log", zap.String("file", logFile), zap.String("level", logLevel))

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()
defaultContext = ctx
return cancel
}

func getAllCaptures(ctx context.Context) ([]*capture, error) {
_, raw, err := cdcEtcdCli.GetCaptures(ctx)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions pkg/cyclic/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ type TxnMap map[uint64]map[model.TableName][]*model.RowChangedEvent
// There is at most one mark table row that is modified for each transaction.
type MarkMap map[uint64]*model.RowChangedEvent

func (m MarkMap) shouldFilterTxn(startTs uint64, filterReplicaIDs []uint64) (*model.RowChangedEvent, bool) {
func (m MarkMap) shouldFilterTxn(startTs uint64, filterReplicaIDs []uint64, replicaID uint64) (*model.RowChangedEvent, bool) {
markRow, markFound := m[startTs]
if !markFound {
return nil, false
}
replicaID := ExtractReplicaID(markRow)
from := ExtractReplicaID(markRow)
if from == replicaID {
log.Fatal("cyclic replication loopback detected",
zap.Any("markRow", markRow),
zap.Uint64("replicaID", replicaID))
}
for i := range filterReplicaIDs {
if filterReplicaIDs[i] == replicaID {
if filterReplicaIDs[i] == from {
return markRow, true
}
}
Expand Down Expand Up @@ -85,7 +90,7 @@ func FilterAndReduceTxns(txnsMap map[model.TableName][]*model.Txn, filterReplica
filteredTxns := make([]*model.Txn, 0, len(txns))
for _, txn := range txns {
// Check if we should skip this event
markRow, needSkip := markMap.shouldFilterTxn(txn.StartTs, filterReplicaIDs)
markRow, needSkip := markMap.shouldFilterTxn(txn.StartTs, filterReplicaIDs, replicaID)
if needSkip {
// Found cyclic mark, skip this event as it originly created from
// downstream.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cyclic/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ type Cyclic struct {
func (*Cyclic) UdpateSourceTableCyclicMark(sourceSchema, sourceTable string, bucket, replicaID uint64) string {
schema, table := mark.GetMarkTableName(sourceSchema, sourceTable)
return fmt.Sprintf(
`INSERT INTO %s.%s VALUES (%d, %d, 0) ON DUPLICATE KEY UPDATE val = val + 1;`,
schema, table, bucket, replicaID)
`INSERT INTO %s VALUES (%d, %d, 0) ON DUPLICATE KEY UPDATE val = val + 1;`,
model.QuoteSchema(schema, table), bucket, replicaID)
}

// FilterReplicaID return a slice of replica IDs needs to be filtered.
Expand Down
7 changes: 7 additions & 0 deletions tests/cyclic_ab/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ function run() {
run_cdc_cli changefeed cyclic create-marktables \
--cyclic-upstream-dsn="root@tcp(${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT})/"

# make sure create-marktables does not create mark table for mark table.
for c in $(seq 1 10); do {
# must not cause an error table name too long.
run_cdc_cli changefeed cyclic create-marktables \
--cyclic-upstream-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/"
} done

# record tso after we create tables to not block on waiting mark tables DDLs.
start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT)

Expand Down
66 changes: 0 additions & 66 deletions tests/cyclic_loopback/run.sh

This file was deleted.

0 comments on commit 0057782

Please sign in to comment.