Skip to content

Commit

Permalink
owner: check if the checkpoint ts is less than gc safe time when crea…
Browse files Browse the repository at this point in the history
…te the changefeed (#1045) (#1097)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Nov 20, 2020
1 parent df7ec64 commit f5f773d
Show file tree
Hide file tree
Showing 14 changed files with 259 additions and 187 deletions.
6 changes: 4 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
SinkURI: "blackhole://",
Opts: map[string]string{},
StartTs: 417257993615179777,
Config: &config.ReplicaConfig{},
Config: &config.ReplicaConfig{
CheckGCSafePoint: true,
CaseSensitive: true,
},
}

err := info.VerifyAndFix()
Expand All @@ -175,7 +178,6 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
marshalConfig1, err := info.Config.Marshal()
c.Assert(err, check.IsNil)
defaultConfig := config.GetDefaultReplicaConfig()
defaultConfig.CaseSensitive = false
marshalConfig2, err := defaultConfig.Marshal()
c.Assert(err, check.IsNil)
c.Assert(marshalConfig1, check.Equals, marshalConfig2)
Expand Down
7 changes: 6 additions & 1 deletion cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ func (o *Owner) newChangeFeed(
checkpointTs uint64) (cf *changeFeed, resultErr error) {
log.Info("Find new changefeed", zap.Stringer("info", info),
zap.String("id", id), zap.Uint64("checkpoint ts", checkpointTs))

if info.Config.CheckGCSafePoint {
err := util.CheckSafetyOfStartTs(ctx, o.pdClient, checkpointTs)
if err != nil {
return nil, errors.Trace(err)
}
}
failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(nil, tikv.ErrGCTooEarly.GenWithStackByArgs(checkpointTs-300, checkpointTs))
})
Expand Down
7 changes: 4 additions & 3 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ var (
cliLogLevel string
changefeedListAll bool

changefeedID string
captureID string
interval uint
changefeedID string
captureID string
interval uint
disableGCSafePointCheck bool

syncPointEnabled bool
syncPointInterval time.Duration
Expand Down
6 changes: 5 additions & 1 deletion cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate
}
startTs = oracle.ComposeTS(ts, logical)
}
if err := verifyStartTs(ctx, startTs, cdcEtcdCli); err != nil {
if err := verifyStartTs(ctx, startTs); err != nil {
return nil, err
}
if err := verifyTargetTs(ctx, startTs, targetTs); err != nil {
Expand All @@ -234,6 +234,9 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate
return nil, err
}
}
if disableGCSafePointCheck {
cfg.CheckGCSafePoint = false
}
if cyclicReplicaID != 0 || len(cyclicFilterReplicaIDs) != 0 {
if !(cyclicReplicaID != 0 && len(cyclicFilterReplicaIDs) != 0) {
return nil, errors.New("invaild cyclic config, please make sure using " +
Expand Down Expand Up @@ -404,6 +407,7 @@ func newCreateChangefeedCommand() *cobra.Command {
changefeedConfigVariables(command)
command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table")
command.PersistentFlags().StringVarP(&changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID")
command.PersistentFlags().BoolVarP(&disableGCSafePointCheck, "disable-gc-check", "", false, "Disable GC safe point check")

return command
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ dispatchers = [
{matcher = ['test3.*', 'test4.*'], dispatcher = "rowid"},
]
# 对于 MQ 类的 Sink,可以指定消息的协议格式
# 协议目前支持 default, canal 两种,default 为 ticdc-open-protocol
# 协议目前支持 default, canal, avro 和 maxwell 四种,default 为 ticdc-open-protocol
# For MQ Sinks, you can configure the protocol of the messages sending to MQ
# Currently the protocol support default and canal
# Currently the protocol support default, canal, avro and maxwell. Default is ticdc-open-protocol
protocol = "default"
[cyclic-replication]
Expand Down
20 changes: 4 additions & 16 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

Expand All @@ -39,7 +38,7 @@ import (
"github.com/pingcap/ticdc/pkg/httputil"
"github.com/pingcap/ticdc/pkg/logutil"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/ticdc/pkg/util"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.etcd.io/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -215,22 +214,11 @@ func jsonPrint(cmd *cobra.Command, v interface{}) error {
return nil
}

func verifyStartTs(ctx context.Context, startTs uint64, cli kv.CDCEtcdClient) error {
resp, err := cli.Client.Get(ctx, tikv.GcSavedSafePoint)
if err != nil {
return errors.Trace(err)
}
if resp.Count == 0 {
func verifyStartTs(ctx context.Context, startTs uint64) error {
if disableGCSafePointCheck {
return nil
}
safePoint, err := strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
if err != nil {
return errors.Trace(err)
}
if startTs < safePoint {
return errors.Errorf("startTs %d less than gcSafePoint %d", startTs, safePoint)
}
return nil
return util.CheckSafetyOfStartTs(ctx, pdCli, startTs)
}

func verifyTargetTs(ctx context.Context, startTs, targetTs uint64) error {
Expand Down
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# AUTOGENERATED BY github.com/pingcap/tiup/components/errdoc/errdoc-gen
# DO NOT EDIT THIS FILE, PLEASE CHANGE ERROR DEFINITION IF CONTENT IMPROPER.
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["CDC:ErrAPIInvalidParam"]
error = '''
Expand Down Expand Up @@ -438,7 +438,7 @@ create sem version

["CDC:ErrNewStore"]
error = '''
new store faile
new store failed
'''

["CDC:ErrNoPendingRegion"]
Expand Down
24 changes: 24 additions & 0 deletions integration/framework/docker_compose_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package framework

import (
"database/sql"
"os"
"os/exec"
"time"
Expand All @@ -38,6 +39,14 @@ func (d *DockerComposeOperator) Setup() {
cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, d.ExecEnv...)
runCmdHandleError(cmd)
err := waitTiDBStarted(UpstreamDSN)
if err != nil {
log.Fatal("ping upstream database but not receive a pong", zap.Error(err))
}
err = waitTiDBStarted(DownstreamDSN)
if err != nil {
log.Fatal("ping downstream database but not receive a pong", zap.Error(err))
}

if d.HealthChecker != nil {
err := retry.Run(time.Second, 120, d.HealthChecker)
Expand All @@ -47,6 +56,21 @@ func (d *DockerComposeOperator) Setup() {
}
}

func waitTiDBStarted(dsn string) error {
return retry.Run(time.Second, 60, func() error {
upstream, err := sql.Open("mysql", dsn)
if err != nil {
return errors.Trace(err)
}
defer upstream.Close()
err = upstream.Ping()
if err != nil {
return errors.Trace(err)
}
return nil
})
}

func runCmdHandleError(cmd *exec.Cmd) []byte {
log.Info("Start executing command", zap.String("cmd", cmd.String()))
bytes, err := cmd.Output()
Expand Down
22 changes: 12 additions & 10 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
)

var defaultReplicaConfig = &ReplicaConfig{
CaseSensitive: true,
EnableOldValue: false,
CaseSensitive: true,
EnableOldValue: false,
CheckGCSafePoint: true,
Filter: &FilterConfig{
Rules: []string{"*.*"},
},
Expand All @@ -49,14 +50,15 @@ var defaultReplicaConfig = &ReplicaConfig{
type ReplicaConfig replicaConfig

type replicaConfig struct {
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
ForceReplicate bool `toml:"force-replicate" json:"force-replicate"`
Filter *FilterConfig `toml:"filter" json:"filter"`
Mounter *MounterConfig `toml:"mounter" json:"mounter"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Cyclic *CyclicConfig `toml:"cyclic-replication" json:"cyclic-replication"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
ForceReplicate bool `toml:"force-replicate" json:"force-replicate"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
Filter *FilterConfig `toml:"filter" json:"filter"`
Mounter *MounterConfig `toml:"mounter" json:"mounter"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Cyclic *CyclicConfig `toml:"cyclic-replication" json:"cyclic-replication"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
}

// Marshal returns the json marshal format of a ReplicationConfig
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
ErrPDEtcdAPIError = errors.Normalize("etcd api call error", errors.RFCCodeText("CDC:ErrPDEtcdAPIError"))
ErrCachedTSONotExists = errors.Normalize("GetCachedCurrentVersion: cache entry does not exist", errors.RFCCodeText("CDC:ErrCachedTSONotExists"))
ErrGetStoreSnapshot = errors.Normalize("get snapshot failed", errors.RFCCodeText("CDC:ErrGetStoreSnapshot"))
ErrNewStore = errors.Normalize("new store faile", errors.RFCCodeText("CDC:ErrNewStore"))
ErrNewStore = errors.Normalize("new store failed", errors.RFCCodeText("CDC:ErrNewStore"))

// rule related errors
ErrEncodeFailed = errors.Normalize("encode failed: %s", errors.RFCCodeText("CDC:ErrEncodeFailed"))
Expand Down
43 changes: 43 additions & 0 deletions pkg/util/gc_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv"
pd "github.com/tikv/pd/client"
)

const (
// cdcChangefeedCreatingServiceGCSafePointID is the service ID under which
// CheckSafetyOfStartTs registers its service GC safe point in PD.
cdcChangefeedCreatingServiceGCSafePointID = "ticdc-changefeed-creating"
// cdcChangefeedCreatingServiceGCSafePointTTL is the TTL passed along with
// that registration. The value 10 * 60 is annotated as 10 minutes, so the
// unit is seconds.
cdcChangefeedCreatingServiceGCSafePointTTL = 10 * 60 // 10 mins
)

// CheckSafetyOfStartTs asks PD to set the service GC safe point of the
// changefeed-creating service to startTs and compares startTs with the
// minimum service GC safe point that PD reports back.
//
// It returns a traced error when the PD call fails, and a wrapped
// tikv.ErrGCTooEarly when startTs is smaller than the reported minimum.
// Otherwise it returns nil.
func CheckSafetyOfStartTs(ctx context.Context, pdCli pd.Client, startTs uint64) error {
	minServiceGCTs, err := pdCli.UpdateServiceGCSafePoint(
		ctx,
		cdcChangefeedCreatingServiceGCSafePointID,
		cdcChangefeedCreatingServiceGCSafePointTTL,
		startTs,
	)
	switch {
	case err != nil:
		return errors.Trace(err)
	case startTs < minServiceGCTs:
		// startTs is already behind the minimum service GC safe point.
		tooEarly := tikv.ErrGCTooEarly.GenWithStackByArgs(startTs, minServiceGCTs)
		return errors.Wrap(tooEarly, "startTs less than gcSafePoint")
	default:
		return nil
	}
}
62 changes: 62 additions & 0 deletions pkg/util/gc_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"math"

"github.com/pingcap/check"
pd "github.com/tikv/pd/client"
)

// gcServiceSuite is the check suite for the service GC safe point helpers.
// It holds the mock PD client that its test methods pass to
// CheckSafetyOfStartTs.
type gcServiceSuite struct {
pdCli mockPdClientForServiceGCSafePoint
}

// Register gcServiceSuite with the check framework. The mock PD client starts
// with an empty, non-nil service safe point map so the mock can write to it.
var _ = check.Suite(&gcServiceSuite{
mockPdClientForServiceGCSafePoint{serviceSafePoint: make(map[string]uint64)},
})

// TestCheckSafetyOfStartTs exercises CheckSafetyOfStartTs against the mock PD
// client: a start ts below the minimum registered safe point must be
// rejected, and an accepted start ts must be recorded under the
// changefeed-creating service ID.
func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) {
	ctx := context.Background()

	// With only service1 registered at 60, a start ts of 50 is too early.
	s.pdCli.UpdateServiceGCSafePoint(ctx, "service1", 10, 60) //nolint:errcheck
	err := CheckSafetyOfStartTs(ctx, s.pdCli, 50)
	c.Assert(err.Error(), check.Equals, "startTs less than gcSafePoint: [tikv:9006]GC life time is shorter than transaction duration, transaction starts at 50, GC safe point is 60")

	// Additional services do not lower the minimum (still 60), so a start ts
	// of 65 is accepted and stored.
	s.pdCli.UpdateServiceGCSafePoint(ctx, "service2", 10, 80) //nolint:errcheck
	s.pdCli.UpdateServiceGCSafePoint(ctx, "service3", 10, 70) //nolint:errcheck
	err = CheckSafetyOfStartTs(ctx, s.pdCli, 65)
	c.Assert(err, check.IsNil)

	wantSafePoints := map[string]uint64{
		"service1":                  60,
		"service2":                  80,
		"service3":                  70,
		"ticdc-changefeed-creating": 65,
	}
	c.Assert(s.pdCli.serviceSafePoint, check.DeepEquals, wantSafePoints)
}

// mockPdClientForServiceGCSafePoint is a test double for pd.Client. It embeds
// pd.Client so it can be passed wherever a pd.Client is expected, and it
// defines its own UpdateServiceGCSafePoint. The suite constructs it without
// setting the embedded client.
type mockPdClientForServiceGCSafePoint struct {
pd.Client
// serviceSafePoint maps a service ID to the safe point recorded for it.
serviceSafePoint map[string]uint64
}

// UpdateServiceGCSafePoint is the mock's in-memory version of the PD call.
// It computes the minimum safe point over all services recorded so far
// (math.MaxUint64 when none are recorded) and returns it with a nil error.
// The safe point for serviceID is stored unless at least one service is
// already recorded and safePoint is below that minimum. ctx and ttl are
// ignored.
func (m mockPdClientForServiceGCSafePoint) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
	minSafePoint := uint64(math.MaxUint64)
	for _, recorded := range m.serviceSafePoint {
		if recorded < minSafePoint {
			minSafePoint = recorded
		}
	}

	// Accept the update when nothing is recorded yet or when the requested
	// safe point does not fall behind the current minimum.
	if len(m.serviceSafePoint) == 0 || safePoint >= minSafePoint {
		m.serviceSafePoint[serviceID] = safePoint
	}
	return minSafePoint, nil
}
4 changes: 2 additions & 2 deletions tools/check/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/pingcap/tidb-cdc/_tools
go 1.13

require (
github.com/golangci/golangci-lint v1.30.0 // indirect
github.com/golangci/golangci-lint v1.32.2 // indirect
github.com/mgechev/revive v1.0.2 // indirect
github.com/pingcap/tiup v1.2.1-0.20201020064426-ad64b15d57de // indirect
github.com/pingcap/tiup v1.2.3 // indirect
)
Loading

0 comments on commit f5f773d

Please sign in to comment.