Skip to content

Commit

Permalink
api (ticdc): check http request context in api helper (#11165) (#11398)
Browse files Browse the repository at this point in the history
close #11046
  • Loading branch information
ti-chi-bot authored Aug 2, 2024
1 parent 29f757a commit 08dc05d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 36 deletions.
48 changes: 40 additions & 8 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const RegisterImportTaskPrefix = "/tidb/brie/import"

// APIV2Helpers is a collections of helper functions of OpenAPIV2.
// Defining it as an interface to make APIs more testable.
// Note: Every method in this interface should check the context before returning.
// If the context is canceled, the method should return an error immediately.
type APIV2Helpers interface {
// verifyCreateChangefeedConfig verifies the changefeedConfig,
// and yield a valid changefeedInfo or error
Expand Down Expand Up @@ -105,18 +107,22 @@ type APIV2Helpers interface {
// getEtcdClient returns an Etcd client given the PD endpoints and
// tls config
getEtcdClient(
ctx context.Context,
pdAddrs []string,
tlsConfig *tls.Config,
) (*clientv3.Client, error)

// getKVCreateTiStore wraps kv.createTiStore method to increase testability
createTiStore(
ctx context.Context,
pdAddrs []string,
credential *security.Credential,
) (tidbkv.Storage, error)

// getVerifiedTables wraps entry.VerifyTables to increase testability
getVerifiedTables(replicaConfig *config.ReplicaConfig,
getVerifiedTables(
ctx context.Context,
replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64,
scheme string, topic string, protocol config.Protocol,
) (ineligibleTables,
Expand Down Expand Up @@ -258,7 +264,8 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
}

// verifyUpstream verifies the upstream config before updating a changefeed
func (h APIV2HelpersImpl) verifyUpstream(ctx context.Context,
func (h APIV2HelpersImpl) verifyUpstream(
ctx context.Context,
changefeedConfig *ChangefeedConfig,
cfInfo *model.ChangeFeedInfo,
) error {
Expand Down Expand Up @@ -402,7 +409,8 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
// overrideCheckpointTs is the checkpointTs of the changefeed that specified by the user.
// or it is the checkpointTs of the changefeed before it is paused.
// we need to check weather the resuming changefeed is gc safe or not.
func (APIV2HelpersImpl) verifyResumeChangefeedConfig(ctx context.Context,
func (APIV2HelpersImpl) verifyResumeChangefeedConfig(
ctx context.Context,
pdClient pd.Client,
gcServiceID string,
changefeedID model.ChangeFeedID,
Expand Down Expand Up @@ -441,7 +449,8 @@ func (APIV2HelpersImpl) verifyResumeChangefeedConfig(ctx context.Context,
}

// getPDClient returns a PDClient given the PD cluster addresses and a credential
func (APIV2HelpersImpl) getPDClient(ctx context.Context,
func (APIV2HelpersImpl) getPDClient(
ctx context.Context,
pdAddrs []string,
credential *security.Credential,
) (pd.Client, error) {
Expand Down Expand Up @@ -473,7 +482,9 @@ func (APIV2HelpersImpl) getPDClient(ctx context.Context,
}

func (h APIV2HelpersImpl) getEtcdClient(
pdAddrs []string, tlsCfg *tls.Config,
ctx context.Context,
pdAddrs []string,
tlsCfg *tls.Config,
) (*clientv3.Client, error) {
conf := config.GetGlobalServerConfig()
grpcTLSOption, err := conf.Security.ToGRPCDialOption()
Expand All @@ -482,7 +493,7 @@ func (h APIV2HelpersImpl) getEtcdClient(
}
logConfig := &logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)
return clientv3.New(
res, err := clientv3.New(
clientv3.Config{
Endpoints: pdAddrs,
TLS: tlsCfg,
Expand All @@ -505,16 +516,33 @@ func (h APIV2HelpersImpl) getEtcdClient(
},
},
)
if err != nil {
return nil, errors.Trace(err)
}
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
}
return res, nil
}

// getTiStore wrap the kv.createTiStore method to increase testability
func (h APIV2HelpersImpl) createTiStore(pdAddrs []string,
func (h APIV2HelpersImpl) createTiStore(
ctx context.Context,
pdAddrs []string,
credential *security.Credential,
) (tidbkv.Storage, error) {
return kv.CreateTiStore(strings.Join(pdAddrs, ","), credential)
res, err := kv.CreateTiStore(strings.Join(pdAddrs, ","), credential)
if err != nil {
return nil, errors.Trace(err)
}
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
}
return res, nil
}

func (h APIV2HelpersImpl) getVerifiedTables(
ctx context.Context,
replicaConfig *config.ReplicaConfig,
storage tidbkv.Storage, startTs uint64,
scheme string, topic string, protocol config.Protocol,
Expand Down Expand Up @@ -551,5 +579,9 @@ func (h APIV2HelpersImpl) getVerifiedTables(
return nil, nil, err
}

if ctx.Err() != nil {
return nil, nil, errors.Trace(ctx.Err())
}

return ineligibleTables, eligibleTables, nil
}
24 changes: 12 additions & 12 deletions cdc/api/v2/api_helpers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
defer pdClient.Close()

// verify tables todo: del kvstore
kvStorage, err := h.helpers.createTiStore(cfg.PDAddrs, credential)
kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
return
Expand Down Expand Up @@ -143,7 +143,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
return
}

cli, err := h.helpers.getEtcdClient(cfg.PDAddrs, tlsCfg)
cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -332,8 +332,8 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

kvStore, err := h.helpers.createTiStore(cfg.PDAddrs, credential)
ctx := c.Request.Context()
kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -351,7 +351,7 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(replicaCfg.Sink.Protocol))

ineligibleTables, eligibleTables, err := h.helpers.
getVerifiedTables(replicaCfg, kvStore, cfg.StartTs, scheme, topic, protocol)
getVerifiedTables(ctx, replicaCfg, kvStore, cfg.StartTs, scheme, topic, protocol)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -468,7 +468,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
if len(updateCfConfig.PDAddrs) != 0 || upManager == nil {
pdAddrs := updateCfConfig.PDAddrs
credentials := updateCfConfig.PDConfig.toCredential()
storage, err = h.helpers.createTiStore(pdAddrs, credentials)
storage, err = h.helpers.createTiStore(ctx, pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
}
Expand Down
35 changes: 25 additions & 10 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestCreateChangefeed(t *testing.T) {
getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).
Return(pdClient, nil).AnyTimes()
helpers.EXPECT().
createTiStore(gomock.Any(), gomock.Any()).
createTiStore(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, cerrors.ErrNewStore).
Times(1)
cfConfig.PDAddrs = []string{"http://127.0.0.1:2379", "http://127.0.0.1:2382"}
Expand All @@ -152,7 +152,7 @@ func TestCreateChangefeed(t *testing.T) {

// case 4: failed to verify tables
helpers.EXPECT().
createTiStore(gomock.Any(), gomock.Any()).
createTiStore(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
helpers.EXPECT().
Expand All @@ -176,9 +176,9 @@ func TestCreateChangefeed(t *testing.T) {

// case 5:
helpers.EXPECT().
getEtcdClient(gomock.Any(), gomock.Any()).
getEtcdClient(gomock.Any(), gomock.Any(), gomock.Any()).
Return(testEtcdCluster.RandClient(), nil)
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(),
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil, nil).
AnyTimes()
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestCreateChangefeed(t *testing.T) {

// case 6: success
helpers.EXPECT().
getEtcdClient(gomock.Any(), gomock.Any()).
getEtcdClient(gomock.Any(), gomock.Any(), gomock.Any()).
Return(testEtcdCluster.RandClient(), nil)
ctrl.EXPECT().
CreateChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).AnyTimes()
helpers.EXPECT().
createTiStore(gomock.Any(), gomock.Any()).
createTiStore(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
mockCapture.EXPECT().GetUpstreamManager().Return(nil, nil).AnyTimes()
Expand Down Expand Up @@ -649,7 +649,7 @@ func TestVerifyTable(t *testing.T) {
body, err := json.Marshal(&updateCfg)
require.Nil(t, err)
helpers.EXPECT().
createTiStore(gomock.Any(), gomock.Any()).
createTiStore(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, cerrors.ErrNewStore).
Times(1)

Expand All @@ -664,10 +664,10 @@ func TestVerifyTable(t *testing.T) {

// case 3: getVerifiedTables failed
helpers.EXPECT().
createTiStore(gomock.Any(), gomock.Any()).
createTiStore(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(),
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil, cerrors.ErrFilterRuleInvalid).
Times(1)
Expand All @@ -689,7 +689,7 @@ func TestVerifyTable(t *testing.T) {
ineligible := []model.TableName{
{Schema: "test", Table: "invalidTable"},
}
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(),
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).
Return(eligible, ineligible, nil)

Expand All @@ -701,6 +701,21 @@ func TestVerifyTable(t *testing.T) {
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, http.StatusOK, w.Code)

// case 5: context canceled
ctx, cancel := context.WithCancel(context.Background())
cancel()
helpers.EXPECT().getVerifiedTables(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil, ctx.Err()).
Times(1)
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(ctx, verify.method, verify.url, bytes.NewReader(body))
router.ServeHTTP(w, req)
respErr = model.HTTPError{}
err = json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Error, "context canceled")
}

func TestResumeChangefeed(t *testing.T) {
Expand Down

0 comments on commit 08dc05d

Please sign in to comment.