Skip to content

Commit

Permalink
proxy: remove driver and queryctx (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Aug 3, 2022
1 parent 080c85f commit a6db638
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 273 deletions.
3 changes: 1 addition & 2 deletions pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pingcap/TiProxy/pkg/config"
"github.com/pingcap/TiProxy/pkg/manager/router"
"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/TiProxy/pkg/util/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -146,7 +145,7 @@ func (mgr *NamespaceManager) Init(logger *zap.Logger, nss []*config.Namespace, c
return mgr.CommitNamespaces(nss, nil)
}

func (n *NamespaceManager) GetNamespace(nm string) (driver.Namespace, bool) {
func (n *NamespaceManager) GetNamespace(nm string) (*Namespace, bool) {
n.RLock()
defer n.RUnlock()

Expand Down
6 changes: 3 additions & 3 deletions pkg/manager/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package namespace
import (
"crypto/tls"

"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/TiProxy/pkg/manager/router"
)

type Namespace struct {
name string
router driver.Router
router router.Router
frontendTLS *tls.Config
backendTLS *tls.Config
}
Expand All @@ -40,7 +40,7 @@ func (n *Namespace) BackendTLSConfig() *tls.Config {
return n.backendTLS
}

func (n *Namespace) GetRouter() driver.Router {
func (n *Namespace) GetRouter() router.Router {
return n.router
}

Expand Down
29 changes: 23 additions & 6 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ import (
"time"

"github.com/pingcap/TiProxy/pkg/config"
"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type Router interface {
Route(RedirectableConn) (string, error)
RedirectConnections() error
Close()
}

var (
ErrNoInstanceToSelect = errors.New("no instances to route")
)
Expand All @@ -46,6 +51,18 @@ const (
rebalanceMaxScoreRatio = 1.1
)

type ConnEventReceiver interface {
OnRedirectSucceed(from, to string, conn RedirectableConn)
OnRedirectFail(from, to string, conn RedirectableConn)
OnConnClosed(addr string, conn RedirectableConn)
}

type RedirectableConn interface {
SetEventReceiver(receiver ConnEventReceiver)
Redirect(addr string)
ConnectionID() uint64
}

type BackendWrapper struct {
BackendInfo
addr string
Expand All @@ -59,7 +76,7 @@ func (b *BackendWrapper) score() int {
}

type ConnWrapper struct {
driver.RedirectableConn
RedirectableConn
phase int
}

Expand Down Expand Up @@ -88,7 +105,7 @@ func NewRandomRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*Ra
return router, err
}

func (router *RandomRouter) Route(conn driver.RedirectableConn) (string, error) {
func (router *RandomRouter) Route(conn RedirectableConn) (string, error) {
router.Lock()
defer router.Unlock()
be := router.backends.Back()
Expand Down Expand Up @@ -187,7 +204,7 @@ func (router *RandomRouter) lookupBackend(addr string, forward bool) *list.Eleme
return nil
}

func (router *RandomRouter) OnRedirectSucceed(from, to string, conn driver.RedirectableConn) {
func (router *RandomRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
Expand All @@ -205,7 +222,7 @@ func (router *RandomRouter) OnRedirectSucceed(from, to string, conn driver.Redir
connWrapper.phase = phaseRedirectEnd
}

func (router *RandomRouter) OnRedirectFail(from, to string, conn driver.RedirectableConn) {
func (router *RandomRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
Expand All @@ -231,7 +248,7 @@ func (router *RandomRouter) OnRedirectFail(from, to string, conn driver.Redirect
router.addConn(be, connWrapper)
}

func (router *RandomRouter) OnConnClosed(addr string, conn driver.RedirectableConn) {
func (router *RandomRouter) OnConnClosed(addr string, conn RedirectableConn) {
connID := conn.ConnectionID()
router.Lock()
defer router.Unlock()
Expand Down
21 changes: 7 additions & 14 deletions pkg/proxy/backend/backend_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,26 @@ const (

type connectionPhase byte

type BackendConnection interface {
Addr() string
Connect() error
PacketIO() *pnet.PacketIO
Close() error
}

type BackendConnectionImpl struct {
type BackendConnection struct {
pkt *pnet.PacketIO // a helper to read and write data in packet format.
alloc arena.Allocator
phase connectionPhase
capability uint32
address string
}

func NewBackendConnectionImpl(address string) *BackendConnectionImpl {
return &BackendConnectionImpl{
func NewBackendConnection(address string) *BackendConnection {
return &BackendConnection{
address: address,
alloc: arena.NewAllocator(32 * 1024),
}
}

func (bc *BackendConnectionImpl) Addr() string {
func (bc *BackendConnection) Addr() string {
return bc.address
}

func (bc *BackendConnectionImpl) Connect() error {
func (bc *BackendConnection) Connect() error {
cn, err := net.DialTimeout("tcp", bc.address, DialTimeout)
if err != nil {
return errors.New("dial backend error")
Expand All @@ -66,11 +59,11 @@ func (bc *BackendConnectionImpl) Connect() error {
return nil
}

func (bc *BackendConnectionImpl) PacketIO() *pnet.PacketIO {
func (bc *BackendConnection) PacketIO() *pnet.PacketIO {
return bc.pkt
}

func (bc *BackendConnectionImpl) Close() error {
func (bc *BackendConnection) Close() error {
if bc.pkt != nil {
return bc.pkt.Close()
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sync/atomic"
"unsafe"

"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/TiProxy/pkg/manager/router"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/mysql"
Expand All @@ -47,15 +47,15 @@ type BackendConnManager struct {
connectionID uint64
authenticator *Authenticator
cmdProcessor *CmdProcessor
eventReceiver driver.ConnEventReceiver
backendConn BackendConnection
eventReceiver router.ConnEventReceiver
backendConn *BackendConnection
processLock sync.Mutex // to make redirecting and command processing exclusive
signalReceived chan struct{}
signal unsafe.Pointer // type *signalRedirect
cancelFunc context.CancelFunc
}

func NewBackendConnManager(connectionID uint64) driver.BackendConnManager {
func NewBackendConnManager(connectionID uint64) *BackendConnManager {
return &BackendConnManager{
connectionID: connectionID,
cmdProcessor: NewCmdProcessor(),
Expand All @@ -71,7 +71,7 @@ func (mgr *BackendConnManager) ConnectionID() uint64 {
func (mgr *BackendConnManager) Connect(ctx context.Context, serverAddr string, clientIO *pnet.PacketIO, serverTLSConfig, backendTLSConfig *tls.Config) error {
mgr.processLock.Lock()
defer mgr.processLock.Unlock()
mgr.backendConn = NewBackendConnectionImpl(serverAddr)
mgr.backendConn = NewBackendConnection(serverAddr)
if err := mgr.backendConn.Connect(); err != nil {
return err
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, c
return err
}

func (mgr *BackendConnManager) SetEventReceiver(receiver driver.ConnEventReceiver) {
func (mgr *BackendConnManager) SetEventReceiver(receiver router.ConnEventReceiver) {
mgr.eventReceiver = receiver
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) (err error) {
return
}

newConn := NewBackendConnectionImpl(to)
newConn := NewBackendConnection(to)
if err = newConn.Connect(); err != nil {
return
}
Expand Down
52 changes: 38 additions & 14 deletions pkg/proxy/client/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
"crypto/tls"
"net"

"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/TiProxy/pkg/manager/namespace"
"github.com/pingcap/TiProxy/pkg/proxy/backend"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/metrics"
Expand All @@ -29,35 +30,55 @@ import (
"go.uber.org/zap"
)

type ClientConnectionImpl struct {
type ClientConnection struct {
serverTLSConfig *tls.Config // the TLS config to connect to clients.
backendTLSConfig *tls.Config // the TLS config to connect to TiDB server.
pkt *pnet.PacketIO // a helper to read and write data in packet format.
queryCtx driver.QueryCtx
connectionID uint64
nsmgr *namespace.NamespaceManager
ns *namespace.Namespace
connMgr *backend.BackendConnManager
}

func NewClientConnectionImpl(queryCtx driver.QueryCtx, conn net.Conn, connectionID uint64, serverTLSConfig *tls.Config, backendTLSConfig *tls.Config) driver.ClientConnection {
func NewClientConnection(conn net.Conn, connectionID uint64, serverTLSConfig *tls.Config, backendTLSConfig *tls.Config, nsmgr *namespace.NamespaceManager, bemgr *backend.BackendConnManager) *ClientConnection {
pkt := pnet.NewPacketIO(conn)
return &ClientConnectionImpl{
queryCtx: queryCtx,
return &ClientConnection{
serverTLSConfig: serverTLSConfig,
backendTLSConfig: backendTLSConfig,
pkt: pkt,
connectionID: connectionID,
nsmgr: nsmgr,
connMgr: bemgr,
}
}

func (cc *ClientConnectionImpl) ConnectionID() uint64 {
func (cc *ClientConnection) ConnectionID() uint64 {
return cc.connectionID
}

func (cc *ClientConnectionImpl) Addr() string {
func (cc *ClientConnection) Addr() string {
return cc.pkt.RemoteAddr().String()
}

func (cc *ClientConnectionImpl) Run(ctx context.Context) {
if err := cc.queryCtx.ConnectBackend(ctx, cc.pkt, cc.serverTLSConfig, cc.backendTLSConfig); err != nil {
func (cc *ClientConnection) ConnectBackend(ctx context.Context) error {
ns, ok := cc.nsmgr.GetNamespace("")
if !ok {
return errors.New("failed to find a namespace")
}
cc.ns = ns
router := ns.GetRouter()
addr, err := router.Route(cc.connMgr)
if err != nil {
return err
}
if err = cc.connMgr.Connect(ctx, addr, cc.pkt, cc.serverTLSConfig, cc.backendTLSConfig); err != nil {
return err
}
return nil
}

func (cc *ClientConnection) Run(ctx context.Context) {
if err := cc.ConnectBackend(ctx); err != nil {
logutil.Logger(ctx).Info("new connection fails", zap.String("remoteAddr", cc.Addr()), zap.Error(err))
metrics.HandShakeErrorCounter.Inc()
err = cc.Close()
Expand All @@ -72,7 +93,7 @@ func (cc *ClientConnectionImpl) Run(ctx context.Context) {
}
}

func (cc *ClientConnectionImpl) processMsg(ctx context.Context) error {
func (cc *ClientConnection) processMsg(ctx context.Context) error {
defer func() {
err := cc.Close()
terror.Log(errors.Trace(err))
Expand All @@ -83,7 +104,7 @@ func (cc *ClientConnectionImpl) processMsg(ctx context.Context) error {
if err != nil {
return err
}
err = cc.queryCtx.ExecuteCmd(ctx, clientPkt, cc.pkt)
err = cc.connMgr.ExecuteCmd(ctx, clientPkt, cc.pkt)
if err != nil {
return err
}
Expand All @@ -95,9 +116,12 @@ func (cc *ClientConnectionImpl) processMsg(ctx context.Context) error {
}
}

func (cc *ClientConnectionImpl) Close() error {
func (cc *ClientConnection) Close() error {
if err := cc.pkt.Close(); err != nil {
terror.Log(err)
}
return cc.queryCtx.Close()
if err := cc.connMgr.Close(); err != nil {
terror.Log(err)
}
return nil
}
Loading

0 comments on commit a6db638

Please sign in to comment.