Skip to content

Commit

Permalink
fix: pass e2e; fix vc vschema and target problem
Browse files Browse the repository at this point in the history
  • Loading branch information
newborn22 committed Dec 15, 2024
1 parent 40ca83c commit b3600cd
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 161 deletions.
304 changes: 155 additions & 149 deletions endtoend/branch/branch_test.go

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions go/vt/vtgate/engine/branch_vtgate_mysql_serivce.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type VTGateMysqlService struct {
}

func (v *VTGateMysqlService) Query(query string) (branch.Rows, error) {
err := v.VCursor.Session().SetTarget("mysql", true)
if err != nil {
return nil, err
}

// AUTOCOMMIT is used to run the statement as autocommitted transaction.
// AUTOCOMMIT = 3;
rst, err := v.VCursor.Execute(context.Background(), "Execute", query, make(map[string]*querypb.BindVariable), true, 3)
Expand Down Expand Up @@ -41,7 +46,10 @@ func (v *VTGateMysqlService) Query(query string) (branch.Rows, error) {

func (v *VTGateMysqlService) Exec(database, query string) (*branch.Result, error) {
if database != "" {
err := v.VCursor.Session().SetTarget(database)
oldTarget := v.VCursor.Session().GetTarget()
defer v.VCursor.Session().SetTarget(oldTarget, true)

err := v.VCursor.Session().SetTarget(database, false)
if err != nil {
return nil, err
}
Expand All @@ -56,6 +64,10 @@ func (v *VTGateMysqlService) Exec(database, query string) (*branch.Result, error
}

func (v *VTGateMysqlService) ExecuteInTxn(queries ...string) error {
err := v.VCursor.Session().SetTarget("mysql", true)
if err != nil {
return err
}
first := true
defer v.VCursor.Execute(context.Background(), "Execute", "ROLLBACK;", make(map[string]*querypb.BindVariable), true, 0)
for _, query := range queries {
Expand All @@ -74,6 +86,6 @@ func (v *VTGateMysqlService) ExecuteInTxn(queries ...string) error {
}
}

_, err := v.VCursor.Execute(context.Background(), "Execute", "COMMIT;", make(map[string]*querypb.BindVariable), true, 0)
_, err = v.VCursor.Execute(context.Background(), "Execute", "COMMIT;", make(map[string]*querypb.BindVariable), true, 0)
return err
}
5 changes: 5 additions & 0 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (c *DBDDL) createDatabase(ctx context.Context, vcursor VCursor, plugin DBDD
break
}
}

newKSSchema := &vindexes.KeyspaceSchema{Keyspace: &vindexes.Keyspace{Name: c.name, Sharded: false}, Tables: make(map[string]*vindexes.Table)}
vcursor.GetExecutorVSchema().Keyspaces[c.name] = newKSSchema
//vcursor.GetVSchema().Keyspaces[c.name] = newKSSchema

return &sqltypes.Result{RowsAffected: 1}, nil
}

Expand Down
33 changes: 29 additions & 4 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ import (
"strings"
"sync"
"testing"
"vitess.io/vitess/go/vt/discovery"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -106,6 +105,16 @@ func (t *noopVCursor) AnyAdvisoryLockTaken() bool {
panic("implement me")
}

func (t *noopVCursor) GetExecutorVSchema() *vindexes.VSchema {
// TODO implement me
panic("implement me")
}

func (t *noopVCursor) GetVSchema() *vindexes.VSchema {
// TODO implement me
panic("implement me")
}

func (t *noopVCursor) AddAdvisoryLock(_ string) {
// TODO implement me
panic("implement me")
Expand Down Expand Up @@ -341,7 +350,11 @@ func (t *noopVCursor) SetConsolidator(querypb.ExecuteOptions_Consolidator) {
panic("implement me")
}

func (t *noopVCursor) SetTarget(string) error {
func (t *noopVCursor) SetTarget(target string, check bool) error {
panic("implement me")
}

func (t *noopVCursor) GetTarget() string {
panic("implement me")
}

Expand Down Expand Up @@ -519,11 +532,15 @@ func (f *loggingVCursor) FindHealthyPrimaryTablet() (*discovery.TabletHealth, er
panic("implement me")
}

func (f *loggingVCursor) SetTarget(target string) error {
func (f *loggingVCursor) SetTarget(target string, check bool) error {
f.log = append(f.log, fmt.Sprintf("Target set to %s", target))
return nil
}

func (f *loggingVCursor) GetTarget() string {
panic("implement me")
}

func (f *loggingVCursor) GetKeyspace() string {
return ""
}
Expand Down Expand Up @@ -815,6 +832,14 @@ func (f *loggingVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion
panic("implement me")
}

func (f *loggingVCursor) GetExecutorVSchema() *vindexes.VSchema {
panic("implement me")
}

func (f *loggingVCursor) GetVSchema() {
panic("implement me")
}

func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) {
f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl)))
return f.tableRoutes.tbl, nil
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,20 @@ type (
ReloadExec(ctx context.Context, command sqlparser.ReloadType) (*sqltypes.Result, error)

FindHealthyPrimaryTablet() (*discovery.TabletHealth, error)

GetExecutorVSchema() *vindexes.VSchema

GetVSchema() *vindexes.VSchema
}

// SessionActions gives primitives ability to interact with the session state
SessionActions interface {
// RecordWarning stores the given warning in the current session
RecordWarning(warning *querypb.QueryWarning)

SetTarget(target string) error
SetTarget(target string, check bool) error

GetTarget() string

SetUDV(key string, value any) error

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/update_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (updTarget *UpdateTarget) GetTableName() string {

// TryExecute implements the Primitive interface
func (updTarget *UpdateTarget) TryExecute(_ context.Context, vcursor VCursor, _ map[string]*query.BindVariable, _ bool) (*sqltypes.Result, error) {
err := vcursor.Session().SetTarget(updTarget.Target)
err := vcursor.Session().SetTarget(updTarget.Target, true)
if err != nil {
return nil, err
}
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,13 +707,19 @@ func (vc *vcursorImpl) Session() engine.SessionActions {
return vc
}

func (vc *vcursorImpl) SetTarget(target string) error {
func (vc *vcursorImpl) GetTarget() string {
return vc.safeSession.TargetString
}

func (vc *vcursorImpl) SetTarget(target string, check bool) error {
keyspace, tabletType, _, err := topoprotopb.ParseDestination(target, defaultTabletType)
if err != nil {
return err
}
if _, ok := vc.vschema.Keyspaces[keyspace]; !ignoreKeyspace(keyspace) && !ok {
return vterrors.VT05003(keyspace)
if check {
if _, ok := vc.vschema.Keyspaces[keyspace]; !ignoreKeyspace(keyspace) && !ok {
return vterrors.VT05003(keyspace)
}
}

if vc.safeSession.InTransaction() && tabletType != topodatapb.TabletType_PRIMARY {
Expand Down Expand Up @@ -1210,6 +1216,10 @@ func (vc *vcursorImpl) ShowExec(ctx context.Context, command sqlparser.ShowComma
}
}

func (vc *vcursorImpl) GetExecutorVSchema() *vindexes.VSchema {
return vc.executor.VSchema()
}

func (vc *vcursorImpl) GetVSchema() *vindexes.VSchema {
return vc.vschema
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/vcursor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestSetTarget(t *testing.T) {
t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) {
vc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), "", sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4)
vc.vschema = tc.vschema
err := vc.SetTarget(tc.targetString)
err := vc.SetTarget(tc.targetString, true)
if tc.expectedError == "" {
require.NoError(t, err)
require.Equal(t, vc.safeSession.TargetString, tc.targetString)
Expand Down

0 comments on commit b3600cd

Please sign in to comment.