Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support clustering #112

Merged
merged 2 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions cmd/tiproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ func main() {
}

configFile := rootCmd.PersistentFlags().String("config", "conf/proxy.yaml", "proxy config file path")
pubAddr := rootCmd.PersistentFlags().String("pub_addr", "127.0.0.1", "IP or domain, will be used as the accessible addr for other clients")
logEncoder := rootCmd.PersistentFlags().String("log_encoder", "", "log in format of tidb, console, or json")
logLevel := rootCmd.PersistentFlags().String("log_level", "", "log level")
clusterName := rootCmd.PersistentFlags().String("cluster_name", "tiproxy", "default cluster name, used to generate node name and differential clusters in dns discovery")
nodeName := rootCmd.PersistentFlags().String("node_name", "", "by default, it is generate prefixed by cluster-name")
pubAddr := rootCmd.PersistentFlags().String("pub_addr", "127.0.0.1", "IP or domain, will be used as the accessible addr for others")
bootstrapClusters := rootCmd.PersistentFlags().StringSlice("bootstrap_clusters", []string{}, "lists of other nodes in the cluster, e.g. 'n1=xxx,n2=xxx', where xx are IPs or domains")
bootstrapDiscoveryUrl := rootCmd.PersistentFlags().String("bootstrap_discovery_etcd", "", "etcd discovery service url")
bootstrapDiscoveryDNS := rootCmd.PersistentFlags().String("bootstrap_discovery_dns", "", "dns srv discovery")

rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
proxyConfigData, err := os.ReadFile(*configFile)
Expand All @@ -55,7 +60,16 @@ func main() {
cfg.Log.Level = *logLevel
}

srv, err := server.NewServer(cmd.Context(), cfg, *pubAddr)
cfg.Cluster = config.Cluster{
PubAddr: *pubAddr,
ClusterName: *clusterName,
NodeName: *nodeName,
BootstrapDurl: *bootstrapDiscoveryUrl,
BootstrapDdns: *bootstrapDiscoveryDNS,
BootstrapClusters: *bootstrapClusters,
}

srv, err := server.NewServer(cmd.Context(), cfg)
if err != nil {
return errors.Wrapf(err, "fail to create server")
}
Expand Down
10 changes: 10 additions & 0 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ type Config struct {
Security Security `yaml:"security,omitempty" toml:"security,omitempty" json:"security,omitempty"`
Metrics Metrics `yaml:"metrics,omitempty" toml:"metrics,omitempty" json:"metrics,omitempty"`
Log Log `yaml:"log,omitempty" toml:"log,omitempty" json:"log,omitempty"`
Cluster Cluster `yaml:"-" toml:"-" json:"-"`
}

type Cluster struct {
PubAddr string
ClusterName string
NodeName string
BootstrapDurl string
BootstrapDdns string
BootstrapClusters []string
}

