Skip to content

Commit

Permalink
Fix check in reverse traffic to account for keyspace routing rules. A…
Browse files Browse the repository at this point in the history
…dd ReverseTraffic to e2e tests. Some refactor of e2e test.

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Mar 6, 2024
1 parent 3343c15 commit 6ab42cc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
30 changes: 23 additions & 7 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ const (

var (
// channels to coordinate the migration workflow
chNotSetup, chNotCreated, chInProgress, chSwitched, chCompleted chan int64
chNotSetup, chNotCreated, chInProgress chan int64
chSwitched, chReversed, chSwitchedAgain, chCompleted chan int64
// counters to keep track of the number of tenants in each state
numSetup, numInProgress, numSwitched, numCompleted atomic.Int64
numSetup, numInProgress, numSwitched, numReversed, numSwitchedAgain, numCompleted atomic.Int64
)

// multiTenantMigration manages the migration of multiple tenants to a single target keyspace.
Expand Down Expand Up @@ -128,7 +129,7 @@ func newMultiTenantMigration(t *testing.T) *multiTenantMigration {
mtm.setTenantMigrationStatus(int64(i), tenantMigrationStatusNotMigrated)
}
channelSize := numTenants + 1 // +1 to make sure the channels never block
for _, ch := range []*chan int64{&chNotSetup, &chNotCreated, &chInProgress, &chSwitched, &chCompleted} {
for _, ch := range []*chan int64{&chNotSetup, &chNotCreated, &chInProgress, &chReversed, &chSwitchedAgain, &chSwitched, &chCompleted} {
*ch = make(chan int64, channelSize)
}
return mtm
Expand Down Expand Up @@ -227,12 +228,25 @@ func (mtm *multiTenantMigration) insertSomeData(t *testing.T, tenantId int64, so
func (mtm *multiTenantMigration) switchTraffic(tenantId int64) {
t := mtm.t
sourceAliasKeyspace := getSourceAliasKeyspace(tenantId)
sourceKeyspaceName := getSourceKeyspace(tenantId)
mt := mtm.activeMoveTables[tenantId]
ksWorkflow := fmt.Sprintf("%s.%s", mtm.targetKeyspace, mt.workflowName)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
// we intentionally insert first into the source alias keyspace and then the source keyspace to test routing rules for both.
mtm.insertSomeData(t, tenantId, sourceAliasKeyspace, numAdditionalRowsPerTenant)
mt.SwitchReadsAndWrites()
mtm.insertSomeData(t, tenantId, sourceAliasKeyspace, numAdditionalRowsPerTenant)
mtm.insertSomeData(t, tenantId, sourceKeyspaceName, numAdditionalRowsPerTenant)
}

func (mtm *multiTenantMigration) reverseTraffic(tenantId int64) {
t := mtm.t
mt := mtm.activeMoveTables[tenantId]
sourceKeyspaceName := getSourceKeyspace(tenantId)
reverseKsWorkflow := fmt.Sprintf("%s.%s_reverse", sourceKeyspaceName, mt.workflowName)
waitForWorkflowState(t, vc, reverseKsWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
mtm.insertSomeData(t, tenantId, targetKeyspaceName, numAdditionalRowsPerTenant)
mt.ReverseReadsAndWrites()
mtm.insertSomeData(t, tenantId, targetKeyspaceName, numAdditionalRowsPerTenant)
}

func (mtm *multiTenantMigration) complete(tenantId int64) {
Expand Down Expand Up @@ -277,7 +291,8 @@ func TestMultiTenant(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
t.Run("Verify all rows have been migrated", func(t *testing.T) {
totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*2
numAdditionalInserts := 6 // 2 each for Switch, Reverse, and SwitchAgain
totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*numAdditionalInserts
totalRowsInserted := totalRowsInsertedPerTenant * numTenants
totalActualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", mtm.targetKeyspace, "t1"))
require.Equal(t, totalRowsInserted, totalActualRowsInserted)
Expand Down Expand Up @@ -305,7 +320,6 @@ func (mtm *multiTenantMigration) doStuff(name string, chIn, chOut chan int64, co
}

// run starts the migration process for all tenants. It starts concurrent

func (mtm *multiTenantMigration) run() {
go mtm.doStuff("Setup tenant keyspace/schemas", chNotSetup, chNotCreated, &numSetup, mtm.setup)
for i := int64(1); i <= numTenants; i++ {
Expand All @@ -320,5 +334,7 @@ func (mtm *multiTenantMigration) run() {

go mtm.doStuff("Start Migrations", chNotCreated, chInProgress, &numInProgress, mtm.start)
go mtm.doStuff("Switch Traffic", chInProgress, chSwitched, &numSwitched, mtm.switchTraffic)
go mtm.doStuff("Mark Migrations Complete", chSwitched, chCompleted, &numCompleted, mtm.complete)
go mtm.doStuff("Reverse Traffic", chSwitched, chReversed, &numReversed, mtm.reverseTraffic)
go mtm.doStuff("Switch Traffic Again", chReversed, chSwitchedAgain, &numSwitchedAgain, mtm.switchTraffic)
go mtm.doStuff("Mark Migrations Complete", chSwitchedAgain, chCompleted, &numCompleted, mtm.complete)
}
16 changes: 15 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3166,7 +3166,21 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
if !switchReplica && !switchRdonly {
return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
}
if !ts.isPartialMigration { // shard level traffic switching is all or nothing

// For partial (shard-by-shard migrations) and if keyspace routing rules are used, traffic for all tablet types
// is expected to be switched at once. For other MoveTables migrations where we use table routing rules
// replica/rdonly traffic can be switched first and then primary traffic can be switched later.
trafficSwitchingIsAllOrNothing := false
switch {
case !ts.isPartialMigration:

Check warning on line 3175 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3173-L3175

Added lines #L3173 - L3175 were not covered by tests
// shard level traffic switching is all or nothing
trafficSwitchingIsAllOrNothing = true
case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.options.UseKeyspaceRoutingRules:

Check warning on line 3178 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3177-L3178

Added lines #L3177 - L3178 were not covered by tests
// keyspace routing rules are used, traffic is all or nothing per keyspace
trafficSwitchingIsAllOrNothing = true

Check warning on line 3180 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3180

Added line #L3180 was not covered by tests
}

if !trafficSwitchingIsAllOrNothing {

Check warning on line 3183 in go/vt/vtctl/workflow/server.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/server.go#L3183

Added line #L3183 was not covered by tests
if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
}
Expand Down

0 comments on commit 6ab42cc

Please sign in to comment.