Skip to content

Commit

Permalink
Init TSO service and start servicce loop
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Feb 17, 2023
1 parent d203d79 commit 6804a62
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 63 deletions.
1 change: 1 addition & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func NewTSOServiceCommand() *cobra.Command {
}
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().BoolP("config-check", "", false, "check config file validity and exit")
cmd.Flags().StringP("backend-endpoints", "", "http://127.0.0.1:2379", "url for etcd client")
cmd.Flags().StringP("listen-addr", "", "", "listen address for tso service")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ require (
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/stretchr/objx v0.5.0 // indirect
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
Expand Down
10 changes: 4 additions & 6 deletions pkg/basicserver/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"net/http"

"github.com/tikv/pd/pkg/member"
"go.etcd.io/etcd/clientv3"
)

Expand All @@ -38,9 +37,8 @@ type Server interface {
GetHTTPClient() *http.Client
// AddStartCallback adds a callback in the startServer phase.
AddStartCallback(callbacks ...func())
// TODO: replace these two methods with `primary` function without etcd server dependency.
// GetMember returns the member information.
GetMember() *member.Member
// AddLeaderCallback adds a callback in the leader campaign phase.
AddLeaderCallback(callbacks ...func(context.Context))
// IsServing returns whether the server is the leader if there is embedded etcd, or the primary otherwise.
IsServing() bool
// AddServiceReadyCallback adds a callback when the server becomes the leader if there is embedded etcd, or the primary otherwise.
AddServiceReadyCallback(callbacks ...func(context.Context))
}
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *Service) GetManager() *Manager {
}

func (s *Service) checkLeader() error {
if !s.manager.member.IsLeader() {
if !s.manager.srv.IsServing() {
return errNotLeader
}
return nil
Expand Down
10 changes: 4 additions & 6 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"go.uber.org/zap"
Expand All @@ -38,7 +37,7 @@ const defaultConsumptionChanSize = 1024
// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
member *member.Member
srv bs.Server
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
// consumptionChan is used to send the consumption
Expand All @@ -52,7 +51,6 @@ type Manager struct {
// NewManager returns a new Manager.
func NewManager(srv bs.Server) *Manager {
m := &Manager{
member: &member.Member{},
groups: make(map[string]*ResourceGroup),
consumptionDispatcher: make(chan struct {
resourceGroupName string
Expand All @@ -66,10 +64,10 @@ func NewManager(srv bs.Server) *Manager {
kv.NewEtcdKVBase(srv.GetClient(), "resource_group"),
nil,
)
m.member = srv.GetMember()
m.srv = srv
})
// The second initialization after the leader is elected.
srv.AddLeaderCallback(m.Init)
// The second initialization after becoming primary.
srv.AddServiceReadyCallback(m.Init)
return m
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ const (

// gRPC errors
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
)

Expand Down
Loading

0 comments on commit 6804a62

Please sign in to comment.