Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: support update changefeed config, in non-online way #699

Merged
merged 6 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 185 additions & 93 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package cmd

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/pingcap/ticdc/pkg/cyclic/mark"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/r3labs/diff"
"github.com/spf13/cobra"
)

Expand All @@ -39,6 +41,7 @@ func newChangefeedCommand() *cobra.Command {
newListChangefeedCommand(),
newQueryChangefeedCommand(),
newCreateChangefeedCommand(),
newUpdateChangefeedCommand(),
newStatisticsChangefeedCommand(),
newCreateChangefeedCyclicCommand(),
)
Expand Down Expand Up @@ -158,6 +161,125 @@ func newQueryChangefeedCommand() *cobra.Command {
return command
}

func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate bool) (*model.ChangeFeedInfo, error) {
if isCreate {
if startTs == 0 {
ts, logical, err := pdCli.GetTS(ctx)
if err != nil {
return nil, err
}
startTs = oracle.ComposeTS(ts, logical)
}
if err := verifyStartTs(ctx, startTs, cdcEtcdCli); err != nil {
return nil, err
}
}

cfg := config.GetDefaultReplicaConfig()
if len(configFile) > 0 {
if err := strictDecodeFile(configFile, "cdc", cfg); err != nil {
return nil, err
}
}
if cyclicReplicaID != 0 || len(cyclicFilterReplicaIDs) != 0 {
if !(cyclicReplicaID != 0 && len(cyclicFilterReplicaIDs) != 0) {
return nil, errors.New("invaild cyclic config, please make sure using " +
"nonzero replica ID and specify filter replica IDs")
}
filter := make([]uint64, 0, len(cyclicFilterReplicaIDs))
for _, id := range cyclicFilterReplicaIDs {
filter = append(filter, uint64(id))
}
cfg.Cyclic = &config.CyclicConfig{
Enable: true,
ReplicaID: cyclicReplicaID,
FilterReplicaID: filter,
SyncDDL: cyclicSyncDDL,
// TODO(neil) enable ID bucket.
}
}
info := &model.ChangeFeedInfo{
SinkURI: sinkURI,
Opts: make(map[string]string),
CreateTime: time.Now(),
StartTs: startTs,
TargetTs: targetTs,
Config: cfg,
Engine: model.SortEngine(sortEngine),
SortDir: sortDir,
State: model.StateNormal,
}

tz, err := util.GetTimezone(timezone)
if err != nil {
return nil, errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`")
}

if isCreate {
ctx = util.PutTimezoneInCtx(ctx, tz)
ineligibleTables, eligibleTables, err := verifyTables(ctx, cfg, startTs)
if err != nil {
return nil, err
}
if len(ineligibleTables) != 0 {
cmd.Printf("[WARN] some tables are not eligible to replicate, %#v\n", ineligibleTables)
if !noConfirm {
cmd.Printf("Could you agree to ignore those tables, and continue to replicate [Y/N]\n")
var yOrN string
_, err := fmt.Scan(&yOrN)
if err != nil {
return nil, err
}
if strings.ToLower(strings.TrimSpace(yOrN)) != "y" {
cmd.Printf("No changefeed is created because you don't want to ignore some tables.\n")
return nil, nil
}
}
}
if cfg.Cyclic.IsEnabled() && !cyclic.IsTablesPaired(eligibleTables) {
return nil, errors.New("normal tables and mark tables are not paired, " +
"please run `cdc cli changefeed cyclic create-marktables`")
}
}

for _, opt := range opts {
s := strings.SplitN(opt, "=", 2)
if len(s) <= 0 {
cmd.Printf("omit opt: %s", opt)
continue
}

var key string
var value string

key = s[0]
if len(s) > 1 {
value = s[1]
}
info.Opts[key] = value
}

err = verifySink(ctx, info.SinkURI, info.Config, info.Opts)
if err != nil {
return nil, err
}
return info, nil
}

func changefeedConfigVariables(command *cobra.Command) {
command.PersistentFlags().Uint64Var(&startTs, "start-ts", 0, "Start ts of changefeed")
command.PersistentFlags().Uint64Var(&targetTs, "target-ts", 0, "Target ts of changefeed")
command.PersistentFlags().StringVar(&sinkURI, "sink-uri", "mysql://root:123456@127.0.0.1:3306/", "sink uri")
command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file")
command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format")
command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "memory", "sort engine used for data sort")
command.PersistentFlags().StringVar(&sortDir, "sort-dir", ".", "directory used for file sort")
command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)")
command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed")
command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Expremental) Cyclic replication filter replica ID of changefeed")
command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Expremental) Cyclic replication sync DDL of changefeed")
}

func newCreateChangefeedCommand() *cobra.Command {
command := &cobra.Command{
Use: "create",
Expand All @@ -166,128 +288,98 @@ func newCreateChangefeedCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := defaultContext
id := uuid.New().String()
if startTs == 0 {
ts, logical, err := pdCli.GetTS(ctx)
if err != nil {
return err
}
startTs = oracle.ComposeTS(ts, logical)
}
if err := verifyStartTs(ctx, startTs, cdcEtcdCli); err != nil {

info, err := verifyChangefeedParamers(ctx, cmd, true /* isCreate */)
if err != nil {
return err
}
if info == nil {
return nil
}

cfg := config.GetDefaultReplicaConfig()
if len(configFile) > 0 {
if err := strictDecodeFile(configFile, "cdc", cfg); err != nil {
return err
}
infoStr, err := info.Marshal()
if err != nil {
return err
}
if cyclicReplicaID != 0 || len(cyclicFilterReplicaIDs) != 0 {
if !(cyclicReplicaID != 0 && len(cyclicFilterReplicaIDs) != 0) {
return errors.New(
"invaild cyclic config, please make sure using nonzero replica ID " +
"and specify filter replica IDs")
}
filter := make([]uint64, 0, len(cyclicFilterReplicaIDs))
for _, id := range cyclicFilterReplicaIDs {
filter = append(filter, uint64(id))
}
cfg.Cyclic = &config.CyclicConfig{
Enable: true,
ReplicaID: cyclicReplicaID,
FilterReplicaID: filter,
SyncDDL: cyclicSyncDDL,
// TODO(neil) enable ID bucket.
}
err = cdcEtcdCli.SaveChangeFeedInfo(ctx, info, id)
if err != nil {
return err
}
info := &model.ChangeFeedInfo{
SinkURI: sinkURI,
Opts: make(map[string]string),
CreateTime: time.Now(),
StartTs: startTs,
TargetTs: targetTs,
Config: cfg,
Engine: model.SortEngine(sortEngine),
SortDir: sortDir,
State: model.StateNormal,
cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\n", id, infoStr)
return nil
},
}
changefeedConfigVariables(command)
command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table")

return command
}

func newUpdateChangefeedCommand() *cobra.Command {
command := &cobra.Command{
Use: "update",
Short: "Update config of an existing replication task (changefeed)",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := defaultContext

old, err := cdcEtcdCli.GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
return err
}

tz, err := util.GetTimezone(timezone)
info, err := verifyChangefeedParamers(ctx, cmd, false /* isCreate */)
if err != nil {
return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`")
return err
}
// Fix some fields that can't be updated.
info.CreateTime = old.CreateTime
info.AdminJobType = old.AdminJobType

ctx = util.PutTimezoneInCtx(ctx, tz)
ineligibleTables, eligibleTables, err := verifyTables(ctx, cfg, startTs)
changelog, err := diff.Diff(old, info)
if err != nil {
return err
}
if len(ineligibleTables) != 0 {
cmd.Printf("[WARN] some tables are not eligible to replicate, %#v\n", ineligibleTables)
if !noConfirm {
cmd.Printf("Could you agree to ignore those tables, and continue to replicate [Y/N]\n")
var yOrN string
_, err := fmt.Scan(&yOrN)
if err != nil {
return err
}
if strings.ToLower(strings.TrimSpace(yOrN)) != "y" {
cmd.Printf("No changefeed is created because you don't want to ignore some tables.\n")
return nil
}
}
if len(changelog) == 0 {
cmd.Printf("changefeed config is the same with the old one, do nothing\n")
return nil
}
if cfg.Cyclic.IsEnabled() && !cyclic.IsTablesPaired(eligibleTables) {
return errors.New("normal tables and mark tables are not paired, " +
"please run `cdc cli changefeed cyclic create-marktables`")
cmd.Printf("Diff of changefeed config:\n")
for _, change := range changelog {
cmd.Printf("%+v\n", change)
}

for _, opt := range opts {
s := strings.SplitN(opt, "=", 2)
if len(s) <= 0 {
cmd.Printf("omit opt: %s", opt)
continue
if !noConfirm {
cmd.Printf("Could you agree to apply changes above to changefeed [Y/N]\n")
var yOrN string
_, err = fmt.Scan(&yOrN)
if err != nil {
return err
}

var key string
var value string

key = s[0]
if len(s) > 1 {
value = s[1]
if strings.ToLower(strings.TrimSpace(yOrN)) != "y" {
cmd.Printf("No upadte to changefeed.\n")
return nil
}
info.Opts[key] = value
}

d, err := info.Marshal()
err = cdcEtcdCli.SaveChangeFeedInfo(ctx, info, changefeedID)
if err != nil {
return err
}
err = verifySink(ctx, info.SinkURI, info.Config, info.Opts)
infoStr, err := info.Marshal()
if err != nil {
return err
}
err = cdcEtcdCli.SaveChangeFeedInfo(ctx, info, id)
if err != nil {
return err
}
cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\n", id, d)
cmd.Printf("Update changefeed config successfully! "+
"Will take effect only if the changefeed has been paused before this command"+
"\nID: %s\nInfo: %s\n", changefeedID, infoStr)
return nil
},
}
command.PersistentFlags().Uint64Var(&startTs, "start-ts", 0, "Start ts of changefeed")
command.PersistentFlags().Uint64Var(&targetTs, "target-ts", 0, "Target ts of changefeed")
command.PersistentFlags().StringVar(&sinkURI, "sink-uri", "mysql://root:123456@127.0.0.1:3306/", "sink uri")
command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file")
command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format")
command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table")
command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "memory", "sort engine used for data sort")
command.PersistentFlags().StringVar(&sortDir, "sort-dir", ".", "directory used for file sort")
command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)")
command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed")
command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Expremental) Cyclic replication filter replica ID of changefeed")
command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Expremental) Cyclic replication sync DDL of changefeed")
changefeedConfigVariables(command)
command.PersistentFlags().StringVar(&changefeedID, "changefeed-id", "", "Replication task (changefeed) ID")
command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to confirm update changefeed config")
_ = command.MarkPersistentFlagRequired("changefeed-id")

