Skip to content

Commit

Permalink
backend: add tests for transaction status (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Aug 11, 2022
1 parent 898d46a commit 5bd467f
Show file tree
Hide file tree
Showing 7 changed files with 446 additions and 48 deletions.
18 changes: 8 additions & 10 deletions pkg/proxy/backend/cmd_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (

const (
StatusInTrans uint32 = 1 << iota
StatusAutoCommit
StatusQuit
StatusPrepareWaitExecute
StatusPrepareWaitFetch
StatusQuit
)

// CmdProcessor maintains the transaction and prepared statement status and decides whether the session can be redirected.
Expand All @@ -39,7 +38,7 @@ type CmdProcessor struct {

func NewCmdProcessor() *CmdProcessor {
return &CmdProcessor{
serverStatus: StatusAutoCommit,
serverStatus: 0,
preparedStmtStatus: make(map[int]uint32),
}
}
Expand All @@ -66,11 +65,6 @@ func (cp *CmdProcessor) updateServerStatus(request []byte, serverStatus uint16)
}

func (cp *CmdProcessor) updateTxnStatus(serverStatus uint16) {
if serverStatus&mysql.ServerStatusAutocommit > 0 {
cp.serverStatus |= StatusAutoCommit
} else {
cp.serverStatus &^= StatusAutoCommit
}
if serverStatus&mysql.ServerStatusInTrans > 0 {
cp.serverStatus |= StatusInTrans
} else {
Expand Down Expand Up @@ -117,10 +111,14 @@ func (cp *CmdProcessor) canRedirect() bool {
return false
}
// If any result of the prepared statements is not fetched, we should wait.
return !cp.hasPendingPreparedStmts()
}

func (cp *CmdProcessor) hasPendingPreparedStmts() bool {
for _, serverStatus := range cp.preparedStmtStatus {
if serverStatus > 0 {
return false
return true
}
}
return true
return false
}
12 changes: 7 additions & 5 deletions pkg/proxy/backend/cmd_processor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,18 @@ func (cp *CmdProcessor) forwardQuitCmd() (succeed bool, err error) {
// We can send a `COMMIT` statement to the current backend and then forward the `BEGIN` statement to the new backend.
func (cp *CmdProcessor) needHoldRequest(request []byte) bool {
cmd, data := request[0], request[1:]
// BEGIN/START TRANSACTION statements cannot be prepared.
if cmd != mysql.ComQuery {
return false
}
// Skip checking prepared statements because the cursor will be discarded.
// Hold request only when it's waiting for the end of the transaction.
if cp.serverStatus&StatusInTrans == 0 {
return false
}
// Opening result sets can still be fetched after COMMIT/ROLLBACK, so don't hold.
if cp.hasPendingPreparedStmts() {
return false
}
if len(data) > 0 && data[len(data)-1] == 0 {
data = data[:len(data)-1]
}
Expand All @@ -330,8 +335,5 @@ func (cp *CmdProcessor) needHoldRequest(request []byte) bool {

func isBeginStmt(query string) bool {
normalized := parser.Normalize(query)
if strings.HasPrefix(normalized, "begin") || strings.HasPrefix(normalized, "start transaction") {
return true
}
return false
return strings.HasPrefix(normalized, "begin") || strings.HasPrefix(normalized, "start transaction")
}
Loading

0 comments on commit 5bd467f

Please sign in to comment.