Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnlineDDL: "cancel-all" command to cancel all pending migrations in keyspace #7099

Merged
merged 7 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func (vtctlclient *VtctlClientProcess) OnlineDDLCancelMigration(Keyspace, uuid s
)
}

// OnlineDDLCancelMigration cancels a given migration uuid
func (vtctlclient *VtctlClientProcess) OnlineDDLCancelAllMigrations(Keyspace string) (result string, err error) {
return vtctlclient.ExecuteCommandWithOutput(
"OnlineDDL",
Keyspace,
"cancel-all",
)
}

// OnlineDDLRetryMigration retries a given migration uuid
func (vtctlclient *VtctlClientProcess) OnlineDDLRetryMigration(Keyspace, uuid string) (result string, err error) {
return vtctlclient.ExecuteCommandWithOutput(
Expand Down
33 changes: 33 additions & 0 deletions go/test/endtoend/onlineddl/onlineddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path"
"regexp"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -182,6 +183,26 @@ func TestSchemaChange(t *testing.T) {
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
// migration will fail again
}
{
// no migrations pending at this time
time.Sleep(10 * time.Second)
checkCancelAllMigrations(t, 0)
Comment on lines +190 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this sleep time be reduced. It just delays the e2e test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately all the online-DDL flow works asynchronously, with check intervals. In this endtoend tests intervals are reduced to just a few secs, and still you have to give time for the check, for gh-ost, for callback...

This is why I extracted online-ddl to be on its own CI shard; it's still shorter than our other endtoend tests so right now I feel like it's not worth investing the optimization effort.

}
{
// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
count := 4
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = testAlterTable(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
}()
}
wg.Wait()
checkCancelAllMigrations(t, count)
}
}

Expand Down Expand Up @@ -279,6 +300,18 @@ func checkCancelMigration(t *testing.T, uuid string, expectCancelPossible bool)
assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}

// checkCancelAllMigrations all pending migrations
func checkCancelAllMigrations(t *testing.T, expectCount int) {
result, err := clusterInstance.VtctlclientProcess.OnlineDDLCancelAllMigrations(keyspaceName)
fmt.Println("# 'vtctlclient OnlineDDL cancel-all' output (for debug purposes):")
fmt.Println(result)
assert.NoError(t, err)

r := fullWordRegexp(fmt.Sprintf("%d", expectCount))
m := r.FindAllString(result, -1)
assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}

// checkRetryMigration attempts to retry a migration, and expects rejection
func checkRetryMigration(t *testing.T, uuid string, expectRetryPossible bool) {
result, err := clusterInstance.VtctlclientProcess.OnlineDDLRetryMigration(keyspaceName, uuid)
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2547,6 +2547,13 @@ func commandOnlineDDL(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag
uuid = arg
query = fmt.Sprintf(`update _vt.schema_migrations set migration_status='cancel' where migration_uuid='%s'`, uuid)
}
case "cancel-all":
{
if arg != "" {
return fmt.Errorf("UUID not allowed in %s", command)
}
query = `update _vt.schema_migrations set migration_status='cancel-all'`
}
default:
return fmt.Errorf("Unknown OnlineDDL command: %s", command)
}
Expand Down
38 changes: 37 additions & 1 deletion go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var (
)

var vexecUpdateTemplates = []string{
`update _vt.schema_migrations set migration_status='val' where mysql_schema='val'`,
`update _vt.schema_migrations set migration_status='val' where migration_uuid='val' and mysql_schema='val'`,
`update _vt.schema_migrations set migration_status='val' where migration_uuid='val' and mysql_schema='val' and shard='val'`,
}
Expand Down Expand Up @@ -894,6 +895,32 @@ func (e *Executor) cancelMigrations(ctx context.Context, uuids []string) (err er
return nil
}

// cancelPendingMigrations cancels all pending migrations (that are expected to run or are running)
// for this keyspace
func (e *Executor) cancelPendingMigrations(ctx context.Context) (result *sqltypes.Result, err error) {
parsed := sqlparser.BuildParsedQuery(sqlSelectPendingMigrations, "_vt")
r, err := e.execQuery(ctx, parsed.Query)
if err != nil {
return result, err
}
Comment on lines +900 to +905
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there can be race condition right between running and completed Migration. For my understanding, what will happen when a running migration just got completed after it is selected in pending migrations Or when cancelPendingMigration is in progress?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct, and this race condition is unavoidable. If the user chooses to "cancel" or "cancel-all" just when the migration is about to complete, it's possible that the migration will win the race and complete; in which case the "cancel" is a no-op and nothing happens (the migration is just complete)

var uuids []string
for _, row := range r.Named().Rows {
uuid := row["migration_uuid"].ToString()
uuids = append(uuids, uuid)
}

result = &sqltypes.Result{}
for _, uuid := range uuids {
log.Infof("cancelPendingMigrations: cancelling %s", uuid)
res, err := e.cancelMigration(ctx, uuid, true)
if err != nil {
return result, err
}
result.AppendResult(res)
}
return result, nil
}

// scheduleNextMigration attemps to schedule a single migration to run next.
// possibly there's no migrations to run. Possibly there's a migration running right now,
// in which cases nothing happens.
Expand Down Expand Up @@ -1443,7 +1470,7 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp
return nil, err
}
if !match {
return nil, fmt.Errorf("Query must match one of these templates: %s", strings.Join(vexecUpdateTemplates, "; "))
return nil, fmt.Errorf("Query must match one of these templates: %s; query=%s", strings.Join(vexecUpdateTemplates, "; "), vx.Query)
}
if shard, _ := vx.ColumnStringVal(vx.WhereCols, "shard"); shard != "" {
// shard is specified.
Expand All @@ -1464,7 +1491,16 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp
if err != nil {
return nil, err
}
if !schema.IsOnlineDDLUUID(uuid) {
return nil, fmt.Errorf("Not an Online DDL UUID: %s", uuid)
}
return response(e.cancelMigration(ctx, uuid, true))
case cancelAllMigrationHint:
uuid, _ := vx.ColumnStringVal(vx.WhereCols, "migration_uuid")
if uuid != "" {
return nil, fmt.Errorf("Unexpetced UUID: %s", uuid)
}
return response(e.cancelPendingMigrations(ctx))
default:
return nil, fmt.Errorf("Unexpected value for migration_status: %v. Supported values are: %s, %s",
statusVal, retryMigrationHint, cancelMigrationHint)
Expand Down
11 changes: 9 additions & 2 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ const (
migration_status='running'
AND liveness_timestamp < NOW() - INTERVAL %a MINUTE
`
sqlSelectPendingMigrations = `SELECT
migration_uuid
FROM %s.schema_migrations
WHERE
migration_status IN ('queued', 'ready', 'running')
`
sqlSelectUncollectedArtifacts = `SELECT
migration_uuid,
artifacts
Expand Down Expand Up @@ -213,8 +219,9 @@ const (
)

const (
retryMigrationHint = "retry"
cancelMigrationHint = "cancel"
retryMigrationHint = "retry"
cancelMigrationHint = "cancel"
cancelAllMigrationHint = "cancel-all"
)

var (
Expand Down