type Metrics struct {
Expand Down
55 changes: 48 additions & 7 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"path/filepath"
"strconv"
"strings"
"time"

ginzap "github.com/gin-contrib/zap"
Expand Down Expand Up @@ -57,7 +58,7 @@ type Server struct {
Proxy *proxy.SQLServer
}

func NewServer(ctx context.Context, cfg *config.Config, pubAddr string) (srv *Server, err error) {
func NewServer(ctx context.Context, cfg *config.Config) (srv *Server, err error) {
srv = &Server{
ConfigManager: mgrcfg.NewConfigManager(),
MetricsManager: metrics.NewMetricsManager(),
Expand Down Expand Up @@ -115,7 +116,7 @@ func NewServer(ctx context.Context, cfg *config.Config, pubAddr string) (srv *Se
// 2. pass down '*Server' struct such that the underlying relies on the pointer only. But it does not work well for golang. To avoid cyclic imports between 'api' and `server` packages, two packages needs to be merged. That is basically what happened to TiDB '*Session'.
api.Register(engine.Group("/api"), cfg.API, lg.Named("api"), srv.NamespaceManager, srv.ConfigManager)

srv.Etcd, err = buildEtcd(cfg, lg.Named("etcd"), pubAddr, engine)
srv.Etcd, err = buildEtcd(cfg, lg.Named("etcd"), engine)
if err != nil {
err = errors.WithStack(err)
return
Expand Down Expand Up @@ -251,7 +252,28 @@ func (s *Server) Close() error {
return errors.Collect(ErrCloseServer, errs...)
}

func buildEtcd(cfg *config.Config, logger *zap.Logger, pubAddr string, engine *gin.Engine) (srv *embed.Etcd, err error) {
func buildEtcd(cfg *config.Config, logger *zap.Logger, engine *gin.Engine) (srv *embed.Etcd, err error) {
if cfg.Cluster.ClusterName == "" {
return nil, errors.New("cluster_name can not be empty")
}
if cfg.Cluster.NodeName == "" {
cfg.Cluster.NodeName = fmt.Sprintf("%s-%d", cfg.Cluster.ClusterName, time.Now().UnixMicro())
xhebox marked this conversation as resolved.
Show resolved Hide resolved
}

cnt := 0
if len(cfg.Cluster.BootstrapClusters) != 0 {
cnt++
}
if cfg.Cluster.BootstrapDurl != "" {
cnt++
}
if cfg.Cluster.BootstrapDdns != "" {
cnt++
}
if cnt > 1 {
return nil, errors.New("you can only pass one 'bootstrap_xxx' to bootstrap the node, or leave them empty to start a single-node cluster")
}
Comment on lines +267 to +279
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make it so complicated, are other components also like this?

Copy link
Collaborator Author

@xhebox xhebox Oct 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is common for etcd and other cloud native applications. Other pingcap components relies on the central PD(they only need PD).

PD only provide bootstrapClusters options, and a sugar option of bootstrapCluster by -join xxx:2380 if you don't want to write all clusters by hands. I have no plan to include that sugar option in this PR.

Honestly, discovery or dns is more convenient.

To use --initial-clusters, you need to specify all nodes. To use -join, the cluster must be bootstraped already. Whatever you need to pass different options fot different PD and probably follow some orders(-join).

Operator has implemented some very complex code. But I hope I can start all tiproxy nodes by some etcd discovery like ./bin/tiproxy --bootstrap-discovery-etcd xxxx.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my option, there's even no need to merge this PR, right? Proxies don't need to know each other, except for updating configurations. But updating configurations can be achieved by configuring the etcd address.

Copy link
Collaborator Author

@xhebox xhebox Oct 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have no etcd address. Tiproxy itself is a etcd server.

Unless you want a 'master tiproxy node' explicitly, again you need you need to first bootstrap a central single-node cluster. Other nodes need to -central-etcd xxx.

Or if you want to propagate global configurations to every node manually by tiproxyctl. For example, tls certs, namespace/routing/filter rules.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if you want to remove etcd from tiproxy, and relies on an external ETCD. I think it makes things complicated. If you have PD, you don't want another etcd. If you use tiproxy with static servers, you need another etcd.


etcd_cfg := embed.NewConfig()

if etcd_cfg.ClientTLSInfo, etcd_cfg.PeerTLSInfo, err = security.BuildEtcdTLSConfig(logger, cfg.Security.ServerTLS, cfg.Security.PeerTLS); err != nil {
Expand All @@ -269,7 +291,7 @@ func buildEtcd(cfg *config.Config, logger *zap.Logger, pubAddr string, engine *g
}
etcd_cfg.LCUrls = []url.URL{*apiAddr}
apiAddrAdvertise := *apiAddr
apiAddrAdvertise.Host = fmt.Sprintf("%s:%s", pubAddr, apiAddrAdvertise.Port())
apiAddrAdvertise.Host = fmt.Sprintf("%s:%s", cfg.Cluster.PubAddr, apiAddrAdvertise.Port())
etcd_cfg.ACUrls = []url.URL{apiAddrAdvertise}

peerPort := cfg.Advance.PeerPort
Expand All @@ -289,11 +311,30 @@ func buildEtcd(cfg *config.Config, logger *zap.Logger, pubAddr string, engine *g
peerAddr.Host = fmt.Sprintf("%s:%s", peerAddr.Hostname(), peerPort)
etcd_cfg.LPUrls = []url.URL{peerAddr}
peerAddrAdvertise := peerAddr
peerAddrAdvertise.Host = fmt.Sprintf("%s:%s", pubAddr, peerPort)
peerAddrAdvertise.Host = fmt.Sprintf("%s:%s", cfg.Cluster.PubAddr, peerPort)
etcd_cfg.APUrls = []url.URL{peerAddrAdvertise}

etcd_cfg.Name = "proxy-" + fmt.Sprint(time.Now().UnixMicro())
etcd_cfg.InitialCluster = etcd_cfg.InitialClusterFromName(etcd_cfg.Name)
etcd_cfg.Name = cfg.Cluster.NodeName
if cnt == 0 {
etcd_cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Cluster.NodeName, peerAddrAdvertise.String())
etcd_cfg.InitialClusterToken = cfg.Cluster.ClusterName
} else if len(cfg.Cluster.BootstrapClusters) > 0 {
for i := range cfg.Cluster.BootstrapClusters {
if etcd_cfg.PeerTLSInfo.Empty() {
cfg.Cluster.BootstrapClusters[i] = strings.Replace(cfg.Cluster.BootstrapClusters[i], "=", "=http://", 1)
} else {
cfg.Cluster.BootstrapClusters[i] = strings.Replace(cfg.Cluster.BootstrapClusters[i], "=", "=https://", 1)
}
}
etcd_cfg.InitialCluster = strings.Join(append(cfg.Cluster.BootstrapClusters, fmt.Sprintf("%s=%s", cfg.Cluster.NodeName, peerAddrAdvertise.String())), ",")
etcd_cfg.InitialClusterToken = cfg.Cluster.ClusterName
} else if cfg.Cluster.BootstrapDurl != "" {
etcd_cfg.Durl = cfg.Cluster.BootstrapDurl
} else if cfg.Cluster.BootstrapDdns != "" {
etcd_cfg.DNSCluster = cfg.Cluster.BootstrapDdns
etcd_cfg.DNSClusterServiceName = cfg.Cluster.ClusterName
}

etcd_cfg.Dir = filepath.Join(cfg.Workdir, "etcd")
etcd_cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger)

Expand Down