Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

VReplication: Initialize Sequence Tables Used By Tables Being Moved #13656

Merged
merged 62 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
5d5e6d7
Get initial PoC implementation working for initializing sequence
mattlord Jul 26, 2023
31d3895
Reset cache per table, only when needed
mattlord Jul 28, 2023
ed62652
Gather sequence details after keyspace lock but before blocking writes
mattlord Jul 28, 2023
5ac2e4d
Check for context cancellation often
mattlord Jul 28, 2023
52ab8b7
Skip sequence work for sharded to sharded migrations
mattlord Jul 28, 2023
f2b7313
Efficiency improvements
mattlord Jul 28, 2023
ca06b32
More tweaks
mattlord Jul 28, 2023
91112ae
Address todo
mattlord Jul 28, 2023
0c3e0f6
Search keyspaces in parallel
mattlord Jul 29, 2023
fcab612
Parallelize init work
mattlord Jul 29, 2023
b4037da
Consolidate selects
mattlord Jul 29, 2023
f1c95b7
Minor tweaks after self review
mattlord Jul 30, 2023
fecfc49
Add flag and switcher ifc impl
mattlord Jul 30, 2023
72a5c21
Add flag and vtctldclient impl
mattlord Jul 30, 2023
4ccd341
Merge remote-tracking branch 'origin/main' into vrepl_seq_init
mattlord Jul 30, 2023
142109a
Add missing returns
mattlord Jul 30, 2023
e92b498
Concurrency improvements
mattlord Jul 31, 2023
65c17d1
Update wrangler unit tests
mattlord Jul 31, 2023
cacf7d9
Enable new flag in e2e test
mattlord Jul 31, 2023
e352088
Deflake SwitchTraffic dry run unit tests
mattlord Jul 31, 2023
bee2a67
Tweakin' and deflakin'
mattlord Jul 31, 2023
252b791
Add e2e test integration
mattlord Aug 1, 2023
36f9059
Tweaks and bug fixes
mattlord Aug 1, 2023
17227ff
Fix vtctldclient bug and enable usage for ReverseTraffic
mattlord Aug 1, 2023
026bc4b
Adjust VDiff2 e2e tests
mattlord Aug 1, 2023
dbf7d84
Move keyspace search concurrency to errgroup
mattlord Aug 1, 2023
c40e6cc
Move init function to errgroup
mattlord Aug 2, 2023
c0e5b97
Move vtctldclient impl to errgroup
mattlord Aug 2, 2023
331d60f
Fix duh bug
mattlord Aug 2, 2023
f52ac01
Use slices.Sort in dry run and remove DEBUG logs
mattlord Aug 2, 2023
7ac0d7a
Merge remote-tracking branch 'origin/main' into vrepl_seq_init
mattlord Aug 2, 2023
4f428d7
WiP unit test work
mattlord Aug 2, 2023
4226af4
Address some review comments
mattlord Aug 3, 2023
2e5822a
Address unit test race
mattlord Aug 3, 2023
2fed344
Finish unit test todos
mattlord Aug 3, 2023
8a77582
use correct ctx
mattlord Aug 3, 2023
bae88b8
Merge remote-tracking branch 'origin/main' into vrepl_seq_init
mattlord Aug 3, 2023
28af2ee
Implement a comment suggestion
mattlord Aug 4, 2023
2ca52b2
Make same switch to Cut in wrangler impl
mattlord Aug 4, 2023
eb667fc
Align vtctlclient help output with reality
mattlord Aug 4, 2023
84a0bdf
Merge remote-tracking branch 'origin/main' into vrepl_seq_init
mattlord Aug 4, 2023
a82d853
Fix after merging main
mattlord Aug 4, 2023
a17bef6
Address another review comment
mattlord Aug 5, 2023
29f978d
Set the backing table's default DB name when fully qualified
mattlord Aug 5, 2023
71ff024
Minor tweaks after self review of recent changes
mattlord Aug 6, 2023
1898433
Merge remote-tracking branch 'origin/main' into vrepl_seq_init
mattlord Aug 8, 2023
22e21d0
Deflake tests that use tabletconntest.SetProtocol()
mattlord Aug 8, 2023
341b8be
Merge remote-tracking branch 'origin/main' into vrepl_seq_init
mattlord Aug 8, 2023
4a1f2eb
Register grpc dialer for healthcheck test
mattlord Aug 8, 2023
4b53f66
More deflaking
mattlord Aug 8, 2023
a04b965
Add another bounds safety check
mattlord Aug 8, 2023
c00925c
Try to deflake the vtctldclient MoveTables unit test
mattlord Aug 8, 2023
89295b5
Improve and unify error handling when switching reads/writes
mattlord Aug 9, 2023
9600566
Address review comments
mattlord Aug 9, 2023
fb01b93
Use closure to more safely manage mutex
mattlord Aug 9, 2023
0663f6f
Forgot to remove the unlock...
mattlord Aug 9, 2023
c147748
Improve error msg and update unit test expectations
mattlord Aug 9, 2023
a875009
Tweaks for unit tests
mattlord Aug 9, 2023
7de744f
We don't need the explicit ignore any more.
mattlord Aug 9, 2023
9202a4c
Attempts to do more deflaking
mattlord Aug 9, 2023
7f5d0a1
Squeeze in another vtctldclient fix
mattlord Aug 9, 2023
4d6d05c
Building on the last commit to cleanup option handling
mattlord Aug 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions go/cmd/vtctldclient/command/movetables.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,14 @@ var (
StopAfterCopy bool
}{}
moveTablesSwitchTrafficOptions = struct {
Cells []string
TabletTypes []topodatapb.TabletType
MaxReplicationLagAllowed time.Duration
EnableReverseReplication bool
Timeout time.Duration
DryRun bool
Direction workflow.TrafficSwitchDirection
Cells []string
TabletTypes []topodatapb.TabletType
MaxReplicationLagAllowed time.Duration
EnableReverseReplication bool
Timeout time.Duration
DryRun bool
InitializeTargetSequences bool
Direction workflow.TrafficSwitchDirection
}{}
)

