Skip to content

Commit

Permalink
fix failpoint and pass the context
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed May 6, 2023
1 parent 566beda commit 19af41d
Show file tree
Hide file tree
Showing 17 changed files with 38 additions and 40 deletions.
2 changes: 1 addition & 1 deletion pkg/dashboard/adapter/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (m *Manager) updateInfo() {
m.isLeader = true

var err error
if m.members, err = cluster.GetMembers(m.srv.GetClient()); err != nil {
if m.members, err = cluster.GetMembers(m.srv.GetClient().Ctx(), m.srv.GetClient()); err != nil {
log.Warn("failed to get members", errs.ZapError(err))
m.members = nil
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (m *EmbeddedEtcdMember) ResignEtcdLeader(ctx context.Context, from string,
log.Info("try to resign etcd leader to next pd-server", zap.String("from", from), zap.String("to", nextEtcdLeader))
// Determine next etcd leader candidates.
var etcdLeaderIDs []uint64
res, err := etcdutil.ListEtcdMembers(m.client)
res, err := etcdutil.ListEtcdMembers(ctx, m.client)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
failpoint.Inject("concurrentRemoveOperator", func() {
time.Sleep(500 * time.Millisecond)
})

// Update operator status:
// The operator status should be STARTED.
// Check will call CheckSuccess and CheckTimeout.
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() {
oc.SetOperator(op1)

suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator", "return(true)"))
defer suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator"))

var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand All @@ -324,6 +322,7 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() {
wg.Wait()

suite.Equal(op2, oc.GetOperator(1))
suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator"))
}

func (suite *operatorControllerTestSuite) TestPollDispatchRegion() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func AddEtcdMember(client *clientv3.Client, urls []string) (*clientv3.MemberAddR
}

// ListEtcdMembers returns a list of internal etcd members.
func ListEtcdMembers(client *clientv3.Client) (*clientv3.MemberListResponse, error) {
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
func ListEtcdMembers(ctx context.Context, client *clientv3.Client) (*clientv3.MemberListResponse, error) {
ctx, cancel := context.WithTimeout(ctx, DefaultRequestTimeout)
listResp, err := client.MemberList(ctx)
cancel()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMemberHelpers(t *testing.T) {
<-etcd1.Server.ReadyNotify()

// Test ListEtcdMembers
listResp1, err := ListEtcdMembers(client1)
listResp1, err := ListEtcdMembers(client1.Ctx(), client1)
re.NoError(err)
re.Len(listResp1.Members, 1)
// types.ID is an alias of uint64.
Expand All @@ -79,7 +79,7 @@ func TestMemberHelpers(t *testing.T) {
_, err = RemoveEtcdMember(client1, uint64(etcd2.Server.ID()))
re.NoError(err)

listResp3, err := ListEtcdMembers(client1)
listResp3, err := ListEtcdMembers(client1.Ctx(), client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd1.Server.ID()), listResp3.Members[0].ID)
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestEtcdClientSync(t *testing.T) {
etcd1.Close()

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
listResp3, err := ListEtcdMembers(client1.Ctx(), client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
Expand Down Expand Up @@ -338,7 +338,7 @@ func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Clien

func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) {
// Check the client can get the new member.
listResp, err := ListEtcdMembers(client)
listResp, err := ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
re.Len(listResp.Members, len(etcds))
inList := func(m *etcdserverpb.Member) bool {
Expand Down
2 changes: 1 addition & 1 deletion server/api/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newHealthHandler(svr *server.Server, rd *render.Render) *healthHandler {
// @Router /health [get]
func (h *healthHandler) GetHealthStatus(w http.ResponseWriter, r *http.Request) {
client := h.svr.GetClient()
members, err := cluster.GetMembers(client)
members, err := cluster.GetMembers(r.Context(), client)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
6 changes: 3 additions & 3 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques
// Get etcd ID by name.
var id uint64
name := mux.Vars(r)["name"]
listResp, err := etcdutil.ListEtcdMembers(client)
listResp, err := etcdutil.ListEtcdMembers(r.Context(), client)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -287,7 +287,7 @@ func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /leader/resign [post]
func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) {
err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), "")
err := h.svr.GetMember().ResignEtcdLeader(r.Context(), h.svr.Name(), "")
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand All @@ -304,7 +304,7 @@ func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /leader/transfer/{nextLeader} [post]
func (h *leaderHandler) TransferLeader(w http.ResponseWriter, r *http.Request) {
err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), mux.Vars(r)["next_leader"])
err := h.svr.GetMember().ResignEtcdLeader(r.Context(), h.svr.Name(), mux.Vars(r)["next_leader"])
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
8 changes: 4 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2118,7 +2118,7 @@ func (c *RaftCluster) resetClusterMetrics() {
}

func (c *RaftCluster) collectHealthStatus() {
members, err := GetMembers(c.etcdClient)
members, err := GetMembers(c.etcdClient.Ctx(), c.etcdClient)
if err != nil {
log.Error("get members error", errs.ZapError(err))
}
Expand Down Expand Up @@ -2660,8 +2660,8 @@ func CheckHealth(client *http.Client, members []*pdpb.Member) map[uint64]*pdpb.M
}

// GetMembers returns a slice of Members.
func GetMembers(etcdClient *clientv3.Client) ([]*pdpb.Member, error) {
listResp, err := etcdutil.ListEtcdMembers(etcdClient)
func GetMembers(ctx context.Context, etcdClient *clientv3.Client) ([]*pdpb.Member, error) {
listResp, err := etcdutil.ListEtcdMembers(ctx, etcdClient)
if err != nil {
return nil, err
}
Expand All @@ -2682,7 +2682,7 @@ func GetMembers(etcdClient *clientv3.Client) ([]*pdpb.Member, error) {

// IsClientURL returns whether addr is a ClientUrl of any member.
func IsClientURL(addr string, etcdClient *clientv3.Client) bool {
members, err := GetMembers(etcdClient)
members, err := GetMembers(etcdClient.Ctx(), etcdClient)
if err != nil {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR
}

// GetMembers implements gRPC PDServer.
func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
func (s *GrpcServer) GetMembers(ctx context.Context, _ *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
// at startup and needs to get the cluster ID with the first request (i.e. GetMembers).
if s.IsClosed() {
Expand All @@ -138,7 +138,7 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb
},
}, nil
}
members, err := cluster.GetMembers(s.GetClient())
members, err := cluster.GetMembers(ctx, s.GetClient())
if err != nil {
return &pdpb.GetMembersResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
Expand Down
4 changes: 2 additions & 2 deletions server/join/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func PrepareJoinCluster(cfg *config.Config) error {
}
defer client.Close()

listResp, err := etcdutil.ListEtcdMembers(client)
listResp, err := etcdutil.ListEtcdMembers(client.Ctx(), client)
if err != nil {
return err
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func PrepareJoinCluster(cfg *config.Config) error {
)

for i := 0; i < listMemberRetryTimes; i++ {
listResp, err = etcdutil.ListEtcdMembers(client)
listResp, err = etcdutil.ListEtcdMembers(client.Ctx(), client)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions server/region_syncer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func TestLoadRegion(t *testing.T) {
for i := 0; i < 30; i++ {
rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1})
}
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/base_backend/slowLoadRegion", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/endpoint/slowLoadRegion", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/base_backend/slowLoadRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/endpoint/slowLoadRegion"))
}()

rc := NewRegionSyncer(server)
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (s *Server) startEtcd(ctx context.Context) error {
}

// update advertise peer urls.
etcdMembers, err := etcdutil.ListEtcdMembers(s.client)
etcdMembers, err := etcdutil.ListEtcdMembers(newCtx, s.client)
if err != nil {
return err
}
Expand Down Expand Up @@ -845,7 +845,7 @@ func (s *Server) GetMembers() ([]*pdpb.Member, error) {
if s.IsClosed() {
return nil, errs.ErrServerNotStarted.FastGenByArgs()
}
return cluster.GetMembers(s.GetClient())
return cluster.GetMembers(s.GetClient().Ctx(), s.GetClient())
}

// GetServiceMiddlewareConfig gets the service middleware config information.
Expand Down
8 changes: 4 additions & 4 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
func (suite *tsoClientTestSuite) TestRandomResignLeader() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))

parallelAct := func() {
// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
Expand Down Expand Up @@ -265,12 +264,12 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() {
}

mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
}

func (suite *tsoClientTestSuite) TestRandomShutdown() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))

parallelAct := func() {
// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
Expand All @@ -289,16 +288,15 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
suite.TearDownSuite()
suite.SetupSuite()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
}

// When we upgrade the PD cluster, there may be a period of time during which the old and new PDs are running at the same time.
func TestMixedTSODeployment(t *testing.T) {
re := require.New(t)

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))

ctx, cancel := context.WithCancel(context.Background())
cluster, err := tests.NewTestCluster(ctx, 1)
Expand Down Expand Up @@ -335,6 +333,8 @@ func TestMixedTSODeployment(t *testing.T) {
cancel1()
}()
wg.Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))
}

func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
Expand Down
2 changes: 1 addition & 1 deletion tests/pdctl/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestHealth(t *testing.T) {
defer tc.Destroy()

client := tc.GetEtcdClient()
members, err := cluster.GetMembers(client)
members, err := cluster.GetMembers(client.Ctx(), client)
re.NoError(err)
healthMembers := cluster.CheckHealth(tc.GetHTTPClient(), members)
healths := []api.Health{}
Expand Down
6 changes: 3 additions & 3 deletions tests/pdctl/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ func TestMember(t *testing.T) {
// member delete name <member_name>
err = svr.Destroy()
re.NoError(err)
members, err := etcdutil.ListEtcdMembers(client)
members, err := etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
re.Len(members.Members, 3)
args = []string{"-u", pdAddr, "member", "delete", "name", name}
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
testutil.Eventually(re, func() bool {
members, err = etcdutil.ListEtcdMembers(client)
members, err = etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
return len(members.Members) == 2
})
Expand All @@ -104,7 +104,7 @@ func TestMember(t *testing.T) {
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
testutil.Eventually(re, func() bool {
members, err = etcdutil.ListEtcdMembers(client)
members, err = etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
return len(members.Members) == 2
})
Expand Down
10 changes: 5 additions & 5 deletions tests/server/join/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestSimpleJoin(t *testing.T) {

pd1 := cluster.GetServer("pd1")
client := pd1.GetEtcdClient()
members, err := etcdutil.ListEtcdMembers(client)
members, err := etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
re.Len(members.Members, 1)

Expand All @@ -58,7 +58,7 @@ func TestSimpleJoin(t *testing.T) {
re.NoError(err)
_, err = os.Stat(path.Join(pd2.GetConfig().DataDir, "join"))
re.False(os.IsNotExist(err))
members, err = etcdutil.ListEtcdMembers(client)
members, err = etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
re.Len(members.Members, 2)
re.Equal(pd1.GetClusterID(), pd2.GetClusterID())
Expand All @@ -73,7 +73,7 @@ func TestSimpleJoin(t *testing.T) {
re.NoError(err)
_, err = os.Stat(path.Join(pd3.GetConfig().DataDir, "join"))
re.False(os.IsNotExist(err))
members, err = etcdutil.ListEtcdMembers(client)
members, err = etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
re.Len(members.Members, 3)
re.Equal(pd1.GetClusterID(), pd3.GetClusterID())
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestFailedAndDeletedPDJoinsPreviousCluster(t *testing.T) {
res := cluster.RunServer(pd3)
re.Error(<-res)

members, err := etcdutil.ListEtcdMembers(client)
members, err := etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
re.Len(members.Members, 2)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestDeletedPDJoinsPreviousCluster(t *testing.T) {
res := cluster.RunServer(pd3)
re.Error(<-res)

members, err := etcdutil.ListEtcdMembers(client)
members, err := etcdutil.ListEtcdMembers(client.Ctx(), client)
re.NoError(err)
re.Len(members.Members, 2)
}
Expand Down

0 comments on commit 19af41d

Please sign in to comment.