return command
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/pingcap/tidb v1.1.0-beta.0.20200603101356-552e7709de0d
github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/r3labs/diff v1.1.0
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.5.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20191211224106-0dc78a144b31
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M=
github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBmr8YXawX/le3+O26N+vPPC1PtjaF3mwnook=
Expand Down
23 changes: 23 additions & 0 deletions tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,29 @@ function run() {
exit 1
fi

# Update changefeed
cat - >"$WORK_DIR/changefeed.toml" <<EOF
case-sensitive = false
[mounter]
worker-num = 4
EOF
run_cdc_cli changefeed update --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" --config="$WORK_DIR/changefeed.toml" --no-confirm --changefeed-id $uuid
changefeed_info=$(run_cdc_cli changefeed query --changefeed-id $uuid 2>&1)
if [[ ! $changefeed_info == *"\"case-sensitive\": false"* ]]; then
echo "[$(date)] <<<<< changefeed info is not updated as expected ${changefeed_info} >>>>>"
exit 1
fi
if [[ ! $changefeed_info == *"\"worker-num\": 4"* ]]; then
echo "[$(date)] <<<<< changefeed info is not updated as expected ${changefeed_info} >>>>>"
exit 1
fi

jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
if [[ $jobtype != 1 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 1 got ${jobtype} >>>>>"
exit 1
fi

# Resume changefeed
run_cdc_cli changefeed --changefeed-id $uuid resume && sleep 3
jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
Expand Down