Skip to content

Commit

Permalink
Run separate read-only port (#325)
Browse files Browse the repository at this point in the history
* Run separate read-only port

* Fixes

* More fix

* More fix
  • Loading branch information
reshke authored Nov 24, 2023
1 parent 6542b96 commit 7fdfc40
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 27 deletions.
3 changes: 2 additions & 1 deletion balancer/pkg/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/port"
)

// TODO use only one place to store strings
Expand Down Expand Up @@ -868,7 +869,7 @@ func (b *Balancer) RunAdm(ctx context.Context, listener net.Listener, tlsCfg *tl
}

func (b *Balancer) servAdm(ctx context.Context, conn net.Conn, tlsconfig *tls.Config) error {
cl := client.NewPsqlClient(conn)
cl := client.NewPsqlClient(conn, port.DefaultRouterPortType)

if err := cl.Init(tlsconfig); err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions cmd/worldmock/worldmock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/pkg/txstatus"
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/route"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ func (w *WorldMock) Run() error {
}

func (w *WorldMock) serv(netconn net.Conn) error {
cl := client.NewPsqlClient(netconn)
cl := client.NewPsqlClient(netconn, port.DefaultRouterPortType)

err := cl.Init(nil)

Expand All @@ -94,7 +95,7 @@ func (w *WorldMock) serv(netconn net.Conn) error {
return err
}
spqrlog.Zero.Info().Msg("client auth OK")

for {
msg, err := cl.Receive()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pg-sharding/spqr/qdb"
router "github.com/pg-sharding/spqr/router"
psqlclient "github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/route"
spqrparser "github.com/pg-sharding/spqr/yacc/console"
)
Expand Down Expand Up @@ -1005,7 +1006,7 @@ func (qc *qdbCoordinator) UnregisterRouter(ctx context.Context, rID string) erro
}

