Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: remove domain types #16

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pingcap/TiProxy/pkg/config"
"github.com/pingcap/TiProxy/pkg/manager/router"
"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/TiProxy/pkg/util/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -146,7 +145,7 @@ func (mgr *NamespaceManager) Init(logger *zap.Logger, nss []*config.Namespace, c
return mgr.CommitNamespaces(nss, nil)
}

func (n *NamespaceManager) GetNamespace(nm string) (driver.Namespace, bool) {
func (n *NamespaceManager) GetNamespace(nm string) (*Namespace, bool) {
n.RLock()
defer n.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package namespace
import (
"crypto/tls"

"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/TiProxy/pkg/manager/router"
)

type Namespace struct {
Expand All @@ -40,7 +40,7 @@ func (n *Namespace) BackendTLSConfig() *tls.Config {
return n.backendTLS
}

func (n *Namespace) GetRouter() driver.Router {
func (n *Namespace) GetRouter() router.Router {
return n.router
}

Expand Down
29 changes: 23 additions & 6 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ import (
"time"

"github.com/pingcap/TiProxy/pkg/config"
"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type Router interface {
Route(RedirectableConn) (string, error)
RedirectConnections() error
Close()
}

var (
ErrNoInstanceToSelect = errors.New("no instances to route")
)
Expand All @@ -46,6 +51,18 @@ const (
rebalanceMaxScoreRatio = 1.1
)

type ConnEventReceiver interface {
OnRedirectSucceed(from, to string, conn RedirectableConn)
OnRedirectFail(from, to string, conn RedirectableConn)
OnConnClosed(addr string, conn RedirectableConn)
}

type RedirectableConn interface {
SetEventReceiver(receiver ConnEventReceiver)
Redirect(addr string)
ConnectionID() uint64
}

type BackendWrapper struct {
BackendInfo
addr string
Expand All @@ -59,7 +76,7 @@ func (b *BackendWrapper) score() int {
}

type ConnWrapper struct {
driver.RedirectableConn
RedirectableConn
phase int
}

Expand Down Expand Up @@ -88,7 +105,7 @@ func NewRandomRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*Ra
return router, err
}

func (router *RandomRouter) Route(conn driver.RedirectableConn) (string, error) {
func (router *RandomRouter) Route(conn RedirectableConn) (string, error) {
router.Lock()
defer router.Unlock()
be := router.backends.Back()
Expand Down Expand Up @@ -187,7 +204,7 @@ func (router *RandomRouter) lookupBackend(addr string, forward bool) *list.Eleme
return nil
}

func (router *RandomRouter) OnRedirectSucceed(from, to string, conn driver.RedirectableConn) {
func (router *RandomRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
Expand All @@ -205,7 +222,7 @@ func (router *RandomRouter) OnRedirectSucceed(from, to string, conn driver.Redir
connWrapper.phase = phaseRedirectEnd
}

func (router *RandomRouter) OnRedirectFail(from, to string, conn driver.RedirectableConn) {
func (router *RandomRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
Expand All @@ -231,7 +248,7 @@ func (router *RandomRouter) OnRedirectFail(from, to string, conn driver.Redirect
router.addConn(be, connWrapper)
}

func (router *RandomRouter) OnConnClosed(addr string, conn driver.RedirectableConn) {
func (router *RandomRouter) OnConnClosed(addr string, conn RedirectableConn) {
connID := conn.ConnectionID()
router.Lock()
defer router.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sync/atomic"
"unsafe"

"github.com/pingcap/TiProxy/pkg/proxy/driver"
"github.com/pingcap/TiProxy/pkg/manager/router"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/mysql"
Expand All @@ -47,15 +47,15 @@ type BackendConnManager struct {
connectionID uint64
authenticator *Authenticator
cmdProcessor *CmdProcessor
eventReceiver driver.ConnEventReceiver
eventReceiver router.ConnEventReceiver
backendConn BackendConnection
processLock sync.Mutex // to make redirecting and command processing exclusive
signalReceived chan struct{}
signal unsafe.Pointer // type *signalRedirect
cancelFunc context.CancelFunc
}

func NewBackendConnManager(connectionID uint64) driver.BackendConnManager {
func NewBackendConnManager(connectionID uint64) *BackendConnManager {
return &BackendConnManager{
connectionID: connectionID,
cmdProcessor: NewCmdProcessor(),
Expand Down Expand Up @@ -119,7 +119,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, c
return err
}

func (mgr *BackendConnManager) SetEventReceiver(receiver driver.ConnEventReceiver) {
func (mgr *BackendConnManager) SetEventReceiver(receiver router.ConnEventReceiver) {
mgr.eventReceiver = receiver
}

Expand Down
19 changes: 9 additions & 10 deletions pkg/proxy/client/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"crypto/tls"
"net"

"github.com/pingcap/TiProxy/pkg/proxy/driver"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/metrics"
Expand All @@ -29,19 +28,19 @@ import (
"go.uber.org/zap"
)

type ClientConnectionImpl struct {
type ClientConnection struct {
serverTLSConfig *tls.Config // the TLS config to connect to clients.
backendTLSConfig *tls.Config // the TLS config to connect to TiDB server.
pkt *pnet.PacketIO // a helper to read and write data in packet format.
bufReadConn *pnet.BufferedReadConn // a buffered-read net.Conn or buffered-read tls.Conn.
queryCtx driver.QueryCtx
queryCtx *QueryCtxImpl
connectionID uint64
}

func NewClientConnectionImpl(queryCtx driver.QueryCtx, conn net.Conn, connectionID uint64, serverTLSConfig, backendTLSConfig *tls.Config) driver.ClientConnection {
func NewClientConnectionImpl(queryCtx *QueryCtxImpl, conn net.Conn, connectionID uint64, serverTLSConfig, backendTLSConfig *tls.Config) *ClientConnection {
bufReadConn := pnet.NewBufferedReadConn(conn)
pkt := pnet.NewPacketIO(bufReadConn)
return &ClientConnectionImpl{
return &ClientConnection{
queryCtx: queryCtx,
serverTLSConfig: serverTLSConfig,
backendTLSConfig: backendTLSConfig,
Expand All @@ -51,15 +50,15 @@ func NewClientConnectionImpl(queryCtx driver.QueryCtx, conn net.Conn, connection
}
}

func (cc *ClientConnectionImpl) ConnectionID() uint64 {
func (cc *ClientConnection) ConnectionID() uint64 {
return cc.connectionID
}

func (cc *ClientConnectionImpl) Addr() string {
func (cc *ClientConnection) Addr() string {
return cc.bufReadConn.RemoteAddr().String()
}

func (cc *ClientConnectionImpl) Run(ctx context.Context) {
func (cc *ClientConnection) Run(ctx context.Context) {
if err := cc.queryCtx.ConnectBackend(ctx, cc.pkt, cc.serverTLSConfig, cc.backendTLSConfig); err != nil {
logutil.Logger(ctx).Info("new connection fails", zap.String("remoteAddr", cc.Addr()), zap.Error(err))
metrics.HandShakeErrorCounter.Inc()
Expand All @@ -75,7 +74,7 @@ func (cc *ClientConnectionImpl) Run(ctx context.Context) {
}
}

func (cc *ClientConnectionImpl) processMsg(ctx context.Context) error {
func (cc *ClientConnection) processMsg(ctx context.Context) error {
defer func() {
err := cc.Close()
terror.Log(errors.Trace(err))
Expand All @@ -98,7 +97,7 @@ func (cc *ClientConnectionImpl) processMsg(ctx context.Context) error {
}
}

func (cc *ClientConnectionImpl) Close() error {
func (cc *ClientConnection) Close() error {
if err := cc.pkt.Close(); err != nil {
terror.Log(err)
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/proxy/driver/queryctx.go → pkg/proxy/client/queryctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package driver
package client

import (
"context"
"crypto/tls"
"errors"

mgrns "github.com/pingcap/TiProxy/pkg/manager/namespace"
"github.com/pingcap/TiProxy/pkg/proxy/backend"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
)

Expand All @@ -41,12 +43,12 @@ const (

type QueryCtxImpl struct {
connId uint64
nsmgr NamespaceManager
ns Namespace
connMgr BackendConnManager
nsmgr *mgrns.NamespaceManager
ns *mgrns.Namespace
connMgr *backend.BackendConnManager
}

func NewQueryCtxImpl(nsmgr NamespaceManager, backendConnMgr BackendConnManager, connId uint64) *QueryCtxImpl {
func NewQueryCtxImpl(nsmgr *mgrns.NamespaceManager, backendConnMgr *backend.BackendConnManager, connId uint64) *QueryCtxImpl {
return &QueryCtxImpl{
connId: connId,
nsmgr: nsmgr,
Expand Down
86 changes: 0 additions & 86 deletions pkg/proxy/driver/domain.go

This file was deleted.

20 changes: 12 additions & 8 deletions pkg/proxy/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,31 @@ package driver
import (
"crypto/tls"
"net"

mgrns "github.com/pingcap/TiProxy/pkg/manager/namespace"
"github.com/pingcap/TiProxy/pkg/proxy/backend"
"github.com/pingcap/TiProxy/pkg/proxy/client"
)

type createClientConnFunc func(QueryCtx, net.Conn, uint64, *tls.Config, *tls.Config) ClientConnection
type createBackendConnMgrFunc func(connectionID uint64) BackendConnManager
type createClientConnFunc func(*client.QueryCtxImpl, net.Conn, uint64, *tls.Config, *tls.Config) *client.ClientConnection
type createBackendConnMgrFunc func(connectionID uint64) *backend.BackendConnManager

type DriverImpl struct {
nsmgr NamespaceManager
type Driver struct {
nsmgr *mgrns.NamespaceManager
createClientConnFunc createClientConnFunc
createBackendConnMgrFunc createBackendConnMgrFunc
}

func NewDriverImpl(nsmgr NamespaceManager, createClientConnFunc createClientConnFunc, createBackendConnMgrFunc createBackendConnMgrFunc) *DriverImpl {
return &DriverImpl{
func NewDriverImpl(nsmgr *mgrns.NamespaceManager, createClientConnFunc createClientConnFunc, createBackendConnMgrFunc createBackendConnMgrFunc) *Driver {
return &Driver{
nsmgr: nsmgr,
createClientConnFunc: createClientConnFunc,
createBackendConnMgrFunc: createBackendConnMgrFunc,
}
}

func (d *DriverImpl) CreateClientConnection(conn net.Conn, connectionID uint64, serverTLSConfig, clusterTLSConfig *tls.Config) ClientConnection {
func (d *Driver) CreateClientConnection(conn net.Conn, connectionID uint64, serverTLSConfig, clusterTLSConfig *tls.Config) *client.ClientConnection {
backendConnMgr := d.createBackendConnMgrFunc(connectionID)
queryCtx := NewQueryCtxImpl(d.nsmgr, backendConnMgr, connectionID)
queryCtx := client.NewQueryCtxImpl(d.nsmgr, backendConnMgr, connectionID)
return d.createClientConnFunc(queryCtx, conn, connectionID, serverTLSConfig, clusterTLSConfig)
}
Loading