Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop replication task before applying changes that require the task to be stopped. #24047

Merged
merged 15 commits into from
Apr 26, 2022
Merged
3 changes: 3 additions & 0 deletions .changelog/24047.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_dms_replication_task: Fix: The task is now stopped before updating it, if required.
```
32 changes: 23 additions & 9 deletions internal/service/dms/replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func resourceReplicationTaskCreate(d *schema.ResourceData, meta interface{}) err
}

if d.Get("start_replication_task").(bool) {
if err := startReplicationTask(d.Id(), conn); err != nil {
if err := startReplicationTask(d.Id(), conn, replicationTaskStatusReady); err != nil {
return err
}
}
Expand Down Expand Up @@ -223,6 +223,7 @@ func resourceReplicationTaskRead(d *schema.ResourceData, meta interface{}) error

func resourceReplicationTaskUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*conns.AWSClient).DMSConn
status := d.Get("status").(string)

if d.HasChangesExcept("tags", "tags_all", "start_replication_task") {
input := &dms.ModifyReplicationTaskInput{
Expand Down Expand Up @@ -253,8 +254,14 @@ func resourceReplicationTaskUpdate(d *schema.ResourceData, meta interface{}) err
input.TableMappings = aws.String(d.Get("table_mappings").(string))
}

log.Println("[DEBUG] DMS update replication task:", input)
if status == replicationTaskStatusRunning {
log.Println("[DEBUG] stopping DMS replication task:", input)
if err := stopReplicationTask(d.Id(), conn); err != nil {
return err
}
}

log.Println("[DEBUG] updating DMS replication task:", input)
_, err := conn.ModifyReplicationTask(input)
if err != nil {
return fmt.Errorf("error updating DMS Replication Task (%s): %w", d.Id(), err)
Expand All @@ -265,17 +272,19 @@ func resourceReplicationTaskUpdate(d *schema.ResourceData, meta interface{}) err
}

if d.Get("start_replication_task").(bool) {
if err := startReplicationTask(d.Id(), conn); err != nil {
err := startReplicationTask(d.Id(), conn, replicationTaskStatusStopped)
if err != nil {
return err
} else {
status = replicationTaskStatusRunning
}
}
}

if d.HasChanges("start_replication_task") {
status := d.Get("status").(string)
if d.Get("start_replication_task").(bool) {
if status != replicationTaskStatusRunning {
if err := startReplicationTask(d.Id(), conn); err != nil {
if err := startReplicationTask(d.Id(), conn, status); err != nil {
return err
}
}
Expand Down Expand Up @@ -361,7 +370,7 @@ func dmsReplicationTaskRemoveReadOnlySettings(settings string) (*string, error)
return &cleanedSettingsString, nil
}

func startReplicationTask(id string, conn *dms.DatabaseMigrationService) error {
func startReplicationTask(id string, conn *dms.DatabaseMigrationService, fromStatus string) error {
log.Printf("[DEBUG] Starting DMS Replication Task: (%s)", id)

task, err := FindReplicationTaskByID(conn, id)
Expand All @@ -373,9 +382,14 @@ func startReplicationTask(id string, conn *dms.DatabaseMigrationService) error {
return fmt.Errorf("error reading DMS Replication Task (%s): empty output", id)
}

startReplicationTaskType := dms.StartReplicationTaskTypeValueStartReplication
if fromStatus != replicationTaskStatusReady {
startReplicationTaskType = dms.StartReplicationTaskTypeValueResumeProcessing
}

_, err = conn.StartReplicationTask(&dms.StartReplicationTaskInput{
ReplicationTaskArn: task.ReplicationTaskArn,
StartReplicationTaskType: aws.String(dms.StartReplicationTaskTypeValueStartReplication),
StartReplicationTaskType: aws.String(startReplicationTaskType),
Copy link
Contributor Author

@fermezz fermezz Apr 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Author's note: This change is necessary because, for tasks of type full-load and full-load-and-cdc, start-replication can only be used the first time the task is run. After that, you have to use resume-processing; otherwise the start request fails.

As proof, here is the error produced when running the acceptance tests without this change:

replication_task_test.go:100: Step 4/4 error: Error running apply: exit status 1

Error: error starting DMS Replication Task (tf-acc-test-748333755630122031): InvalidParameterCombinationException: Start Type : START_REPLICATION, valid only for tasks running for the first time status code: 400, request id: af7935aa-89d8-49de-b835-8d6a87addc65

with aws_dms_replication_task.test, on terraform_plugin_test.tf line 176, in resource "aws_dms_replication_task" "test":
    176: resource "aws_dms_replication_task" "test" {

--- FAIL: TestAccDMSReplicationTask_startReplicationTask (1373.44s)

This way, we only use start-replication when the task is in the ready status, which only happens right after the task has been created.

})

if err != nil {
Expand All @@ -384,7 +398,7 @@ func startReplicationTask(id string, conn *dms.DatabaseMigrationService) error {

err = waitReplicationTaskRunning(conn, id)
if err != nil {
return fmt.Errorf("error wating for DMS Replication Task (%s) start: %w", id, err)
return fmt.Errorf("error waiting for DMS Replication Task (%s) start: %w", id, err)
}

return nil
Expand Down Expand Up @@ -412,7 +426,7 @@ func stopReplicationTask(id string, conn *dms.DatabaseMigrationService) error {

err = waitReplicationTaskStopped(conn, id)
if err != nil {
return fmt.Errorf("error wating for DMS Replication Task (%s) stop: %w", id, err)
return fmt.Errorf("error waiting for DMS Replication Task (%s) stop: %w", id, err)
}

return nil
Expand Down
60 changes: 46 additions & 14 deletions internal/service/dms/replication_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ func TestAccDMSReplicationTask_startReplicationTask(t *testing.T) {
CheckDestroy: testAccCheckReplicationTaskDestroy,
Steps: []resource.TestStep{
{
Config: dmsReplicationTaskConfigStartReplicationTask(rName, true),
Config: dmsReplicationTaskConfigStartReplicationTask(rName, true, "testrule"),
Check: resource.ComposeTestCheckFunc(
testAccCheckReplicationTaskExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "status", "running"),
),
},
{
Config: dmsReplicationTaskConfigStartReplicationTask(rName, true, "changedtestrule"),
Check: resource.ComposeTestCheckFunc(
testAccCheckReplicationTaskExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "status", "running"),
Expand All @@ -116,7 +123,7 @@ func TestAccDMSReplicationTask_startReplicationTask(t *testing.T) {
ImportStateVerifyIgnore: []string{"start_replication_task"},
},
{
Config: dmsReplicationTaskConfigStartReplicationTask(rName, false),
Config: dmsReplicationTaskConfigStartReplicationTask(rName, false, "changedtestrule"),
Check: resource.ComposeTestCheckFunc(
testAccCheckReplicationTaskExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "status", "stopped"),
Expand Down Expand Up @@ -288,7 +295,7 @@ resource "aws_dms_replication_task" "test" {
`, cdcStartPosition, rName))
}

