From 74fc89ae07c13622b1e1ae65d820cf9c0ac6a2f2 Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 22 Nov 2023 17:25:41 +0000 Subject: [PATCH] Run separate read-only port --- balancer/pkg/balancer.go | 3 ++- cmd/worldmock/worldmock.go | 5 +++-- coordinator/provider/coordinator.go | 3 ++- pkg/config/router.go | 1 + router/app/app.go | 13 ++++++++----- router/client/client.go | 12 ++++++++++-- router/client/client_test.go | 5 +++-- router/instance.go | 15 ++++++++------- router/port/port.go | 11 +++++++++++ router/rulerouter/rulerouter.go | 11 ++++++----- 10 files changed, 54 insertions(+), 25 deletions(-) create mode 100644 router/port/port.go diff --git a/balancer/pkg/balancer.go b/balancer/pkg/balancer.go index cb15d7ce5..834c3c4db 100644 --- a/balancer/pkg/balancer.go +++ b/balancer/pkg/balancer.go @@ -14,6 +14,7 @@ import ( "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/router/client" + "github.com/pg-sharding/spqr/router/port" ) // TODO use only one place to store strings @@ -868,7 +869,7 @@ func (b *Balancer) RunAdm(ctx context.Context, listener net.Listener, tlsCfg *tl } func (b *Balancer) servAdm(ctx context.Context, conn net.Conn, tlsconfig *tls.Config) error { - cl := client.NewPsqlClient(conn) + cl := client.NewPsqlClient(conn, port.DefaultRouterPortType) if err := cl.Init(tlsconfig); err != nil { return err diff --git a/cmd/worldmock/worldmock.go b/cmd/worldmock/worldmock.go index 0f9b74a54..5c4ff0158 100644 --- a/cmd/worldmock/worldmock.go +++ b/cmd/worldmock/worldmock.go @@ -12,6 +12,7 @@ import ( "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/pkg/txstatus" "github.com/pg-sharding/spqr/router/client" + "github.com/pg-sharding/spqr/router/port" "github.com/pg-sharding/spqr/router/route" ) @@ -67,7 +68,7 @@ func (w *WorldMock) Run() error { } func (w *WorldMock) serv(netconn net.Conn) error { - cl := client.NewPsqlClient(netconn) + cl := client.NewPsqlClient(netconn, port.DefaultRouterPortType) err := cl.Init(nil) @@ -94,7 +95,7 @@ func (w *WorldMock) serv(netconn net.Conn) error { return err } spqrlog.Zero.Info().Msg("client auth OK") - + for { msg, err := cl.Receive() if err != nil { diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index 664cd67a1..3c4b22b5c 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -34,6 +34,7 @@ import ( "github.com/pg-sharding/spqr/qdb" router "github.com/pg-sharding/spqr/router" psqlclient "github.com/pg-sharding/spqr/router/client" + "github.com/pg-sharding/spqr/router/port" "github.com/pg-sharding/spqr/router/route" spqrparser "github.com/pg-sharding/spqr/yacc/console" ) @@ -1005,7 +1006,7 @@ func (qc *qdbCoordinator) UnregisterRouter(ctx context.Context, rID string) erro } func (qc *qdbCoordinator) PrepareClient(nconn net.Conn) (CoordinatorClient, error) { - cl := psqlclient.NewPsqlClient(nconn) + cl := psqlclient.NewPsqlClient(nconn, port.DefaultRouterPortType) if err := cl.Init(nil); err != nil { return nil, err diff --git a/pkg/config/router.go b/pkg/config/router.go index 4cc7399a8..552d16020 100644 --- a/pkg/config/router.go +++ b/pkg/config/router.go @@ -45,6 +45,7 @@ type Router struct { Host string `json:"host" toml:"host" yaml:"host"` RouterPort string `json:"router_port" toml:"router_port" yaml:"router_port"` + RouterROPort string `json:"router_ro_port" toml:"router_ro_port" yaml:"router_ro_port"` AdminConsolePort string `json:"admin_console_port" toml:"admin_console_port" yaml:"admin_console_port"` GrpcApiPort string `json:"grpc_api_port" toml:"grpc_api_port" yaml:"grpc_api_port"` diff --git a/router/app/app.go b/router/app/app.go index d9e389bef..cc60d9a18 100644 --- a/router/app/app.go +++ b/router/app/app.go @@ -9,6 +9,7 @@ import ( "github.com/pg-sharding/spqr/pkg/spqrlog" router "github.com/pg-sharding/spqr/router" rgrpc "github.com/pg-sharding/spqr/router/grpc" + "github.com/pg-sharding/spqr/router/port" "google.golang.org/grpc" ) @@ -25,14 +26,16 @@ func NewApp(sg *router.InstanceImpl) *App { func (app *App) ServeRouter(ctx context.Context) error { var lwg sync.WaitGroup - listen := map[string]struct{}{ - net.JoinHostPort("localhost", app.spqr.RuleRouter.Config().RouterPort): {}, - net.JoinHostPort(app.spqr.RuleRouter.Config().Host, app.spqr.RuleRouter.Config().RouterPort): {}, + listen := map[string]port.RouterPortType{ + net.JoinHostPort("localhost", app.spqr.RuleRouter.Config().RouterPort): port.DefaultRouterPortType, + net.JoinHostPort(app.spqr.RuleRouter.Config().Host, app.spqr.RuleRouter.Config().RouterPort): port.DefaultRouterPortType, + net.JoinHostPort("localhost", app.spqr.RuleRouter.Config().RouterROPort): port.RORouterPortType, + net.JoinHostPort(app.spqr.RuleRouter.Config().Host, app.spqr.RuleRouter.Config().RouterROPort): port.RORouterPortType, } lwg.Add(len(listen)) - for addr := range listen { + for addr, portType := range listen { go func(address string) { defer lwg.Done() var listener net.Listener @@ -58,7 +61,7 @@ func (app *App) ServeRouter(ctx context.Context) error { spqrlog.Zero.Info(). Str("address", address). Msg("SPQR Router is ready by postgresql proto") - _ = app.spqr.Run(ctx, listener) + _ = app.spqr.Run(ctx, listener, portType) }(addr) } lwg.Wait() diff --git a/router/client/client.go b/router/client/client.go index c877206e3..6f9371302 100644 --- a/router/client/client.go +++ b/router/client/client.go @@ -18,6 +18,7 @@ import ( "github.com/pg-sharding/spqr/pkg/shard" "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/pkg/txstatus" + "github.com/pg-sharding/spqr/router/port" "github.com/pg-sharding/spqr/router/route" "github.com/pg-sharding/spqr/router/server" ) @@ -105,14 +106,21 @@ type PsqlClient struct { server server.Server } -func NewPsqlClient(pgconn conn.RawConn) *PsqlClient { +func NewPsqlClient(pgconn conn.RawConn, pt port.RouterPortType) *PsqlClient { + tsa := config.TargetSessionAttrsRW + + // enforce default port behaviour + if pt == port.RORouterPortType { + tsa = config.TargetSessionAttrsRO + } cl := &PsqlClient{ activeParamSet: make(map[string]string), conn: pgconn, startupMsg: &pgproto3.StartupMessage{}, prepStmts: map[string]string{}, - tsa: config.TargetSessionAttrsRW, + tsa: tsa, } + cl.id = fmt.Sprintf("%p", cl) return cl diff --git a/router/client/client_test.go b/router/client/client_test.go index 2a2808ea4..182d78da7 100644 --- a/router/client/client_test.go +++ b/router/client/client_test.go @@ -7,6 +7,7 @@ import ( "github.com/jackc/pgx/v5/pgproto3" "github.com/pg-sharding/spqr/router/client" + "github.com/pg-sharding/spqr/router/port" "go.uber.org/mock/gomock" "github.com/pg-sharding/spqr/pkg/conn" @@ -40,7 +41,7 @@ func TestCancel(t *testing.T) { return 12, nil }).Times(1) - client := client.NewPsqlClient(rconn) + client := client.NewPsqlClient(rconn, port.DefaultRouterPortType) err := client.Init(nil) assert.Equal(uint32(7), client.CancelMsg().ProcessID) @@ -84,7 +85,7 @@ func TestNoGSSAPI(t *testing.T) { return 4, nil }).Times(1) - client := client.NewPsqlClient(rconn) + client := client.NewPsqlClient(rconn, port.DefaultRouterPortType) err := client.Init(nil) assert.Equal(exprErr, err) diff --git a/router/instance.go b/router/instance.go index 14c15eba6..534a8022b 100644 --- a/router/instance.go +++ b/router/instance.go @@ -17,6 +17,7 @@ import ( "github.com/pg-sharding/spqr/router/client" "github.com/pg-sharding/spqr/router/console" "github.com/pg-sharding/spqr/router/poolmgr" + "github.com/pg-sharding/spqr/router/port" "github.com/pg-sharding/spqr/router/qrouter" "github.com/pg-sharding/spqr/router/rulerouter" sdnotifier "github.com/pg-sharding/spqr/router/sdnotifier" @@ -162,8 +163,8 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string) (*InstanceIm }, nil } -func (r *InstanceImpl) serv(netconn net.Conn, admin_console bool) error { - routerClient, err := r.RuleRouter.PreRoute(netconn, admin_console) +func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error { + routerClient, err := r.RuleRouter.PreRoute(netconn, pt) if err != nil { _ = netconn.Close() return err @@ -176,7 +177,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, admin_console bool) error { return r.RuleRouter.CancelClient(routerClient.CancelMsg()) } - if admin_console || routerClient.DB() == "spqr-console" { + if pt == port.ADMRouterPortType || routerClient.DB() == "spqr-console" { return r.AdmConsole.Serve(context.Background(), routerClient) } @@ -198,7 +199,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, admin_console bool) error { return Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer) } -func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener) error { +func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error { if r.WithJaeger { closer, err := r.initJaegerTracer(r.RuleRouter.Config()) if err != nil { @@ -229,7 +230,7 @@ func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener) error { go accept(listener, cChan) - if (r.notifier != nil) { + if r.notifier != nil { go func() { for { if err := r.notifier.Notify(); err != nil { @@ -248,7 +249,7 @@ func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener) error { _ = conn.Close() } else { go func() { - if err := r.serv(conn, false); err != nil { + if err := r.serv(conn, pt); err != nil { spqrlog.Zero.Error().Err(err).Msg("error serving client") } }() @@ -290,7 +291,7 @@ func (r *InstanceImpl) RunAdm(ctx context.Context, listener net.Listener) error return nil case conn := <-cChan: go func() { - if err := r.serv(conn, true); err != nil { + if err := r.serv(conn, port.ADMRouterPortType); err != nil { spqrlog.Zero.Error().Err(err).Msg("") } }() diff --git a/router/port/port.go b/router/port/port.go new file mode 100644 index 000000000..be7fbfd23 --- /dev/null +++ b/router/port/port.go @@ -0,0 +1,11 @@ +package port + +type RouterPortType int + +const ( + DefaultRouterPortType = RouterPortType(0) + + RORouterPortType = RouterPortType(1) + + ADMRouterPortType = RouterPortType(1) +) diff --git a/router/rulerouter/rulerouter.go b/router/rulerouter/rulerouter.go index 9f5bc4490..013add858 100644 --- a/router/rulerouter/rulerouter.go +++ b/router/rulerouter/rulerouter.go @@ -16,9 +16,10 @@ import ( "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/qdb" rclient "github.com/pg-sharding/spqr/router/client" - notifier "github.com/pg-sharding/spqr/router/sdnotifier" + "github.com/pg-sharding/spqr/router/port" "github.com/pg-sharding/spqr/router/route" "github.com/pg-sharding/spqr/router/rule" + notifier "github.com/pg-sharding/spqr/router/sdnotifier" "github.com/pkg/errors" ) @@ -27,7 +28,7 @@ type RuleRouter interface { Shutdown() error Reload(configPath string) error - PreRoute(conn net.Conn, admin_client bool) (rclient.RouterClient, error) + PreRoute(conn net.Conn, pt port.RouterPortType) (rclient.RouterClient, error) PreRouteInitializedClientAdm(cl rclient.RouterClient) (rclient.RouterClient, error) ObsoleteRoute(key route.Key) error @@ -163,8 +164,8 @@ func NewRouter(tlsconfig *tls.Config, rcfg *config.Router, notifier *notifier.No } } -func (r *RuleRouterImpl) PreRoute(conn net.Conn, admin_client bool) (rclient.RouterClient, error) { - cl := rclient.NewPsqlClient(conn) +func (r *RuleRouterImpl) PreRoute(conn net.Conn, pt port.RouterPortType) (rclient.RouterClient, error) { + cl := rclient.NewPsqlClient(conn, pt) if err := cl.Init(r.tlsconfig); err != nil { return cl, err @@ -174,7 +175,7 @@ func (r *RuleRouterImpl) PreRoute(conn net.Conn, admin_client bool) (rclient.Rou return cl, nil } - if admin_client || cl.DB() == "spqr-console" { + if pt == port.ADMRouterPortType || cl.DB() == "spqr-console" { return r.PreRouteInitializedClientAdm(cl) }