Skip to content

Commit

Permalink
proxy: use pnet.Command (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Jun 2, 2023
1 parent b2b6cdc commit e89da8f
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 178 deletions.
8 changes: 4 additions & 4 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (
err = mysql.ErrMalformPacket
return
}
cmd := request[0]
cmd := pnet.Command(request[0])
startTime := time.Now()
mgr.processLock.Lock()
defer mgr.processLock.Unlock()
Expand All @@ -285,9 +285,9 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (
}
if err == nil {
switch cmd {
case mysql.ComQuit:
case pnet.ComQuit:
return
case mysql.ComSetOption:
case pnet.ComSetOption:
val := binary.LittleEndian.Uint16(request[1:])
switch val {
case 0:
Expand All @@ -300,7 +300,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (
err = errors.Errorf("unrecognized set_option value:%d", val)
return
}
case mysql.ComChangeUser:
case pnet.ComChangeUser:
username, db := pnet.ParseChangeUser(request)
mgr.authenticator.changeUser(username, db)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/proxy/backend/backend_conn_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ func (ts *backendMgrTester) forwardCmd4Proxy(clientIO, backendIO *pnet.PacketIO)
clientIO.ResetSequence()
request, err := clientIO.ReadPacket()
require.NoError(ts.t, err)
prevCounter, err := readCmdCounter(request[0], ts.tc.backendListener.Addr().String())
prevCounter, err := readCmdCounter(pnet.Command(request[0]), ts.tc.backendListener.Addr().String())
require.NoError(ts.t, err)
rsErr := ts.mp.ExecuteCmd(context.Background(), request)
curCounter, err := readCmdCounter(request[0], ts.tc.backendListener.Addr().String())
curCounter, err := readCmdCounter(pnet.Command(request[0]), ts.tc.backendListener.Addr().String())
require.NoError(ts.t, err)
require.Equal(ts.t, prevCounter+1, curCounter)
return rsErr
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestSpecialCmds(t *testing.T) {
// change user
{
client: func(packetIO *pnet.PacketIO) error {
ts.mc.cmd = mysql.ComChangeUser
ts.mc.cmd = pnet.ComChangeUser
ts.mc.username = "another_user"
ts.mc.dbName = "another_db"
return ts.mc.request(packetIO)
Expand All @@ -481,7 +481,7 @@ func TestSpecialCmds(t *testing.T) {
// disable multi-stmts
{
client: func(packetIO *pnet.PacketIO) error {
ts.mc.cmd = mysql.ComSetOption
ts.mc.cmd = pnet.ComSetOption
ts.mc.dataBytes = []byte{1, 0}
return ts.mc.request(packetIO)
},
Expand Down
12 changes: 6 additions & 6 deletions pkg/proxy/backend/cmd_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,24 @@ func (cp *CmdProcessor) updatePrepStmtStatus(request []byte, serverStatus uint16
stmtID int
prepStmtStatus uint32
)
cmd := request[0]
cmd := pnet.Command(request[0])
switch cmd {
case mysql.ComStmtSendLongData, mysql.ComStmtExecute, mysql.ComStmtFetch, mysql.ComStmtReset, mysql.ComStmtClose:
case pnet.ComStmtSendLongData, pnet.ComStmtExecute, pnet.ComStmtFetch, pnet.ComStmtReset, pnet.ComStmtClose:
stmtID = int(binary.LittleEndian.Uint32(request[1:5]))
case mysql.ComResetConnection, mysql.ComChangeUser:
case pnet.ComResetConnection, pnet.ComChangeUser:
cp.preparedStmtStatus = make(map[int]uint32)
return
default:
return
}
switch cmd {
case mysql.ComStmtSendLongData:
case pnet.ComStmtSendLongData:
prepStmtStatus = StatusPrepareWaitExecute
case mysql.ComStmtExecute:
case pnet.ComStmtExecute:
if serverStatus&mysql.ServerStatusCursorExists > 0 {
prepStmtStatus = StatusPrepareWaitFetch
}
case mysql.ComStmtFetch:
case pnet.ComStmtFetch:
if serverStatus&mysql.ServerStatusLastRowSend == 0 {
prepStmtStatus = StatusPrepareWaitFetch
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/proxy/backend/cmd_processor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (cp *CmdProcessor) executeCmd(request []byte, clientIO, backendIO *pnet.Pac
}

func (cp *CmdProcessor) forwardCommand(clientIO, backendIO *pnet.PacketIO, request []byte) error {
cmd := request[0]
if cmd != mysql.ComChangeUser {
cmd := pnet.Command(request[0])
if cmd != pnet.ComChangeUser {
if err := backendIO.WritePacket(request, true); err != nil {
return err
}
Expand All @@ -49,23 +49,23 @@ func (cp *CmdProcessor) forwardCommand(clientIO, backendIO *pnet.PacketIO, reque
}
}
switch cmd {
case mysql.ComStmtPrepare:
case pnet.ComStmtPrepare:
return cp.forwardPrepareCmd(clientIO, backendIO)
case mysql.ComStmtFetch:
case pnet.ComStmtFetch:
return cp.forwardFetchCmd(clientIO, backendIO, request)
case mysql.ComQuery, mysql.ComStmtExecute, mysql.ComProcessInfo:
case pnet.ComQuery, pnet.ComStmtExecute, pnet.ComProcessInfo:
return cp.forwardQueryCmd(clientIO, backendIO, request)
case mysql.ComStmtClose:
case pnet.ComStmtClose:
return cp.forwardCloseCmd(request)
case mysql.ComStmtSendLongData:
case pnet.ComStmtSendLongData:
return cp.forwardSendLongDataCmd(request)
case mysql.ComChangeUser:
case pnet.ComChangeUser:
return cp.forwardChangeUserCmd(clientIO, backendIO, request)
case mysql.ComStatistics:
case pnet.ComStatistics:
return cp.forwardStatisticsCmd(clientIO, backendIO)
case mysql.ComFieldList:
case pnet.ComFieldList:
return cp.forwardFieldListCmd(clientIO, backendIO, request)
case mysql.ComQuit:
case pnet.ComQuit:
return cp.forwardQuitCmd()
}

Expand Down Expand Up @@ -158,7 +158,7 @@ func (cp *CmdProcessor) forwardPrepareCmd(clientIO, backendIO *pnet.PacketIO) er
return cp.handleErrorPacket(response)
}
// impossible here
return errors.Errorf("unexpected response, cmd:%d resp:%d", mysql.ComStmtPrepare, response[0])
return errors.Errorf("unexpected response, cmd:%d resp:%d", pnet.ComStmtPrepare, response[0])
}

func (cp *CmdProcessor) forwardFetchCmd(clientIO, backendIO *pnet.PacketIO, request []byte) error {
Expand Down Expand Up @@ -231,7 +231,7 @@ func (cp *CmdProcessor) forwardLoadInFile(clientIO, backendIO *pnet.PacketIO, re
return serverStatus, cp.handleErrorPacket(response)
}
// impossible here
return serverStatus, errors.Errorf("unexpected response, cmd:%d resp:%d", mysql.ComQuery, response[0])
return serverStatus, errors.Errorf("unexpected response, cmd:%d resp:%d", pnet.ComQuery, response[0])
}

func (cp *CmdProcessor) forwardResultSet(clientIO, backendIO *pnet.PacketIO, request []byte) (uint16, error) {
Expand Down Expand Up @@ -310,9 +310,9 @@ func (cp *CmdProcessor) forwardQuitCmd() error {
// The application may always omit `COMMIT` and thus the session can never be redirected.
// We can send a `COMMIT` statement to the current backend and then forward the `BEGIN` statement to the new backend.
func (cp *CmdProcessor) needHoldRequest(request []byte) bool {
cmd, data := request[0], request[1:]
cmd, data := pnet.Command(request[0]), request[1:]
// BEGIN/START TRANSACTION statements cannot be prepared.
if cmd != mysql.ComQuery {
if cmd != pnet.ComQuery {
return false
}
// Hold request only when it's waiting for the end of the transaction.
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/backend/cmd_processor_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (cp *CmdProcessor) query(packetIO *pnet.PacketIO, sql string) (result *gomy
packetIO.ResetSequence()
data := hack.Slice(sql)
request := make([]byte, 0, 1+len(data))
request = append(request, mysql.ComQuery)
request = append(request, pnet.ComQuery.Byte())
request = append(request, data...)
if err = packetIO.WritePacket(request, true); err != nil {
return
Expand Down
Loading

0 comments on commit e89da8f

Please sign in to comment.