From b8b17a47b1a3bb24a8c42cdc5b84a2f015c60eb5 Mon Sep 17 00:00:00 2001 From: xhe Date: Mon, 10 Oct 2022 18:06:42 +0800 Subject: [PATCH 1/2] *: support clustering Signed-off-by: xhe --- cmd/tiproxy/main.go | 18 +++++++++++++-- lib/config/proxy.go | 10 ++++++++ pkg/server/server.go | 55 ++++++++++++++++++++++++++++++++++++++------ 3 files changed, 74 insertions(+), 9 deletions(-) diff --git a/cmd/tiproxy/main.go b/cmd/tiproxy/main.go index c407aa8a..1900516e 100644 --- a/cmd/tiproxy/main.go +++ b/cmd/tiproxy/main.go @@ -33,9 +33,14 @@ func main() { } configFile := rootCmd.PersistentFlags().String("config", "conf/proxy.yaml", "proxy config file path") - pubAddr := rootCmd.PersistentFlags().String("pub_addr", "127.0.0.1", "IP or domain, will be used as the accessible addr for other clients") logEncoder := rootCmd.PersistentFlags().String("log_encoder", "", "log in format of tidb, console, or json") logLevel := rootCmd.PersistentFlags().String("log_level", "", "log level") + clusterName := rootCmd.PersistentFlags().String("cluster_name", "tiproxy", "default cluster name, used to generate node name and differential clusters in dns discovery") + nodeName := rootCmd.PersistentFlags().String("node_name", "", "by default, it is generate prefixed by cluster-name") + pubAddr := rootCmd.PersistentFlags().String("pub_addr", "127.0.0.1", "IP or domain, will be used as the accessible addr for others") + bootstrapClusters := rootCmd.PersistentFlags().StringSlice("bootstrap_clusters", []string{}, "lists of other nodes in the cluster, e.g. 'n1=xxx,n2=xxx', where xx are IPs or domains") + bootstrapDiscoveryUrl := rootCmd.PersistentFlags().String("bootstrap_discovery_etcd", "", "etcd discovery service url") + bootstrapDiscoveryDNS := rootCmd.PersistentFlags().String("bootstrap_discovery_dns", "", "dns srv discovery") rootCmd.RunE = func(cmd *cobra.Command, _ []string) error { proxyConfigData, err := os.ReadFile(*configFile) @@ -55,7 +60,16 @@ func main() { cfg.Log.Level = *logLevel } - srv, err := server.NewServer(cmd.Context(), cfg, *pubAddr) + cfg.Cluster = config.Cluster{ + PubAddr: *pubAddr, + ClusterName: *clusterName, + NodeName: *nodeName, + BootstrapDurl: *bootstrapDiscoveryUrl, + BootstrapDdns: *bootstrapDiscoveryDNS, + BootstrapClusters: *bootstrapClusters, + } + + srv, err := server.NewServer(cmd.Context(), cfg) if err != nil { return errors.Wrapf(err, "fail to create server") } diff --git a/lib/config/proxy.go b/lib/config/proxy.go index 426439b7..c576dfc2 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -35,6 +35,16 @@ type Config struct { Security Security `yaml:"security,omitempty" toml:"security,omitempty" json:"security,omitempty"` Metrics Metrics `yaml:"metrics,omitempty" toml:"metrics,omitempty" json:"metrics,omitempty"` Log Log `yaml:"log,omitempty" toml:"log,omitempty" json:"log,omitempty"` + Cluster Cluster `yaml:"-" toml:"-" json:"-"` +} + +type Cluster struct { + PubAddr string + ClusterName string + NodeName string + BootstrapDurl string + BootstrapDdns string + BootstrapClusters []string } type Metrics struct { diff --git a/pkg/server/server.go b/pkg/server/server.go index 7af53307..e9f3320e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -21,6 +21,7 @@ import ( "net/url" "path/filepath" "strconv" + "strings" "time" ginzap "github.com/gin-contrib/zap" @@ -57,7 +58,7 @@ type Server struct { Proxy *proxy.SQLServer } -func NewServer(ctx context.Context, cfg *config.Config, pubAddr string) (srv *Server, err error) { +func NewServer(ctx context.Context, cfg *config.Config) (srv *Server, err error) { srv = &Server{ ConfigManager: mgrcfg.NewConfigManager(), MetricsManager: metrics.NewMetricsManager(), @@ -115,7 +116,7 @@ func NewServer(ctx context.Context, cfg *config.Config, pubAddr string) (srv *Se // 2. pass down '*Server' struct such that the underlying relies on the pointer only. But it does not work well for golang. To avoid cyclic imports between 'api' and `server` packages, two packages needs to be merged. That is basically what happened to TiDB '*Session'. api.Register(engine.Group("/api"), cfg.API, lg.Named("api"), srv.NamespaceManager, srv.ConfigManager) - srv.Etcd, err = buildEtcd(cfg, lg.Named("etcd"), pubAddr, engine) + srv.Etcd, err = buildEtcd(cfg, lg.Named("etcd"), engine) if err != nil { err = errors.WithStack(err) return @@ -251,7 +252,28 @@ func (s *Server) Close() error { return errors.Collect(ErrCloseServer, errs...) } -func buildEtcd(cfg *config.Config, logger *zap.Logger, pubAddr string, engine *gin.Engine) (srv *embed.Etcd, err error) { +func buildEtcd(cfg *config.Config, logger *zap.Logger, engine *gin.Engine) (srv *embed.Etcd, err error) { + if cfg.Cluster.ClusterName == "" { + return nil, errors.New("cluster_name can not be empty") + } + if cfg.Cluster.NodeName == "" { + cfg.Cluster.NodeName = fmt.Sprintf("%s-%d", cfg.Cluster.ClusterName, time.Now().UnixMicro()) + } + + cnt := 0 + if len(cfg.Cluster.BootstrapClusters) != 0 { + cnt++ + } + if cfg.Cluster.BootstrapDurl != "" { + cnt++ + } + if cfg.Cluster.BootstrapDdns != "" { + cnt++ + } + if cnt > 1 { + return nil, errors.New("you can only pass one 'bootstrap_xxx' to bootstrap the node, or leave them empty to start a single-node cluster") + } + etcd_cfg := embed.NewConfig() if etcd_cfg.ClientTLSInfo, etcd_cfg.PeerTLSInfo, err = security.BuildEtcdTLSConfig(logger, cfg.Security.ServerTLS, cfg.Security.PeerTLS); err != nil { @@ -269,7 +291,7 @@ func buildEtcd(cfg *config.Config, logger *zap.Logger, pubAddr string, engine *g } etcd_cfg.LCUrls = []url.URL{*apiAddr} apiAddrAdvertise := *apiAddr - apiAddrAdvertise.Host = fmt.Sprintf("%s:%s", pubAddr, apiAddrAdvertise.Port()) + apiAddrAdvertise.Host = fmt.Sprintf("%s:%s", cfg.Cluster.PubAddr, apiAddrAdvertise.Port()) etcd_cfg.ACUrls = []url.URL{apiAddrAdvertise} peerPort := cfg.Advance.PeerPort @@ -289,11 +311,30 @@ func buildEtcd(cfg *config.Config, logger *zap.Logger, pubAddr string, engine *g peerAddr.Host = fmt.Sprintf("%s:%s", peerAddr.Hostname(), peerPort) etcd_cfg.LPUrls = []url.URL{peerAddr} peerAddrAdvertise := peerAddr - peerAddrAdvertise.Host = fmt.Sprintf("%s:%s", pubAddr, peerPort) + peerAddrAdvertise.Host = fmt.Sprintf("%s:%s", cfg.Cluster.PubAddr, peerPort) etcd_cfg.APUrls = []url.URL{peerAddrAdvertise} - etcd_cfg.Name = "proxy-" + fmt.Sprint(time.Now().UnixMicro()) - etcd_cfg.InitialCluster = etcd_cfg.InitialClusterFromName(etcd_cfg.Name) + etcd_cfg.Name = cfg.Cluster.NodeName + if cnt == 0 { + etcd_cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Cluster.NodeName, peerAddrAdvertise.String()) + etcd_cfg.InitialClusterToken = cfg.Cluster.ClusterName + } else if len(cfg.Cluster.BootstrapClusters) > 0 { + for i := range cfg.Cluster.BootstrapClusters { + if etcd_cfg.PeerTLSInfo.Empty() { + cfg.Cluster.BootstrapClusters[i] = strings.Replace(cfg.Cluster.BootstrapClusters[i], "=", "=http://", 1) + } else { + cfg.Cluster.BootstrapClusters[i] = strings.Replace(cfg.Cluster.BootstrapClusters[i], "=", "=https://", 1) + } + } + etcd_cfg.InitialCluster = strings.Join(append(cfg.Cluster.BootstrapClusters, fmt.Sprintf("%s=%s", cfg.Cluster.NodeName, peerAddrAdvertise.String())), ",") + etcd_cfg.InitialClusterToken = cfg.Cluster.ClusterName + } else if cfg.Cluster.BootstrapDurl != "" { + etcd_cfg.Durl = cfg.Cluster.BootstrapDurl + } else if cfg.Cluster.BootstrapDdns != "" { + etcd_cfg.DNSCluster = cfg.Cluster.BootstrapDdns + etcd_cfg.DNSClusterServiceName = cfg.Cluster.ClusterName + } + etcd_cfg.Dir = filepath.Join(cfg.Workdir, "etcd") etcd_cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger) From f93e7525b84fb1bddcc602c733a66705cbcf51b1 Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 11 Oct 2022 20:33:15 +0800 Subject: [PATCH 2/2] *: use hostname to generate Signed-off-by: xhe --- pkg/server/server.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index e9f3320e..8821b614 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -19,10 +19,10 @@ import ( "fmt" "net/http" "net/url" + "os" "path/filepath" "strconv" "strings" - "time" ginzap "github.com/gin-contrib/zap" "github.com/gin-gonic/gin" @@ -257,7 +257,11 @@ func buildEtcd(cfg *config.Config, logger *zap.Logger, engine *gin.Engine) (srv return nil, errors.New("cluster_name can not be empty") } if cfg.Cluster.NodeName == "" { - cfg.Cluster.NodeName = fmt.Sprintf("%s-%d", cfg.Cluster.ClusterName, time.Now().UnixMicro()) + hname, err := os.Hostname() + if err != nil { + return nil, errors.WithStack(err) + } + cfg.Cluster.NodeName = fmt.Sprintf("%s-%s", cfg.Cluster.ClusterName, hname) } cnt := 0