Skip to content

Commit

Permalink
fix: add txn test
Browse files Browse the repository at this point in the history
  • Loading branch information
newborn22 committed Dec 16, 2024
1 parent a2dee82 commit cd1f489
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
13 changes: 11 additions & 2 deletions endtoend/branch/branch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func printBranchShowMergeBackDDL(rows *sql.Rows) {
}

func TestBranchBasic(t *testing.T) {
for i := 0; i < 5; i++ {
for i := 0; i < 0; i++ {
testBranchBasic(t)
}
}
Expand Down Expand Up @@ -484,7 +484,16 @@ func TestBranchBasicWithFailPoint(t *testing.T) {
defer rows.Close()
printBranchDiff(rows)

// branch prepare merge back
// test branch prepare merge back

// first let ExecInTxn always rollback, so the merge back ddl table should be empty, because it is inserted in a txn
framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/engine/VTGateExecuteInTxnRollback", "return(true)")
framework.QueryNoError(t, targetCluster.WescaleDb, getBranchPrepareMergeBackCMD())
rowsShouleBeEmpty := framework.QueryNoError(t, targetCluster.WescaleDb, "select * from mysql.branch_patch")
defer rowsShouleBeEmpty.Close()
assert.Equal(t, false, rowsShouleBeEmpty.Next())
framework.DisableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/engine/VTGateExecuteInTxnRollback")

framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchInsertMergeBackDDLError", "return(true)")
framework.ExecWithErrorContains(t, targetCluster.WescaleDb, "failpoint", getBranchPrepareMergeBackCMD())
expectBranchStatus(t, "origin", "preparing")
Expand Down
6 changes: 6 additions & 0 deletions go/vt/failpointkey/failpoint_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ var (
Name: "BranchExecuteMergeBackDDLError",
ExampleStr: "return(true)",
}
VTGateExecuteInTxnRollback = FailpointValue{
FullName: "vitess.io/vitess/go/vt/vtgate/engine/VTGateExecuteInTxnRollback",
Name: "VTGateExecuteInTxnRollback",
ExampleStr: "return(true)",
}
)

func init() {
Expand All @@ -108,4 +113,5 @@ func init() {
FailpointTable[BranchApplySnapshotError.FullName] = BranchApplySnapshotError
FailpointTable[BranchInsertMergeBackDDLError.FullName] = BranchInsertMergeBackDDLError
FailpointTable[BranchExecuteMergeBackDDLError.FullName] = BranchExecuteMergeBackDDLError
FailpointTable[VTGateExecuteInTxnRollback.FullName] = VTGateExecuteInTxnRollback
}
25 changes: 18 additions & 7 deletions go/vt/vtgate/engine/branch_vtgate_mysql_serivce.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package engine
import (
"context"
"fmt"
"github.com/pingcap/failpoint"
"vitess.io/vitess/go/vt/failpointkey"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vtgate/branch"
)
Expand All @@ -12,14 +14,17 @@ type VTGateMysqlService struct {
}

func (v *VTGateMysqlService) Query(query string) (branch.Rows, error) {
oldTarget := v.VCursor.Session().GetTarget()
defer v.VCursor.Session().SetTarget(oldTarget, true)

err := v.VCursor.Session().SetTarget("mysql", true)
if err != nil {
return nil, err
}

// AUTOCOMMIT is used to run the statement as autocommitted transaction.
// AUTOCOMMIT = 3;
rst, err := v.VCursor.Execute(context.Background(), "Execute", query, make(map[string]*querypb.BindVariable), true, 3)
rst, err := v.VCursor.Execute(context.Background(), "Branch Query", query, make(map[string]*querypb.BindVariable), true, 3)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,7 +60,7 @@ func (v *VTGateMysqlService) Exec(database, query string) (*branch.Result, error
}
}

rst, err := v.VCursor.Execute(context.Background(), "Execute", query, make(map[string]*querypb.BindVariable), true, 3)
rst, err := v.VCursor.Execute(context.Background(), "Branch Execute", query, make(map[string]*querypb.BindVariable), true, 3)
if err != nil {
return nil, err
}
Expand All @@ -64,28 +69,34 @@ func (v *VTGateMysqlService) Exec(database, query string) (*branch.Result, error
}

func (v *VTGateMysqlService) ExecuteInTxn(queries ...string) error {
oldTarget := v.VCursor.Session().GetTarget()
defer v.VCursor.Session().SetTarget(oldTarget, true)

err := v.VCursor.Session().SetTarget("mysql", true)
if err != nil {
return err
}
first := true
defer v.VCursor.Execute(context.Background(), "Execute", "ROLLBACK;", make(map[string]*querypb.BindVariable), true, 0)
defer v.VCursor.Execute(context.Background(), "Branch ExecuteInTxn", "ROLLBACK;", make(map[string]*querypb.BindVariable), true, 0)
for _, query := range queries {
if first {
_, err := v.VCursor.Execute(context.Background(), "Execute", "start transaction;", make(map[string]*querypb.BindVariable), true, 0)
_, err := v.VCursor.Execute(context.Background(), "Branch ExecuteInTxn", "start transaction;", make(map[string]*querypb.BindVariable), true, 0)
if err != nil {
return err
}
first = false
}
// NORMAL is the default commit order.
// NORMAL = 0;
_, err := v.VCursor.Execute(context.Background(), "Execute", query, make(map[string]*querypb.BindVariable), true, 0)
_, err := v.VCursor.Execute(context.Background(), "Branch ExecuteInTxn", query, make(map[string]*querypb.BindVariable), true, 0)
if err != nil {
return err
}
}

_, err = v.VCursor.Execute(context.Background(), "Execute", "COMMIT;", make(map[string]*querypb.BindVariable), true, 0)
failpoint.Inject(failpointkey.VTGateExecuteInTxnRollback.Name, func() {
_, err := v.VCursor.Execute(context.Background(), "Branch ExecuteInTxn", "ROLLBACK;", make(map[string]*querypb.BindVariable), true, 0)
failpoint.Return(err)
})
_, err = v.VCursor.Execute(context.Background(), "Branch ExecuteInTxn", "COMMIT;", make(map[string]*querypb.BindVariable), true, 0)
return err
}

0 comments on commit cd1f489

Please sign in to comment.