Expand Down Expand Up @@ -450,14 +451,15 @@ func commandMoveTablesSwitchTraffic(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: moveTablesOptions.TargetKeyspace,
Workflow: moveTablesOptions.Workflow,
TabletTypes: moveTablesSwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.Timeout),
DryRun: moveTablesSwitchTrafficOptions.DryRun,
EnableReverseReplication: moveTablesSwitchTrafficOptions.EnableReverseReplication,
Direction: int32(moveTablesSwitchTrafficOptions.Direction),
Keyspace: moveTablesOptions.TargetKeyspace,
Workflow: moveTablesOptions.Workflow,
TabletTypes: moveTablesSwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.Timeout),
DryRun: moveTablesSwitchTrafficOptions.DryRun,
EnableReverseReplication: moveTablesSwitchTrafficOptions.EnableReverseReplication,
InitializeTargetSequences: moveTablesSwitchTrafficOptions.InitializeTargetSequences,
Direction: int32(moveTablesSwitchTrafficOptions.Direction),
}
resp, err := client.WorkflowSwitchTraffic(commandCtx, req)
if err != nil {
Expand Down Expand Up @@ -532,14 +534,16 @@ func init() {
MoveTablesSwitchTraffic.Flags().Var((*topoproto.TabletTypeListFlag)(&moveTablesSwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for")
MoveTablesSwitchTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.Timeout, "timeout", timeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
MoveTablesSwitchTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", maxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this")
MoveTablesSwitchTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover")
MoveTablesSwitchTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred")
MoveTablesSwitchTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.InitializeTargetSequences, "initialize-target-sequences", false, "When moving tables from an unsharded keyspace to a sharded keyspace, initialize any sequences that are being used on the target when switching writes.")
MoveTables.AddCommand(MoveTablesSwitchTraffic)

MoveTablesReverseTraffic.Flags().StringSliceVarP(&moveTablesSwitchTrafficOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to switch traffic in")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred")
MoveTablesReverseTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", maxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover")
MoveTablesReverseTraffic.Flags().Var((*topoproto.TabletTypeListFlag)(&moveTablesSwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for")
MoveTablesReverseTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.Timeout, "timeout", timeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
MoveTablesReverseTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", maxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original target keyspace to support switching traffic again")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred")
MoveTables.AddCommand(MoveTablesReverseTraffic)
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
switch currentWorkflowType {
case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow:
args = append(args, "--defer-secondary-keys")
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
}
if cells != "" {
Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{}');
insert into customer(cid, name, typ, sport, meta) values(2, 'Paül','soho','cricket',convert(x'7b7d' using utf8mb4));
insert into customer(cid, name, typ, sport, blb) values(3, 'ringo','enterprise','','blob data');
-- We use a high cid value here to test the target sequence initialization.
insert into customer(cid, name, typ, sport, blb) values(999999, 'ringo','enterprise','','blob data');
insert into merchant(mname, category) values('Monoprice', 'eléctronics');
insert into merchant(mname, category) values('newegg', 'elec†ronics');
insert into product(pid, description) values(1, 'keyböard ⌨️');
Expand Down
12 changes: 6 additions & 6 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ var testCases = []*testCase{
tabletBaseID: 200,
tables: "customer,Lead,Lead-1",
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(91234, 'Testy McTester', 'soho')`,
retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(92234, 'Testy McTester (redux)', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(1992234, 'Testy McTester (redux)', 'enterprise')`,
testCLIErrors: true, // test for errors in the simplest workflow
testCLICreateWait: true, // test wait on create feature against simplest workflow
},
Expand All @@ -81,9 +81,9 @@ var testCases = []*testCase{
targetShards: "-40,40-a0,a0-",
tabletBaseID: 400,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(93234, 'Testy McTester Jr', 'enterprise'), (94234, 'Testy McTester II', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(1993234, 'Testy McTester Jr', 'enterprise'), (1993235, 'Testy McTester II', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(95234, 'Testy McTester III', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(1994234, 'Testy McTester III', 'enterprise')`,
stop: true,
},
{
Expand All @@ -96,9 +96,9 @@ var testCases = []*testCase{
targetShards: "0",
tabletBaseID: 700,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(96234, 'Testy McTester IV', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(1995234, 'Testy McTester IV', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(97234, 'Testy McTester V', 'enterprise'), (98234, 'Testy McTester VI', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(1996234, 'Testy McTester V', 'enterprise'), (1996235, 'Testy McTester VI', 'enterprise')`,
stop: true,
},
}
Expand Down
71 changes: 46 additions & 25 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,15 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
}
switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)

// The original unsharded customer data included an insert with the
// vindex column (cid) of 999999, so the backing sequence table should
// now have a next_id of 1000000 after SwitchTraffic.
res := execVtgateQuery(t, vtgateConn, sourceKs, "select next_id from customer_seq where id = 0")
require.Equal(t, "1000000", res.Rows[0][0].ToString())

if withOpenTx && commit != nil {
commit(t)
}
Expand Down Expand Up @@ -1358,11 +1365,18 @@ func catchup(t *testing.T, vttablet *cluster.VttabletProcess, workflow, info str
func moveTablesAction(t *testing.T, action, cell, workflow, sourceKs, targetKs, tables string, extraFlags ...string) {
var err error
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, action}
if strings.EqualFold(action, strings.ToLower(workflowActionCreate)) {
switch strings.ToLower(action) {
case strings.ToLower(workflowActionCreate):
extraFlags = append(extraFlags, "--source-keyspace="+sourceKs, "--tables="+tables, "--cells="+cell, "--tablet-types=primary,replica,rdonly")
case strings.ToLower(workflowActionSwitchTraffic):
extraFlags = append(extraFlags, "--initialize-target-sequences")
}
args = append(args, extraFlags...)
err = vc.VtctldClient.ExecuteCommand(args...)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
if output != "" {
fmt.Printf("Output of vtctldclient MoveTables %s for %s workflow:\n++++++\n%s\n--------\n",
action, workflow, output)
}
if err != nil {
t.Fatalf("MoveTables %s command failed with %+v\n", action, err)
}
Expand Down Expand Up @@ -1396,8 +1410,8 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
}

func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse bool) {
if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] &&
workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] {
if workflowType != binlogdatapb.VReplicationWorkflowType_MoveTables.String() &&
workflowType != binlogdatapb.VReplicationWorkflowType_Reshard.String() {
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
Expand All @@ -1415,6 +1429,34 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
}

// switchWrites switches primary (write) traffic for the given workflow using
// the SwitchTraffic command, or ReverseTraffic when reverse is true, and fails
// the test if the command returns an error.
//
// workflowType must be the string name of the MoveTables or Reshard workflow
// type; any other value fails the test immediately. ksWorkflow is passed as-is
// to vtctlclient; on the vtctldclient path it is split on "." and must yield
// exactly two parts, which are handed to moveTablesAction as the target
// keyspace (parts[0]) and the workflow name (parts[1]).
func switchWrites(t *testing.T, workflowType, ksWorkflow string, reverse bool) {
if workflowType != binlogdatapb.VReplicationWorkflowType_MoveTables.String() &&
workflowType != binlogdatapb.VReplicationWorkflowType_Reshard.String() {
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
command := "SwitchTraffic"
if reverse {
command = "ReverseTraffic"
}
const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1
// Use vtctldclient for MoveTables SwitchTraffic ~ 50% of the time.
// The choice is driven by whether the current wall-clock second is even, so
// which client a given run exercises is not deterministic.
if workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables.String() && time.Now().Second()%2 == 0 {
parts := strings.Split(ksWorkflow, ".")
require.Equal(t, 2, len(parts))
moveTablesAction(t, command, defaultCellName, parts[1], sourceKs, parts[0], "", "--timeout="+SwitchWritesTimeout, "--tablet-types=primary")
return
}
// Otherwise go through vtctlclient, always passing
// --initialize-target-sequences regardless of workflow type or direction.
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary",
"--timeout="+SwitchWritesTimeout, "--initialize-target-sequences", command, ksWorkflow)
if output != "" {
fmt.Printf("Output of switching writes with vtctlclient for %s:\n++++++\n%s\n--------\n", ksWorkflow, output)
}
// printSwitchWritesExtraDebug is useful when debugging failures in Switch writes due to corner cases/races
// NOTE(review): the blank assignment presumably just keeps the helper
// referenced so it is not flagged as unused — confirm.
_ = printSwitchWritesExtraDebug
require.NoError(t, err, fmt.Sprintf("Switch writes Error: %s: %s", err, output))
}

func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunResults []string) {
if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] &&
workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] {
Expand Down Expand Up @@ -1457,27 +1499,6 @@ func printSwitchWritesExtraDebug(t *testing.T, ksWorkflow, msg string) {
}
}

// switchWrites switches primary (write) traffic for the given workflow by
// running SwitchTraffic, or ReverseTraffic when reverse is true, through
// vtctlclient, and fails the test if the command returns an error.
//
// workflowType must be the string name of the MoveTables or Reshard workflow
// type; any other value fails the test immediately. ksWorkflow is passed to
// the command unchanged.
func switchWrites(t *testing.T, workflowType, ksWorkflow string, reverse bool) {
if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] &&
workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] {
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
command := "SwitchTraffic"
if reverse {
command = "ReverseTraffic"
}
const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary",
"--timeout="+SwitchWritesTimeout, command, ksWorkflow)
// Echo any command output so it is visible in the test log.
if output != "" {
fmt.Printf("Output of switching writes for %s:\n++++++\n%s\n--------\n", ksWorkflow, output)
}
// printSwitchWritesExtraDebug is useful when debugging failures in Switch writes due to corner cases/races
// NOTE(review): the blank assignment presumably just keeps the helper
// referenced so it is not flagged as unused — confirm.
_ = printSwitchWritesExtraDebug
require.NoError(t, err, fmt.Sprintf("Switch writes Error: %s: %s", err, output))
}

// generateInnoDBRowHistory generates at least maxSourceTrxHistory rollback segment entries.
// This allows us to confirm two behaviors:
// 1. MoveTables blocks on starting its first copy phase until we rollback
Expand Down
1 change: 1 addition & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func testChecksum(t *testing.T, want, got int64) {

func init() {
tabletconn.RegisterDialer("fake_gateway", tabletDialer)
tabletconn.RegisterDialer("grpc", tabletDialer)
tabletconntest.SetProtocol("go.vt.discovery.healthcheck_test", "fake_gateway")
connMap = make(map[string]*fakeConn)
refreshInterval = time.Minute
Expand Down
Loading