Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add service subcommand #5971

Merged
merged 6 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 105 additions & 32 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/autoscaling"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand All @@ -41,43 +42,83 @@ import (
)

func main() {
ctx, cancel, svr := createServerWrapper(os.Args[1:])

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()
rootCmd := &cobra.Command{
Use: "pd-server",
Short: "Placement Driver server",
Run: createServerWrapper,
}

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
rootCmd.Flags().BoolP("version", "V", false, "print version information and exit")
rootCmd.Flags().StringP("config", "", "", "config file")
rootCmd.Flags().BoolP("config-check", "", false, "check config file validity and exit")
rootCmd.Flags().StringP("name", "", "", "human-readable name for this pd member")
rootCmd.Flags().StringP("data-dir", "", "", "path to the data directory (default 'default.${name}')")
rootCmd.Flags().StringP("client-urls", "", "http://127.0.0.1:2379", "url for client traffic")
rootCmd.Flags().StringP("advertise-client-urls", "", "", "advertise url for client traffic (default '${client-urls}')")
rootCmd.Flags().StringP("peer-urls", "", "http://127.0.0.1:2379", "url for peer traffic")
rootCmd.Flags().StringP("advertise-peer-urls", "", "", "advertise url for peer traffic (default '${peer-urls}')")
rootCmd.Flags().StringP("initial-cluster", "", "", "initial cluster configuration for bootstrapping, e,g. pd=http://127.0.0.1:2380")
rootCmd.Flags().StringP("join", "", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'")
rootCmd.Flags().StringP("metrics-addr", "", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
rootCmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
rootCmd.Flags().StringP("log-file", "", "", "log file path")
rootCmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
rootCmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
rootCmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
rootCmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
rootCmd.AddCommand(NewServiceCommand())

rootCmd.SetOutput(os.Stdout)
if err := rootCmd.Execute(); err != nil {
rootCmd.Println(err)
os.Exit(1)
}
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))
// NewServiceCommand returns the service command.
func NewServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "service <tso>",
Short: "Run a service",
}
cmd.AddCommand(NewTSOServiceCommand())
return cmd
}

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
// NewTSOServiceCommand returns the unsafe remove failed stores command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "tso",
Short: "Run the tso service",
Run: tso.CreateServerWrapper,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this is why we removed the return data types and have to duplicate server run() and exit code. LGTM

}
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "http://127.0.0.1:2379", "url for etcd client")
cmd.Flags().StringP("listen-addr", "", "", "listen address for tso service")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
return cmd
}

func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs.Server) {
func createServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cfg := config.NewConfig()
err := cfg.Parse(args)
flagSet := cmd.Flags()
flagSet.Parse(args)
err := cfg.Parse(flagSet)
if err != nil {
cmd.Println(err)
return
}

if cfg.Version {
printVersion, err := flagSet.GetBool("version")
if err != nil {
cmd.Println(err)
return
}
if printVersion {
server.PrintPDInfo()
exit(0)
}
Expand All @@ -92,15 +133,21 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
configCheck, err := flagSet.GetBool("config-check")
if err != nil {
cmd.Println(err)
return
}

if configCheck {
server.PrintConfigCheckMsg(cfg)
exit(0)
}

// New zap logger
err = cfg.SetupLogger()
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err == nil {
log.ReplaceGlobals(cfg.GetZapLogger(), cfg.GetZapLogProperties())
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
} else {
log.Fatal("initialize logger error", errs.ZapError(err))
}
Expand Down Expand Up @@ -132,7 +179,33 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("create server failed", errs.ZapError(err))
}

return ctx, cancel, svr
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func exit(code int) {
Expand Down
75 changes: 58 additions & 17 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ import (
"flag"
"net/http"
"os"
"os/signal"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Server is the TSO server, and it implements bs.Server.
Expand Down Expand Up @@ -69,12 +72,24 @@ func (s *Server) GetHTTPClient() *http.Client {
}

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, bs.Server) {
func CreateServerWrapper(cmd *cobra.Command, args []string) {
cmd.Flags().Parse(args)
cfg := tso.NewConfig()
err := cfg.Parse(os.Args[1:])
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
if err != nil {
cmd.Println(err)
return
}

if cfg.Version {
printVersionInfo()
printVersion, err := flagSet.GetBool("version")
if err != nil {
cmd.Println(err)
return
}
if printVersion {
// TODO: support printing TSO server info
// server.PrintTSOInfo()
exit(0)
}

Expand All @@ -88,29 +103,55 @@ func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
printConfigCheckMsg(cfg)
exit(0)
// New zap logger
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err == nil {
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
} else {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()

// TODO: Initialize logger
// TODO: support printing TSO server info
// LogTSOInfo()

// TODO: Make it configurable if it has big impact on performance.
grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

// TODO: Create the server
ctx, cancel := context.WithCancel(context.Background())
svr := &Server{}

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

return nil, nil, nil
}

// TODO: implement it
func printVersionInfo() {
}
<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

// TODO: implement it
func printConfigCheckMsg(cfg *tso.Config) {
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func exit(code int) {
Expand Down
Loading