func dmsReplicationTaskConfigStartReplicationTask(rName string, startTask bool) string {
func dmsReplicationTaskConfigStartReplicationTask(rName string, startTask bool, ruleName string) string {
return acctest.ConfigCompose(
acctest.ConfigAvailableAZsNoOptIn(),
fmt.Sprintf(`
Expand Down Expand Up @@ -351,16 +358,41 @@ resource "aws_db_subnet_group" "test" {
}
}

resource "aws_rds_cluster_parameter_group" "test" {
name = "%[1]s-pg-cluster"
family = "aurora-mysql5.7"
description = "DMS cluster parameter group"

parameter {
name = "binlog_format"
value = "ROW"
apply_method = "pending-reboot"
}

parameter {
name = "binlog_row_image"
value = "Full"
apply_method = "pending-reboot"
}

parameter {
name = "binlog_checksum"
value = "NONE"
apply_method = "pending-reboot"
}
}

resource "aws_rds_cluster" "test1" {
cluster_identifier = "%[1]s-aurora-cluster-source"
engine = "aurora-mysql"
engine_version = "5.7.mysql_aurora.2.03.2"
database_name = "tftest"
master_username = "tftest"
master_password = "mustbeeightcharaters"
skip_final_snapshot = true
vpc_security_group_ids = [aws_default_security_group.test.id]
db_subnet_group_name = aws_db_subnet_group.test.name
cluster_identifier = "%[1]s-aurora-cluster-source"
engine = "aurora-mysql"
engine_version = "5.7.mysql_aurora.2.03.2"
database_name = "tftest"
master_username = "tftest"
master_password = "mustbeeightcharaters"
skip_final_snapshot = true
vpc_security_group_ids = [aws_default_security_group.test.id]
db_subnet_group_name = aws_db_subnet_group.test.name
db_cluster_parameter_group_name = aws_rds_cluster_parameter_group.test.name
}

resource "aws_rds_cluster_instance" "test1" {
Expand Down Expand Up @@ -438,7 +470,7 @@ resource "aws_dms_replication_task" "test" {
replication_task_id = %[1]q
replication_task_settings = "{\"BeforeImageSettings\":null,\"FailTaskWhenCleanTaskResourceFailed\":false,\"ChangeProcessingDdlHandlingPolicy\":{\"HandleSourceTableAltered\":true,\"HandleSourceTableDropped\":true,\"HandleSourceTableTruncated\":true},\"ChangeProcessingTuning\":{\"BatchApplyMemoryLimit\":500,\"BatchApplyPreserveTransaction\":true,\"BatchApplyTimeoutMax\":30,\"BatchApplyTimeoutMin\":1,\"BatchSplitSize\":0,\"CommitTimeout\":1,\"MemoryKeepTime\":60,\"MemoryLimitTotal\":1024,\"MinTransactionSize\":1000,\"StatementCacheSize\":50},\"CharacterSetSettings\":null,\"ControlTablesSettings\":{\"ControlSchema\":\"\",\"FullLoadExceptionTableEnabled\":false,\"HistoryTableEnabled\":false,\"HistoryTimeslotInMinutes\":5,\"StatusTableEnabled\":false,\"SuspendedTablesTableEnabled\":false},\"ErrorBehavior\":{\"ApplyErrorDeletePolicy\":\"IGNORE_RECORD\",\"ApplyErrorEscalationCount\":0,\"ApplyErrorEscalationPolicy\":\"LOG_ERROR\",\"ApplyErrorFailOnTruncationDdl\":false,\"ApplyErrorInsertPolicy\":\"LOG_ERROR\",\"ApplyErrorUpdatePolicy\":\"LOG_ERROR\",\"DataErrorEscalationCount\":0,\"DataErrorEscalationPolicy\":\"SUSPEND_TABLE\",\"DataErrorPolicy\":\"LOG_ERROR\",\"DataTruncationErrorPolicy\":\"LOG_ERROR\",\"FailOnNoTablesCaptured\":false,\"FailOnTransactionConsistencyBreached\":false,\"FullLoadIgnoreConflicts\":true,\"RecoverableErrorCount\":-1,\"RecoverableErrorInterval\":5,\"RecoverableErrorStopRetryAfterThrottlingMax\":false,\"RecoverableErrorThrottling\":true,\"RecoverableErrorThrottlingMax\":1800,\"TableErrorEscalationCount\":0,\"TableErrorEscalationPolicy\":\"STOP_TASK\",\"TableErrorPolicy\":\"SUSPEND_TABLE\"},\"FullLoadSettings\":{\"CommitRate\":10000,\"CreatePkAfterFullLoad\":false,\"MaxFullLoadSubTasks\":8,\"StopTaskCachedChangesApplied\":false,\"StopTaskCachedChangesNotApplied\":false,\"TargetTablePrepMode\":\"DROP_AND_CREATE\",\"TransactionConsistencyTimeout\":600},\"Logging\":{\"EnableLogging\":false,\"LogComponents\":[{\"Id\":\"TRANSFORMATION\",\"Severity\":\"LOGG
ER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"IO\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_LOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"PERFORMANCE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SORTER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"REST_SERVER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"VALIDATOR_EXT\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_APPLY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TASK_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TABLES_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"METADATA_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_FACTORY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMON\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"ADDONS\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"DATA_STRUCTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMUNICATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_TRANSFER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]},\"LoopbackPreventionSettings\":null,\"PostProcessingRules\":null,\"StreamBufferSettings\":{\"CtrlStreamBufferSizeInMB\":5,\"StreamBufferCount\":3,\"StreamBufferSizeInMB\":8},\"TargetMetadata\":{\"BatchApplyEnabled\":false,\"FullLobMode\":false,\"InlineLobMaxSize\":0,\"LimitedSizeLobMode\":true,\"LoadMaxFileSize\":0,\"LobChunkSize\":0,\"LobMaxSize\":32,\"ParallelApplyBufferSize\":0,\"ParallelApplyQueuesPerThread\":0,\"ParallelApplyThreads\":0,\"ParallelLoadBufferSize\":0,\"ParallelLoadQueuesPerThread\":0,\"ParallelLoadThreads\":0,\"SupportLobs\":true,\"TargetSchema\":\"\",\"TaskRecoveryTableEnabled\":false},\"TTSettings\":{\"EnableTT\":false,\"TTRecordSettings\":null,\"TTS3Settings\":null}}"
source_endpoint_arn = aws_dms_endpoint.source.endpoint_arn
table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"1\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}"
table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"%[3]s\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}"

start_replication_task = %[2]t

Expand All @@ -450,5 +482,5 @@ resource "aws_dms_replication_task" "test" {

depends_on = [aws_rds_cluster_instance.test1, aws_rds_cluster_instance.test2]
}
`, rName, startTask))
`, rName, startTask, ruleName))
}