Skip to content

Commit

Permalink
Merge branch 'master' into bug-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dengqee committed Apr 8, 2021
2 parents eb3bdcd + 6c41167 commit 6d01841
Show file tree
Hide file tree
Showing 116 changed files with 5,195 additions and 1,766 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@ FAILPOINT := bin/failpoint-ctl
FAILPOINT_ENABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) enable >/dev/null)
FAILPOINT_DISABLE := $$(find $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev/null)

RELEASE_VERSION ?= $(shell git describe --tags --dirty="-dev")
RELEASE_VERSION := v5.0.0-master
ifneq ($(shell git rev-parse --abbrev-ref HEAD | egrep '^release-[0-9]\.[0-9].*$$|^HEAD$$'),)
# If we are on a release branch, use the tag version.
RELEASE_VERSION := $(shell git describe --tags --dirty="-dirty")
else ifneq ($(shell git status --porcelain),)
# Add -dirty if the working tree is dirty on a non-release branch.
RELEASE_VERSION := $(RELEASE_VERSION)-dirty
endif

LDFLAGS += -X "$(CDC_PKG)/pkg/version.ReleaseVersion=$(RELEASE_VERSION)"
LDFLAGS += -X "$(CDC_PKG)/pkg/version.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
LDFLAGS += -X "$(CDC_PKG)/pkg/version.GitHash=$(shell git rev-parse HEAD)"
Expand Down
18 changes: 9 additions & 9 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"github.com/pingcap/ticdc/pkg/version"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -41,13 +43,10 @@ import (
"google.golang.org/grpc/backoff"
)

const (
captureSessionTTL = 3
)

// processorOpts records options for processor
type processorOpts struct {
// captureOpts records options for capture
type captureOpts struct {
flushCheckpointInterval time.Duration
captureSessionTTL int
}

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
Expand All @@ -67,7 +66,7 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

opts *processorOpts
opts *captureOpts
closed chan struct{}
}

Expand All @@ -78,7 +77,7 @@ func NewCapture(
pdCli pd.Client,
credential *security.Credential,
advertiseAddr string,
opts *processorOpts,
opts *captureOpts,
) (c *Capture, err error) {
tlsConfig, err := credential.ToTLSConfig()
if err != nil {
Expand Down Expand Up @@ -114,7 +113,7 @@ func NewCapture(
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}
sess, err := concurrency.NewSession(etcdCli,
concurrency.WithTTL(captureSessionTTL))
concurrency.WithTTL(opts.captureSessionTTL))
if err != nil {
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
Expand All @@ -124,6 +123,7 @@ func NewCapture(
info := &model.CaptureInfo{
ID: id,
AdvertiseAddr: advertiseAddr,
Version: version.ReleaseVersion,
}
processorManager := processor.NewManager(pdCli, credential, info)
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(ctx))
Expand Down
4 changes: 2 additions & 2 deletions cdc/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *captureSuite) TestCaptureSuicide(c *check.C) {
defer cancel()
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, "127.0.0.1:12034",
&processorOpts{flushCheckpointInterval: time.Millisecond * 200})
&captureOpts{flushCheckpointInterval: time.Millisecond * 200})
c.Assert(err, check.IsNil)

var wg sync.WaitGroup
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *captureSuite) TestCaptureSessionDoneDuringHandleTask(c *check.C) {
defer cancel()
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, "127.0.0.1:12034",
&processorOpts{flushCheckpointInterval: time.Millisecond * 200})
&captureOpts{flushCheckpointInterval: time.Millisecond * 200})
c.Assert(err, check.IsNil)

runProcessorCount := 0
Expand Down
38 changes: 26 additions & 12 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -71,8 +72,18 @@ type ChangeFeedRWriter interface {

// GetChangeFeedStatus queries the checkpointTs and resolvedTs of a given changefeed
GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)

// PutAllChangeFeedStatus writes the status of the given changefeeds to storage such as etcd.
PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error

