diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index ca5f1fe11ea..66a22d98c26 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -32,10 +32,13 @@ import ( "time" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/pingcap/sysutil" + "github.com/pkg/errors" "github.com/soheilhy/cmux" "github.com/spf13/cobra" bs "github.com/tikv/pd/pkg/basicserver" @@ -66,6 +69,11 @@ const ( // tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes. // format: "/ms/{cluster_id}/tso". tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName + + // maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID. + maxRetryTimesWaitAPIService = 60 + // retryIntervalWaitAPIService is the interval to retry. + retryIntervalWaitAPIService = 3 * time.Second ) var _ bs.Server = (*Server)(nil) @@ -147,6 +155,15 @@ func (s *Server) GetAddr() string { // Run runs the TSO server. func (s *Server) Run() error { + skipWaitAPIServiceReady := false + failpoint.Inject("skipWaitAPIServiceReady", func() { + skipWaitAPIServiceReady = true + }) + if !skipWaitAPIServiceReady { + if err := s.waitAPIServiceReady(); err != nil { + return err + } + } go systimemon.StartMonitor(s.ctx, time.Now, func() { log.Error("system time jumps backward", errs.ZapError(errs.ErrIncorrectSystemTime)) timeJumpBackCounter.Inc() @@ -517,6 +534,51 @@ func (s *Server) startServer() (err error) { return nil } +func (s *Server) waitAPIServiceReady() error { + for i := 0; i < maxRetryTimesWaitAPIService; i++ { + ready, err := s.isAPIServiceReady() + if err != nil { + log.Warn("failed to check api server ready", errs.ZapError(err)) + } + if ready { + return nil + } + select { + case <-s.ctx.Done(): + return errors.New("context canceled while waiting api server ready") + case <-time.After(retryIntervalWaitAPIService): + log.Debug("api server is not ready, retrying") + } + } + return errors.Errorf("failed to wait api server ready after retrying %d times", maxRetryTimesWaitAPIService) +} + +func (s *Server) isAPIServiceReady() (bool, error) { + urls := strings.Split(s.cfg.BackendEndpoints, ",") + if len(urls) == 0 { + return false, errors.New("no backend endpoints") + } + cc, err := s.GetDelegateClient(s.ctx, urls[0]) + if err != nil { + return false, err + } + clusterInfo, err := pdpb.NewPDClient(cc).GetClusterInfo(s.ctx, &pdpb.GetClusterInfoRequest{}) + if err != nil { + return false, err + } + if clusterInfo.GetHeader().GetError() != nil { + return false, errors.Errorf(clusterInfo.GetHeader().GetError().String()) + } + modes := clusterInfo.ServiceModes + if len(modes) == 0 { + return false, errors.New("no service mode") + } + if modes[0] == pdpb.ServiceMode_API_SVC_MODE { + return true, nil + } + return false, nil +} + // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *Config) *Server { svr := &Server{ diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index 3ca1ad39436..0b07c528aac 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -85,19 +85,22 @@ func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Asser return s, cleanup } -// StartSingleTSOTestServer creates and starts a tso server with default config for testing. -func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) { +// StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing. +func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) { cfg := tso.NewConfig() cfg.BackendEndpoints = backendEndpoints cfg.ListenAddr = listenAddrs cfg, err := tso.GenerateConfig(cfg) re.NoError(err) - // Setup the logger. err = InitLogger(cfg) re.NoError(err) + return NewTSOTestServer(ctx, cfg) +} - s, cleanup, err := NewTSOTestServer(ctx, cfg) +// StartSingleTSOTestServer creates and starts a tso server with default config for testing. +func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) { + s, cleanup, err := StartSingleTSOTestServerWithoutCheck(ctx, re, backendEndpoints, listenAddrs) re.NoError(err) testutil.Eventually(re, func() bool { return !s.IsClosed() diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index b3453140543..b320e191b57 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -154,7 +154,9 @@ func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() { func TestTSOPath(t *testing.T) { re := require.New(t) checkTSOPath(re, true /*isAPIServiceMode*/) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady", "return(true)")) checkTSOPath(re, false /*isAPIServiceMode*/) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady")) } func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { @@ -210,6 +212,56 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int return count } +func TestWaitAPIServiceReady(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + startCluster := func(isAPIServiceMode bool) (cluster *tests.TestCluster, backendEndpoints string) { + var err error + if isAPIServiceMode { + cluster, err = tests.NewTestAPICluster(ctx, 1) + } else { + cluster, err = tests.NewTestCluster(ctx, 1) + } + re.NoError(err) + err = cluster.RunInitialServers() + re.NoError(err) + leaderName := cluster.WaitLeader() + pdLeader := cluster.GetServer(leaderName) + return cluster, pdLeader.GetAddr() + } + + // tso server cannot be started because the pd server is not ready as api service. + cluster, backendEndpoints := startCluster(false /*isAPIServiceMode*/) + sctx, scancel := context.WithTimeout(ctx, time.Second*10) + defer scancel() + s, _, err := mcs.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc()) + re.Error(err) + re.Nil(s) + cluster.Destroy() + + // tso server can be started because the pd server is ready as api service. + cluster, backendEndpoints = startCluster(true /*isAPIServiceMode*/) + sctx, scancel = context.WithTimeout(ctx, time.Second*10) + defer scancel() + s, cleanup, err := mcs.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc()) + re.NoError(err) + defer cluster.Destroy() + defer cleanup() + + for i := 0; i < 12; i++ { + select { + case <-time.After(time.Second): + case <-sctx.Done(): + return + } + if s != nil && s.IsServing() { + break + } + } +} + type APIServerForwardTestSuite struct { suite.Suite ctx context.Context diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 1b3e4ae25c6..b8531c3adbf 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -307,9 +307,13 @@ func TestMixedTSODeployment(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) - defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)")) - defer re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady", "return(true)")) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady")) + }() ctx, cancel := context.WithCancel(context.Background()) cluster, err := tests.NewTestCluster(ctx, 1)