-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate Workflow: Scope vindex names correctly when target and source keyspace have different names #16769
base: main
Are you sure you want to change the base?
Migrate Workflow: Scope vindex names correctly when target and source keyspace have different names #16769
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,13 @@ package vreplication | |
|
||
import ( | ||
"fmt" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/tidwall/gjson" | ||
|
||
"vitess.io/vitess/go/test/endtoend/cluster" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"vitess.io/vitess/go/mysql" | ||
|
@@ -165,11 +168,18 @@ func TestVtctlMigrate(t *testing.T) { | |
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and | ||
// hence the VTDATAROOT env variable gets overwritten. | ||
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT | ||
func TestVtctldMigrate(t *testing.T) { | ||
func TestVtctldMigrateUnsharded(t *testing.T) { | ||
vc = NewVitessCluster(t, nil) | ||
|
||
oldDefaultReplicas := defaultReplicas | ||
oldDefaultRdonly := defaultRdonly | ||
defaultReplicas = 0 | ||
defaultRdonly = 0 | ||
defer func() { | ||
defaultReplicas = oldDefaultReplicas | ||
defaultRdonly = oldDefaultRdonly | ||
}() | ||
|
||
defer vc.TearDown() | ||
|
||
defaultCell := vc.Cells[vc.CellNames[0]] | ||
|
@@ -299,3 +309,80 @@ func TestVtctldMigrate(t *testing.T) { | |
require.Errorf(t, err, "there is no vitess cluster named ext1") | ||
}) | ||
} | ||
|
||
// TestVtctldMigrate adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name | ||
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external" | ||
// cluster with keyspace rating. | ||
func TestVtctldMigrateSharded(t *testing.T) { | ||
oldDefaultReplicas := defaultReplicas | ||
oldDefaultRdonly := defaultRdonly | ||
defaultReplicas = 1 | ||
defaultRdonly = 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why set this? I don't see where we use it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is that global variable that we have not refactored yet. It is used in creating future clusters in the test. Added code to keep previous values and reset in a defer. |
||
defer func() { | ||
defaultReplicas = oldDefaultReplicas | ||
defaultRdonly = oldDefaultRdonly | ||
}() | ||
|
||
setSidecarDBName("_vt") | ||
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables | ||
vc = setupCluster(t) | ||
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) | ||
defer vtgateConn.Close() | ||
defer vc.TearDown() | ||
setupCustomerKeyspace(t) | ||
createMoveTablesWorkflow(t, "customer,Lead,datze,customer2") | ||
tstWorkflowSwitchReadsAndWrites(t) | ||
tstWorkflowComplete(t) | ||
|
||
var err error | ||
// create external cluster | ||
extCell := "extcell1" | ||
extCells := []string{extCell} | ||
extVc := NewVitessCluster(t, &clusterOptions{ | ||
cells: extCells, | ||
clusterConfig: externalClusterConfig, | ||
}) | ||
defer extVc.TearDown() | ||
|
||
setupExtKeyspace(t, extVc, "rating", extCell) | ||
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "-80") | ||
require.NoError(t, err) | ||
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "80-") | ||
require.NoError(t, err) | ||
verifyClusterHealth(t, extVc) | ||
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort) | ||
defer extVtgateConn.Close() | ||
|
||
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate | ||
var output string | ||
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2", | ||
fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global"); err != nil { | ||
require.FailNow(t, "Mount command failed with %+v : %s\n", err, output) | ||
} | ||
ksWorkflow := "rating.e1" | ||
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate", | ||
"--target-keyspace", "rating", "--workflow", "e1", | ||
"create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1", | ||
"--tablet-types=primary,replica"); err != nil { | ||
require.FailNow(t, "Migrate command failed with %+v : %s\n", err, output) | ||
} | ||
waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) | ||
// this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster | ||
vc = extVc | ||
doVtctldclientVDiff(t, "rating", "e1", "zone1", nil) | ||
} | ||
|
||
func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) { | ||
numReplicas := 1 | ||
shards := []string{"-80", "80-"} | ||
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","), | ||
customerVSchema, customerSchema, numReplicas, 0, 1200, nil); err != nil { | ||
t.Fatal(err) | ||
} | ||
vtgate := vc.Cells[cellName].Vtgates[0] | ||
for _, shard := range shards { | ||
err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard) | ||
require.NoError(t, err) | ||
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), numReplicas, waitTimeout)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -227,7 +227,20 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard * | |
for _, mappedCol := range mappedCols { | ||
subExprs = append(subExprs, mappedCol) | ||
} | ||
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) | ||
var vindexName string | ||
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate { | ||
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the | ||
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source. | ||
// Note: it is expected that the source and target keyspaces have the same vindex name and data type. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that we should check and confirm this expectation and error out if it's not true. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is assumed that the vschemas and tables etc are identical on source and target for |
||
keyspace := mz.ms.TargetKeyspace | ||
if mz.ms.ExternalCluster != "" { | ||
keyspace = mz.ms.SourceKeyspace | ||
} | ||
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name) | ||
} else { | ||
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) | ||
} | ||
|
||
subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName)) | ||
subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange))) | ||
inKeyRange := &sqlparser.FuncExpr{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, better to update this here as well:
It should still get run because the value is a regex but I'm not sure that we've intentionally relied on that in the CI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed name of second test to use
TestVtctldMigrate
prefix.go test
is run here, so all tests with that prefix will be run. It keeps theconfig.json
smaller, esp since we are running both tests on same shard.