// LeaseGuardRemoveAllTaskStatus wraps RemoveAllTaskStatus with a context restricted by lease TTL.
LeaseGuardRemoveAllTaskStatus(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error

// LeaseGuardRemoveAllTaskPositions wraps RemoveAllTaskPositions with a context restricted by lease TTL.
LeaseGuardRemoveAllTaskPositions(ctx context.Context, changefeedID string, leaseID clientv3.LeaseID) error

// LeaseGuardPutAllChangeFeedStatus wraps PutAllChangeFeedStatus with a context restricted by lease TTL.
LeaseGuardPutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus, leaseID clientv3.LeaseID) error
}

type changeFeed struct {
Expand Down Expand Up @@ -119,6 +130,7 @@ type changeFeed struct {
lastRebalanceTime time.Time

etcdCli kv.CDCEtcdClient
leaseID clientv3.LeaseID

// context cancel function for all internal goroutines
cancel context.CancelFunc
Expand Down Expand Up @@ -321,7 +333,7 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
id := id
targetTs := targetTs
updateFuncs[captureID] = append(updateFuncs[captureID], func(_ int64, status *model.TaskStatus) (bool, error) {
status.RemoveTable(id, targetTs)
status.RemoveTable(id, targetTs, false /*isMoveTable*/)
return true, nil
})
cleanedTables[id] = struct{}{}
Expand Down Expand Up @@ -366,7 +378,7 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
}

for captureID, funcs := range updateFuncs {
newStatus, _, err := c.etcdCli.AtomicPutTaskStatus(ctx, c.id, captureID, funcs...)
newStatus, _, err := c.etcdCli.LeaseGuardAtomicPutTaskStatus(ctx, c.id, captureID, c.leaseID, funcs...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -391,15 +403,17 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model

func (c *changeFeed) updateTaskStatus(ctx context.Context, taskStatus map[model.CaptureID]*model.TaskStatus) error {
for captureID, status := range taskStatus {
newStatus, _, err := c.etcdCli.AtomicPutTaskStatus(ctx, c.id, captureID, func(modRevision int64, taskStatus *model.TaskStatus) (bool, error) {
if taskStatus.SomeOperationsUnapplied() {
log.Error("unexpected task status, there are operations unapplied in this status", zap.Any("status", taskStatus))
return false, cerror.ErrWaitHandleOperationTimeout.GenWithStackByArgs()
}
taskStatus.Tables = status.Tables
taskStatus.Operation = status.Operation
return true, nil
})
newStatus, _, err := c.etcdCli.LeaseGuardAtomicPutTaskStatus(
ctx, c.id, captureID, c.leaseID,
func(modRevision int64, taskStatus *model.TaskStatus) (bool, error) {
if taskStatus.SomeOperationsUnapplied() {
log.Error("unexpected task status, there are operations unapplied in this status", zap.Any("status", taskStatus))
return false, cerror.ErrWaitHandleOperationTimeout.GenWithStackByArgs()
}
taskStatus.Tables = status.Tables
taskStatus.Operation = status.Operation
return true, nil
})
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -530,7 +544,7 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
// To ensure that the replication pipeline stops exactly at the boundary TS,
// the boundary TS specified by the Remove Table Operation MUST be greater than or equal to the checkpoint TS of this table.
// So the global resolved TS is a reasonable value.
replicaInfo, exist := status.RemoveTable(tableID, c.status.ResolvedTs)
replicaInfo, exist := status.RemoveTable(tableID, c.status.ResolvedTs, true /*isMoveTable*/)
if !exist {
delete(c.moveTableJobs, tableID)
log.Warn("ignored the move job, the table is not exist in the source capture", zap.Reflect("job", job))
Expand Down
7 changes: 5 additions & 2 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
PhysicalTableID: physicalTableID,
Delete: raw.OpType == model.OpTypeDelete,
}
snap, err := m.schemaStorage.GetSnapshot(ctx, raw.CRTs)
// When async commit is enabled, the commitTs of a DML may be equal to the finishedTs of a DDL.
// A DML whose commitTs is equal to a DDL's finishedTs uses the schema info from before the DDL.
snap, err := m.schemaStorage.GetSnapshot(ctx, raw.CRTs-1)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -582,7 +584,8 @@ func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, e
// Encode bits as integers to avoid pingcap/tidb#10988 (which also affects MySQL itself)
v, err := datum.GetBinaryLiteral().ToInt(nil)
return v, "", err
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
b := datum.GetBytes()
if b == nil {
b = emptyBytes
Expand Down
3 changes: 2 additions & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testkit"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -254,7 +255,7 @@ func testMounterDisableOldValue(c *check.C, tc struct {
tk.MustExec(insertSQL, params...)
}

ver, err := store.CurrentVersion()
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
c.Assert(err, check.IsNil)
scheamStorage.AdvanceResolvedTs(ver.Ver)
mounter := NewMounter(scheamStorage, 1, false).(*mounterImpl)
Expand Down
9 changes: 5 additions & 4 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
)
Expand Down Expand Up @@ -681,7 +682,7 @@ func (t *schemaSuite) TestCreateSnapFromMeta(c *check.C) {
tk.MustExec("create table test2.simple_test3 (id bigint primary key)")
tk.MustExec("create table test2.simple_test4 (id bigint primary key)")
tk.MustExec("create table test2.simple_test5 (a bigint)")
ver, err := store.CurrentVersion()
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
c.Assert(err, check.IsNil)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -717,7 +718,7 @@ func (t *schemaSuite) TestSnapshotClone(c *check.C) {
tk.MustExec("create table test2.simple_test3 (id bigint primary key)")
tk.MustExec("create table test2.simple_test4 (id bigint primary key)")
tk.MustExec("create table test2.simple_test5 (a bigint)")
ver, err := store.CurrentVersion()
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
c.Assert(err, check.IsNil)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -753,15 +754,15 @@ func (t *schemaSuite) TestExplicitTables(c *check.C) {
defer domain.Close()
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(c, store)
ver1, err := store.CurrentVersion()
ver1, err := store.CurrentVersion(oracle.GlobalTxnScope)
c.Assert(err, check.IsNil)
tk.MustExec("create database test2")
tk.MustExec("create table test.simple_test1 (id bigint primary key)")
tk.MustExec("create table test.simple_test2 (id bigint unique key)")
tk.MustExec("create table test2.simple_test3 (a bigint)")
tk.MustExec("create table test2.simple_test4 (a varchar(20) unique key)")
tk.MustExec("create table test2.simple_test5 (a varchar(20))")
ver2, err := store.CurrentVersion()
ver2, err := store.CurrentVersion(oracle.GlobalTxnScope)
c.Assert(err, check.IsNil)
meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
c.Assert(err, check.IsNil)
Expand Down
18 changes: 6 additions & 12 deletions cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/version"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -57,27 +56,22 @@ func (s *Server) startStatusHTTP() error {

prometheus.DefaultGatherer = registry
serverMux.Handle("/metrics", promhttp.Handler())

credential := &security.Credential{}
if s.opts.credential != nil {
credential = s.opts.credential
}
tlsConfig, err := credential.ToTLSConfigWithVerify()
conf := config.GetGlobalServerConfig()
tlsConfig, err := conf.Security.ToTLSConfigWithVerify()
if err != nil {
log.Error("status server get tls config failed", zap.Error(err))
return errors.Trace(err)
}
addr := s.opts.addr
s.statusServer = &http.Server{Addr: addr, Handler: serverMux, TLSConfig: tlsConfig}
s.statusServer = &http.Server{Addr: conf.Addr, Handler: serverMux, TLSConfig: tlsConfig}

ln, err := net.Listen("tcp", addr)
ln, err := net.Listen("tcp", conf.Addr)
if err != nil {
return cerror.WrapError(cerror.ErrServeHTTP, err)
}
go func() {
log.Info("status http server is running", zap.String("addr", addr))
log.Info("status http server is running", zap.String("addr", conf.Addr))
if tlsConfig != nil {
err = s.statusServer.ServeTLS(ln, credential.CertPath, credential.KeyPath)
err = s.statusServer.ServeTLS(ln, conf.Security.CertPath, conf.Security.KeyPath)
} else {
err = s.statusServer.Serve(ln)
}
Expand Down
32 changes: 16 additions & 16 deletions cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.etcd.io/etcd/clientv3/concurrency"
Expand All @@ -32,16 +33,10 @@ var _ = check.Suite(&httpStatusSuite{})

const retryTime = 20

var testingServerOptions = options{
pdEndpoints: "http://127.0.0.1:2379",
addr: "127.0.0.1:8300",
advertiseAddr: "127.0.0.1:8300",
timezone: nil,
gcTTL: DefaultCDCGCSafePointTTL,
}
var advertiseAddr4Test = "127.0.0.1:8300"

func (s *httpStatusSuite) waitUntilServerOnline(c *check.C) {
statusURL := fmt.Sprintf("http://%s/status", testingServerOptions.advertiseAddr)
statusURL := fmt.Sprintf("http://%s/status", advertiseAddr4Test)
for i := 0; i < retryTime; i++ {
resp, err := http.Get(statusURL)
if err == nil {
Expand All @@ -57,8 +52,13 @@ func (s *httpStatusSuite) waitUntilServerOnline(c *check.C) {

func (s *httpStatusSuite) TestHTTPStatus(c *check.C) {
defer testleak.AfterTest(c)()
server := &Server{opts: testingServerOptions}
err := server.startStatusHTTP()
conf := config.GetDefaultServerConfig()
conf.Addr = advertiseAddr4Test
conf.AdvertiseAddr = advertiseAddr4Test
config.StoreGlobalServerConfig(conf)
server, err := NewServer([]string{"http://127.0.0.1:2379"})
c.Assert(err, check.IsNil)
err = server.startStatusHTTP()
c.Assert(err, check.IsNil)
defer func() {
c.Assert(server.statusServer.Close(), check.IsNil)
Expand All @@ -75,7 +75,7 @@ func (s *httpStatusSuite) TestHTTPStatus(c *check.C) {
}

func testPprof(c *check.C) {
resp, err := http.Get(fmt.Sprintf("http://%s/debug/pprof/cmdline", testingServerOptions.advertiseAddr))
resp, err := http.Get(fmt.Sprintf("http://%s/debug/pprof/cmdline", advertiseAddr4Test))
c.Assert(err, check.IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, check.Equals, 200)
Expand All @@ -84,31 +84,31 @@ func testPprof(c *check.C) {
}

func testReisgnOwner(c *check.C) {
uri := fmt.Sprintf("http://%s/capture/owner/resign", testingServerOptions.advertiseAddr)
uri := fmt.Sprintf("http://%s/capture/owner/resign", advertiseAddr4Test)
testHTTPPostOnly(c, uri)
testRequestNonOwnerFailed(c, uri)
}

func testHandleChangefeedAdmin(c *check.C) {
uri := fmt.Sprintf("http://%s/capture/owner/admin", testingServerOptions.advertiseAddr)
uri := fmt.Sprintf("http://%s/capture/owner/admin", advertiseAddr4Test)
testHTTPPostOnly(c, uri)
testRequestNonOwnerFailed(c, uri)
}

func testHandleRebalance(c *check.C) {
uri := fmt.Sprintf("http://%s/capture/owner/rebalance_trigger", testingServerOptions.advertiseAddr)
uri := fmt.Sprintf("http://%s/capture/owner/rebalance_trigger", advertiseAddr4Test)
testHTTPPostOnly(c, uri)
testRequestNonOwnerFailed(c, uri)
}

func testHandleMoveTable(c *check.C) {
uri := fmt.Sprintf("http://%s/capture/owner/move_table", testingServerOptions.advertiseAddr)
uri := fmt.Sprintf("http://%s/capture/owner/move_table", advertiseAddr4Test)
testHTTPPostOnly(c, uri)
testRequestNonOwnerFailed(c, uri)
}

func testHandleChangefeedQuery(c *check.C) {
uri := fmt.Sprintf("http://%s/capture/owner/changefeed/query", testingServerOptions.advertiseAddr)
uri := fmt.Sprintf("http://%s/capture/owner/changefeed/query", advertiseAddr4Test)
testHTTPPostOnly(c, uri)
testRequestNonOwnerFailed(c, uri)
}
Expand Down
Loading

0 comments on commit 6d01841

Please sign in to comment.