Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Workflow: Scope vindex names correctly when target and source keyspace have different names #16769

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package vreplication

import (
"fmt"
"strings"
"testing"

"github.com/tidwall/gjson"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -165,11 +168,18 @@ func TestVtctlMigrate(t *testing.T) {
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and
// hence the VTDATAROOT env variable gets overwritten.
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT
func TestVtctldMigrate(t *testing.T) {
func TestVtctldMigrateUnsharded(t *testing.T) {
Copy link
Contributor

@mattlord mattlord Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, better to update this here as well:

test/config.json:                       "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldMigrate", "-timeout", "30m"],

It should still get run because the value is a regex but I'm not sure that we've intentionally relied on that in the CI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the name of the second test to use the TestVtctldMigrate prefix. go test is run here, so all tests with that prefix will be run. It keeps the config.json smaller, especially since we are running both tests on the same shard.

vc = NewVitessCluster(t, nil)

oldDefaultReplicas := defaultReplicas
oldDefaultRdonly := defaultRdonly
defaultReplicas = 0
defaultRdonly = 0
defer func() {
defaultReplicas = oldDefaultReplicas
defaultRdonly = oldDefaultRdonly
}()

defer vc.TearDown()

defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -299,3 +309,80 @@ func TestVtctldMigrate(t *testing.T) {
require.Errorf(t, err, "there is no vitess cluster named ext1")
})
}

// TestVtctldMigrateSharded tests a sharded cluster to validate a fix for a bug where the target keyspace name
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external"
// cluster with keyspace rating.
func TestVtctldMigrateSharded(t *testing.T) {
oldDefaultReplicas := defaultReplicas
oldDefaultRdonly := defaultRdonly
defaultReplicas = 1
defaultRdonly = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set this? I don't see where we use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is that global variable that we have not refactored yet. It is used in creating future clusters in the test. Added code to keep previous values and reset in a defer.

defer func() {
defaultReplicas = oldDefaultReplicas
defaultRdonly = oldDefaultRdonly
}()

setSidecarDBName("_vt")
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
vc = setupCluster(t)
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
defer vc.TearDown()
setupCustomerKeyspace(t)
createMoveTablesWorkflow(t, "customer,Lead,datze,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

var err error
// create external cluster
extCell := "extcell1"
extCells := []string{extCell}
extVc := NewVitessCluster(t, &clusterOptions{
cells: extCells,
clusterConfig: externalClusterConfig,
})
defer extVc.TearDown()

setupExtKeyspace(t, extVc, "rating", extCell)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "-80")
require.NoError(t, err)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "80-")
require.NoError(t, err)
verifyClusterHealth(t, extVc)
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
defer extVtgateConn.Close()

currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate
var output string
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2",
fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global"); err != nil {
require.FailNow(t, "Mount command failed with %+v : %s\n", err, output)
}
ksWorkflow := "rating.e1"
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "rating", "--workflow", "e1",
"create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1",
"--tablet-types=primary,replica"); err != nil {
require.FailNow(t, "Migrate command failed with %+v : %s\n", err, output)
}
waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
// this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster
vc = extVc
doVtctldclientVDiff(t, "rating", "e1", "zone1", nil)
}

// setupExtKeyspace creates the keyspace ksName with shards "-80" and "80-" in
// the cell cellName of vc, using customerVSchema and customerSchema, and then
// waits for each shard to become healthy and for the cell's first vtgate to
// report the expected number of replica tablets for that shard.
//
// Any failure (unknown cell, no vtgate in the cell, keyspace creation error,
// or a shard that never becomes healthy) aborts the calling test immediately.
func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	const numReplicas = 1
	shards := []string{"-80", "80-"}

	// Fail with a clear message instead of a nil dereference further down
	// when the requested cell is not part of this cluster.
	cell := vc.Cells[cellName]
	require.NotNilf(t, cell, "cell %q not found in cluster", cellName)

	// NOTE(review): the trailing 0 and 1200 are presumably the rdonly tablet
	// count and the tablet ID base; confirm against VitessCluster.AddKeyspace.
	if _, err := vc.AddKeyspace(t, []*Cell{cell}, ksName, strings.Join(shards, ","),
		customerVSchema, customerSchema, numReplicas, 0, 1200, nil); err != nil {
		t.Fatal(err)
	}

	// Guard the index below so a cell without vtgates fails readably.
	require.NotEmptyf(t, cell.Vtgates, "cell %q has no vtgates", cellName)
	vtgate := cell.Vtgates[0]

	for _, shard := range shards {
		require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard))
		replicaTarget := fmt.Sprintf("%s.%s.replica", ksName, shard)
		require.NoError(t, vtgate.WaitForStatusOfTabletInShard(replicaTarget, numReplicas, waitTimeout))
	}
}
15 changes: 14 additions & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,20 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should check and confirm this expectation and error out if it's not true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is assumed that the vschemas, tables, etc. are identical on source and target for Migrate. We can/should add all validations for this and all other workflow types in another PR. I don't want to add that to this PR.

keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}

subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange)))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2134,7 +2134,6 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
if err != nil {
return nil, err
}

// The stream key is target keyspace/tablet alias, e.g. 0/test-0000000100.
// We sort the keys for intuitive and consistent output.
streamKeys := make([]string, 0, len(workflow.ShardStreams))
Expand Down Expand Up @@ -2190,9 +2189,13 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
return resp, nil
}

// GetCopyProgress returns the progress of all tables being copied in the
// workflow.
// GetCopyProgress returns the progress of all tables being copied in the workflow.
func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error) {
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// The logic below expects the source primaries to be in the same cluster as the target.
// For now we don't report progress for Migrate workflows.
return nil, nil
}
getTablesQuery := "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d"
getRowCountQuery := "select table_name, table_rows, data_length from information_schema.tables where table_schema = %s and table_name in (%s)"
tables := make(map[string]bool)
Expand Down
14 changes: 13 additions & 1 deletion go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,19 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.getWorkflowType() == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}
subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral("{{.keyrange}}"))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
2 changes: 1 addition & 1 deletion test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@
},
"vreplication_materialize": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMaterialize"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "Materialize"],
"Command": [],
"Manual": false,
"Shard": "vreplication_partial_movetables_and_materialize",
Expand Down
Loading