*: move tso to independent thread #8720

Open · wants to merge 6 commits into base: master
4 changes: 1 addition & 3 deletions client/client.go
@@ -206,9 +206,7 @@ func (k *serviceModeKeeper) close() {
k.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if k.tsoClient != nil {
k.tsoClient.close()
}
k.tsoClient.close()
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
}
3 changes: 3 additions & 0 deletions client/pd_service_discovery.go
@@ -634,6 +634,9 @@ func (c *pdServiceDiscovery) checkFollowerHealth(ctx context.Context) {

// Close releases all resources.
func (c *pdServiceDiscovery) Close() {
if c == nil {
return
}
c.closeOnce.Do(func() {
log.Info("[pd] close pd service discovery client")
c.clientConns.Range(func(key, cc any) bool {
3 changes: 3 additions & 0 deletions client/tso_service_discovery.go
@@ -226,6 +226,9 @@ func (c *tsoServiceDiscovery) retry(

// Close releases all resources
func (c *tsoServiceDiscovery) Close() {
if c == nil {
return
}
log.Info("closing tso service discovery")

c.cancel()
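
Taken together, the three client changes move nil handling from the call site into the Close methods themselves: serviceModeKeeper.close() can now call tsoClient.close() unconditionally because pdServiceDiscovery.Close() and tsoServiceDiscovery.Close() return early on a nil receiver. A minimal sketch of that nil-receiver pattern, using a hypothetical type rather than the real PD client structs:

package main

import "log"

// conn stands in for a discovery client; the real PD types are richer.
type conn struct{ addr string }

// Close is safe to call on a nil receiver, mirroring the guards this PR
// adds to pdServiceDiscovery.Close and tsoServiceDiscovery.Close.
func (c *conn) Close() {
	if c == nil {
		return
	}
	log.Printf("closing connection to %s", c.addr)
}

func main() {
	var c *conn // never initialized, e.g. the service mode was never switched
	c.Close()   // no nil check needed at the call site
	c = &conn{addr: "127.0.0.1:2379"}
	c.Close() // a real instance is closed as usual
}

This is why the explicit "if k.tsoClient != nil" guard in client.go can be dropped.
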
106 changes: 91 additions & 15 deletions server/cluster/cluster.go
@@ -17,6 +17,7 @@ package cluster
import (
"context"
"encoding/json"
errorspkg "errors"
"fmt"
"io"
"math"
@@ -43,6 +44,7 @@ import (
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/ratelimit"
@@ -56,6 +58,7 @@ import (
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/syncer"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
@@ -88,12 +91,13 @@ const (
// nodeStateCheckJobInterval is the interval to run node state check job.
nodeStateCheckJobInterval = 10 * time.Second
// metricsCollectionJobInterval is the interval to run metrics collection job.
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
serviceCheckInterval = 10 * time.Second
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
schedulingServiceCheckInterval = 10 * time.Second
tsoServiceCheckInterval = 100 * time.Millisecond
// persistLimitRetryTimes is used to reduce the probability of the persistent error
// since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist.
persistLimitRetryTimes = 5
@@ -144,6 +148,7 @@ type RaftCluster struct {
cancel context.CancelFunc

*core.BasicCluster // cached cluster info
member *member.EmbeddedEtcdMember

etcdClient *clientv3.Client
httpClient *http.Client
@@ -174,6 +179,7 @@ type RaftCluster struct {
keyspaceGroupManager *keyspace.GroupManager
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
tsoAllocator *tso.AllocatorManager

// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
@@ -194,16 +200,18 @@ type Status struct {
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client) *RaftCluster {
func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
clusterID: clusterID,
member: member,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
BasicCluster: basicCluster,
storage: storage,
tsoAllocator: tsoAllocator,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
@@ -314,11 +322,13 @@ func (c *RaftCluster) Start(s Server) error {
if err != nil {
return err
}
c.checkTSOService()
Review comment (Member): Can we put this func next to checkSchedulingService?

cluster, err := c.LoadClusterInfo()
if err != nil {
return err
}
if cluster == nil {
log.Warn("cluster is not bootstrapped")
return nil
}

@@ -351,7 +361,7 @@ func (c *RaftCluster) Start(s Server) error {
return err
}
}
c.checkServices()
c.checkSchedulingService()
c.wg.Add(9)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
@@ -370,7 +380,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

func (c *RaftCluster) checkServices() {
func (c *RaftCluster) checkSchedulingService() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
@@ -390,27 +400,90 @@
}
}

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isAPIServiceMode {
return
}
if c.member.IsLeader() {
if err := c.startTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
} else {
// leader exits, reset the allocator group
if err := c.stopTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}

failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}
}

func (c *RaftCluster) runServiceCheckJob() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(serviceCheckInterval)
schedulingTicker := time.NewTicker(schedulingServiceCheckInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Reset(time.Millisecond)
schedulingTicker.Reset(time.Millisecond)
})
defer ticker.Stop()
defer schedulingTicker.Stop()
tsoTicker := time.NewTicker(tsoServiceCheckInterval)
defer tsoTicker.Stop()

for {
select {
case <-c.ctx.Done():
log.Info("service check job is stopped")
return
case <-ticker.C:
c.checkServices()
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
}
}
}

func (c *RaftCluster) startTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}
if !allocator.IsInitialize() {
log.Info("initializing the global TSO allocator")
if err := allocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return err
}
}
return nil
}

func (c *RaftCluster) stopTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}
if allocator.IsInitialize() {
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
}
return nil
}

// startGCTuner
func (c *RaftCluster) startGCTuner() {
defer logutil.LogPanic()
@@ -757,6 +830,9 @@ func (c *RaftCluster) Stop() {
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.stopSchedulingJobs()
}
if err := c.stopTSOJobs(); err != nil {
log.Error("failed to stop tso jobs", errs.ZapError(err))
}
c.heartbeatRunner.Stop()
c.miscRunner.Stop()
c.logRunner.Stop()
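
The heart of the cluster.go change is that the global TSO allocator is now driven by the cluster's periodic service check instead of by campaignLeader: every tsoServiceCheckInterval the leader initializes the allocator if needed, and a non-leader resets it. A rough sketch of that leadership-gated ticker loop, with placeholder types instead of the real AllocatorManager and member APIs:

package main

import (
	"context"
	"log"
	"time"
)

// allocator is a stand-in for the global TSO allocator; the real
// AllocatorManager interface is richer than this.
type allocator struct{ initialized bool }

func (a *allocator) initialize() { a.initialized = true; log.Println("TSO allocator initialized") }
func (a *allocator) reset()      { a.initialized = false; log.Println("TSO allocator reset") }

// runServiceCheckLoop mirrors the shape of runServiceCheckJob: two tickers,
// one for the scheduling check and a faster one for the TSO check.
func runServiceCheckLoop(ctx context.Context, isLeader func() bool, a *allocator) {
	schedulingTicker := time.NewTicker(10 * time.Second) // schedulingServiceCheckInterval
	defer schedulingTicker.Stop()
	tsoTicker := time.NewTicker(100 * time.Millisecond) // tsoServiceCheckInterval
	defer tsoTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-schedulingTicker.C:
			// scheduling service check omitted in this sketch
		case <-tsoTicker.C:
			if isLeader() {
				if !a.initialized {
					a.initialize() // leader lazily starts TSO jobs
				}
			} else if a.initialized {
				a.reset() // lost leadership: stop TSO jobs and reset the allocator group
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	runServiceCheckLoop(ctx, func() bool { return true }, &allocator{})
}

The same stop path is reused by RaftCluster.Stop, which now calls stopTSOJobs directly rather than relying on the defer that previously lived in campaignLeader.
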
26 changes: 1 addition & 25 deletions server/server.go
@@ -17,7 +17,6 @@ package server
import (
"bytes"
"context"
errorspkg "errors"
"fmt"
"math/rand"
"net/http"
@@ -490,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error {

s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Client: s.client,
RootPath: s.rootPath,
@@ -1715,29 +1714,6 @@ func (s *Server) campaignLeader() {
s.member.KeepLeader(ctx)
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))

if !s.IsAPIServiceMode() {
allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get the global TSO allocator", errs.ZapError(err))
return
}
log.Info("initializing the global TSO allocator")
if err := allocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return
}
defer func() {
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false)
failpoint.Inject("updateAfterResetTSO", func() {
if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}()
}
if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
return
4 changes: 2 additions & 2 deletions tests/integrations/tso/client_test.go
@@ -339,12 +339,12 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
return err == nil
})
// Resign leader to trigger the TSO resetting.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", "return(true)"))
oldLeaderName := suite.cluster.WaitLeader()
re.NotEmpty(oldLeaderName)
err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO"))
newLeaderName := suite.cluster.WaitLeader()
re.NotEmpty(newLeaderName)
re.NotEqual(oldLeaderName, newLeaderName)
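
The failpoint path in this test changes because failpoint names are scoped by the package that calls failpoint.Inject: the updateAfterResetTSO injection moved from package server to server/cluster, so the Enable and Disable calls must use the new path. A compile-only sketch of the convention, with a hypothetical module path rather than the actual PD layout:

package cluster // imagine this lives at example.com/pd/server/cluster

import (
	"log"

	"github.com/pingcap/failpoint"
)

// resetTSO is where the injection point lives; a test would activate it with
// failpoint.Enable("example.com/pd/server/cluster/updateAfterResetTSO", "return(true)")
// and turn it off again with the matching failpoint.Disable call.
func resetTSO() {
	failpoint.Inject("updateAfterResetTSO", func() {
		log.Println("updateAfterResetTSO failpoint triggered")
	})
}

As the diff above shows, the test enables the failpoint with "return(true)" before resigning the leader and disables it once the resignation has been triggered.
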
1 change: 1 addition & 0 deletions tests/integrations/tso/consistency_test.go
@@ -81,6 +81,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
re.NotEmpty(leaderName)
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
suite.pdLeaderServer.BootstrapCluster()
backendEndpoints := suite.pdLeaderServer.GetAddr()
if suite.legacy {
suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints)
1 change: 1 addition & 0 deletions tests/integrations/tso/server_test.go
@@ -79,6 +79,7 @@ func (suite *tsoServerTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
re.NotEmpty(leaderName)
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
suite.pdLeaderServer.BootstrapCluster()
backendEndpoints := suite.pdLeaderServer.GetAddr()
if suite.legacy {
suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints)
6 changes: 3 additions & 3 deletions tests/server/cluster/cluster_test.go
@@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
@@ -952,7 +952,7 @@ func TestLoadClusterInfo(t *testing.T) {
}
re.NoError(testStorage.Flush())

raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
raftCluster, err = raftCluster.LoadClusterInfo()
re.NoError(err)
@@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
3 changes: 3 additions & 0 deletions tests/server/tso/allocator_test.go
@@ -127,6 +127,9 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) {
re.NoError(cluster.RunInitialServers())

cluster.WaitAllLeaders(re, dcLocationConfig)
leaderServer := cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()

// Wait for all nodes becoming healthy.
time.Sleep(time.Second * 5)
3 changes: 3 additions & 0 deletions tests/server/tso/global_tso_test.go
@@ -99,6 +99,7 @@ func TestDelaySyncTimestamp(t *testing.T) {
var leaderServer, nextLeaderServer *tests.TestServer
leaderServer = cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
for _, s := range cluster.GetServers() {
if s.GetConfig().Name != cluster.GetLeader() {
nextLeaderServer = s
Expand Down Expand Up @@ -146,6 +147,8 @@ func TestLogicalOverflow(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())

leaderServer := cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()

4 changes: 3 additions & 1 deletion tests/server/tso/tso_test.go
@@ -114,6 +114,8 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) {
re.NoError(cluster.RunInitialServers())

cluster.WaitAllLeaders(re, dcLocationConfig)
leaderServer := cluster.GetLeaderServer()
leaderServer.BootstrapCluster()
requestLocalTSOs(re, cluster, dcLocationConfig)

// Reboot the cluster.
@@ -125,7 +127,7 @@
re.NotEmpty(cluster.WaitLeader())

// Re-request the global TSOs.
leaderServer := cluster.GetLeaderServer()
leaderServer = cluster.GetLeaderServer()
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()
req := &pdpb.TsoRequest{