func (qc *qdbCoordinator) PrepareClient(nconn net.Conn) (CoordinatorClient, error) {
cl := psqlclient.NewPsqlClient(nconn)
cl := psqlclient.NewPsqlClient(nconn, port.DefaultRouterPortType)

if err := cl.Init(nil); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/config/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Router struct {

Host string `json:"host" toml:"host" yaml:"host"`
RouterPort string `json:"router_port" toml:"router_port" yaml:"router_port"`
RouterROPort string `json:"router_ro_port" toml:"router_ro_port" yaml:"router_ro_port"`
AdminConsolePort string `json:"admin_console_port" toml:"admin_console_port" yaml:"admin_console_port"`
GrpcApiPort string `json:"grpc_api_port" toml:"grpc_api_port" yaml:"grpc_api_port"`

Expand Down
20 changes: 13 additions & 7 deletions router/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
router "github.com/pg-sharding/spqr/router"
rgrpc "github.com/pg-sharding/spqr/router/grpc"
"github.com/pg-sharding/spqr/router/port"
"google.golang.org/grpc"
)

Expand All @@ -25,15 +26,20 @@ func NewApp(sg *router.InstanceImpl) *App {
func (app *App) ServeRouter(ctx context.Context) error {
var lwg sync.WaitGroup

listen := map[string]struct{}{
net.JoinHostPort("localhost", app.spqr.RuleRouter.Config().RouterPort): {},
net.JoinHostPort(app.spqr.RuleRouter.Config().Host, app.spqr.RuleRouter.Config().RouterPort): {},
listen := map[string]port.RouterPortType{
net.JoinHostPort("localhost", app.spqr.RuleRouter.Config().RouterPort): port.DefaultRouterPortType,
net.JoinHostPort(app.spqr.RuleRouter.Config().Host, app.spqr.RuleRouter.Config().RouterPort): port.DefaultRouterPortType,
}

if app.spqr.RuleRouter.Config().RouterROPort != "" {
listen[net.JoinHostPort("localhost", app.spqr.RuleRouter.Config().RouterROPort)] = port.RORouterPortType
listen[net.JoinHostPort(app.spqr.RuleRouter.Config().Host, app.spqr.RuleRouter.Config().RouterROPort)] = port.RORouterPortType
}

lwg.Add(len(listen))

for addr := range listen {
go func(address string) {
for addr, portType := range listen {
go func(address string, pt port.RouterPortType) {
defer lwg.Done()
var listener net.Listener
var err error
Expand All @@ -58,8 +64,8 @@ func (app *App) ServeRouter(ctx context.Context) error {
spqrlog.Zero.Info().
Str("address", address).
Msg("SPQR Router is ready by postgresql proto")
_ = app.spqr.Run(ctx, listener)
}(addr)
_ = app.spqr.Run(ctx, listener, pt)
}(addr, portType)
}
lwg.Wait()

Expand Down
12 changes: 10 additions & 2 deletions router/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pg-sharding/spqr/pkg/shard"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/pkg/txstatus"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/route"
"github.com/pg-sharding/spqr/router/server"
)
Expand Down Expand Up @@ -105,14 +106,21 @@ type PsqlClient struct {
server server.Server
}

func NewPsqlClient(pgconn conn.RawConn) *PsqlClient {
func NewPsqlClient(pgconn conn.RawConn, pt port.RouterPortType) *PsqlClient {
tsa := config.TargetSessionAttrsRW

// enforce default port behaviour
if pt == port.RORouterPortType {
tsa = config.TargetSessionAttrsRO
}
cl := &PsqlClient{
activeParamSet: make(map[string]string),
conn: pgconn,
startupMsg: &pgproto3.StartupMessage{},
prepStmts: map[string]string{},
tsa: config.TargetSessionAttrsRW,
tsa: tsa,
}

cl.id = fmt.Sprintf("%p", cl)

return cl
Expand Down
5 changes: 3 additions & 2 deletions router/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/jackc/pgx/v5/pgproto3"
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/port"
"go.uber.org/mock/gomock"

"github.com/pg-sharding/spqr/pkg/conn"
Expand Down Expand Up @@ -40,7 +41,7 @@ func TestCancel(t *testing.T) {
return 12, nil
}).Times(1)

client := client.NewPsqlClient(rconn)
client := client.NewPsqlClient(rconn, port.DefaultRouterPortType)

err := client.Init(nil)
assert.Equal(uint32(7), client.CancelMsg().ProcessID)
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestNoGSSAPI(t *testing.T) {
return 4, nil
}).Times(1)

client := client.NewPsqlClient(rconn)
client := client.NewPsqlClient(rconn, port.DefaultRouterPortType)

err := client.Init(nil)
assert.Equal(exprErr, err)
Expand Down
15 changes: 8 additions & 7 deletions router/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/console"
"github.com/pg-sharding/spqr/router/poolmgr"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/qrouter"
"github.com/pg-sharding/spqr/router/rulerouter"
sdnotifier "github.com/pg-sharding/spqr/router/sdnotifier"
Expand Down Expand Up @@ -162,8 +163,8 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string) (*InstanceIm
}, nil
}

func (r *InstanceImpl) serv(netconn net.Conn, admin_console bool) error {
routerClient, err := r.RuleRouter.PreRoute(netconn, admin_console)
func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
routerClient, err := r.RuleRouter.PreRoute(netconn, pt)
if err != nil {
_ = netconn.Close()
return err
Expand All @@ -176,7 +177,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, admin_console bool) error {
return r.RuleRouter.CancelClient(routerClient.CancelMsg())
}

if admin_console || routerClient.DB() == "spqr-console" {
if pt == port.ADMRouterPortType || routerClient.DB() == "spqr-console" {
return r.AdmConsole.Serve(context.Background(), routerClient)
}

Expand All @@ -198,7 +199,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, admin_console bool) error {
return Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer)
}

func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener) error {
func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error {
if r.WithJaeger {
closer, err := r.initJaegerTracer(r.RuleRouter.Config())
if err != nil {
Expand Down Expand Up @@ -229,7 +230,7 @@ func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener) error {

go accept(listener, cChan)

if (r.notifier != nil) {
if r.notifier != nil {
go func() {
for {
if err := r.notifier.Notify(); err != nil {
Expand All @@ -248,7 +249,7 @@ func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener) error {
_ = conn.Close()
} else {
go func() {
if err := r.serv(conn, false); err != nil {
if err := r.serv(conn, pt); err != nil {
spqrlog.Zero.Error().Err(err).Msg("error serving client")
}
}()
Expand Down Expand Up @@ -290,7 +291,7 @@ func (r *InstanceImpl) RunAdm(ctx context.Context, listener net.Listener) error
return nil
case conn := <-cChan:
go func() {
if err := r.serv(conn, true); err != nil {
if err := r.serv(conn, port.ADMRouterPortType); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}()
Expand Down
11 changes: 11 additions & 0 deletions router/port/port.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package port

type RouterPortType int

const (
DefaultRouterPortType = RouterPortType(0)

RORouterPortType = RouterPortType(1)

ADMRouterPortType = RouterPortType(2)
)
11 changes: 6 additions & 5 deletions router/rulerouter/rulerouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
rclient "github.com/pg-sharding/spqr/router/client"
notifier "github.com/pg-sharding/spqr/router/sdnotifier"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/route"
"github.com/pg-sharding/spqr/router/rule"
notifier "github.com/pg-sharding/spqr/router/sdnotifier"
"github.com/pkg/errors"
)

Expand All @@ -27,7 +28,7 @@ type RuleRouter interface {

Shutdown() error
Reload(configPath string) error
PreRoute(conn net.Conn, admin_client bool) (rclient.RouterClient, error)
PreRoute(conn net.Conn, pt port.RouterPortType) (rclient.RouterClient, error)
PreRouteInitializedClientAdm(cl rclient.RouterClient) (rclient.RouterClient, error)
ObsoleteRoute(key route.Key) error

Expand Down Expand Up @@ -163,8 +164,8 @@ func NewRouter(tlsconfig *tls.Config, rcfg *config.Router, notifier *notifier.No
}
}

func (r *RuleRouterImpl) PreRoute(conn net.Conn, admin_client bool) (rclient.RouterClient, error) {
cl := rclient.NewPsqlClient(conn)
func (r *RuleRouterImpl) PreRoute(conn net.Conn, pt port.RouterPortType) (rclient.RouterClient, error) {
cl := rclient.NewPsqlClient(conn, pt)

if err := cl.Init(r.tlsconfig); err != nil {
return cl, err
Expand All @@ -174,7 +175,7 @@ func (r *RuleRouterImpl) PreRoute(conn net.Conn, admin_client bool) (rclient.Rou
return cl, nil
}

if admin_client || cl.DB() == "spqr-console" {
if pt == port.ADMRouterPortType || cl.DB() == "spqr-console" {
return r.PreRouteInitializedClientAdm(cl)
}

Expand Down

0 comments on commit 7fdfc40

Please sign in to comment.