resource/aws_emr_cluster: Fix aws_emr_security_configuration destroy issues #12578

Merged · 2 commits · Dec 16, 2021

3 changes: 3 additions & 0 deletions .changelog/12578.txt
````diff
@@ -0,0 +1,3 @@
+```release-note:bug
+resource/aws_emr_cluster: Wait for the cluster to reach a terminated state on deletion
+```
````
79 changes: 8 additions & 71 deletions internal/service/emr/cluster.go
```diff
@@ -972,7 +972,7 @@ func resourceClusterCreate(d *schema.ResourceData, meta interface{}) error {
 	cluster, err := waitClusterCreated(conn, d.Id())
 
 	if err != nil {
-		return fmt.Errorf("error waiting for EMR Cluster (%s) to be created: %w", d.Id(), err)
+		return fmt.Errorf("error waiting for EMR Cluster (%s) to create: %w", d.Id(), err)
 	}
 
 	// For multiple master nodes, EMR automatically enables
@@ -1377,90 +1377,27 @@ func resourceClusterUpdate(d *schema.ResourceData, meta interface{}) error {
 func resourceClusterDelete(d *schema.ResourceData, meta interface{}) error {
 	conn := meta.(*conns.AWSClient).EMRConn
 
-	req := &emr.TerminateJobFlowsInput{
+	log.Printf("[DEBUG] Deleting EMR Cluster: (%s)", d.Id())
+	_, err := conn.TerminateJobFlows(&emr.TerminateJobFlowsInput{
 		JobFlowIds: []*string{
 			aws.String(d.Id()),
 		},
-	}
+	})
 
-	_, err := conn.TerminateJobFlows(req)
 	if err != nil {
-		log.Printf("[ERROR], %s", err)
-		return err
+		return fmt.Errorf("error terminating EMR Cluster (%s): %w", d.Id(), err)
 	}
 
-	input := &emr.ListInstancesInput{
-		ClusterId: aws.String(d.Id()),
-	}
-	var resp *emr.ListInstancesOutput
-	var count int
-	err = resource.Retry(20*time.Minute, func() *resource.RetryError {
-		var err error
-		resp, err = conn.ListInstances(input)
-
-		if err != nil {
-			return resource.NonRetryableError(err)
-		}
-
-		count = CountRemainingInstances(resp, d.Id())
-		if count != 0 {
-			return resource.RetryableError(fmt.Errorf("EMR Cluster (%s) has (%d) Instances remaining", d.Id(), count))
-		}
-		return nil
-	})
-
-	if tfresource.TimedOut(err) {
-		resp, err = conn.ListInstances(input)
-
-		if err == nil {
-			count = CountRemainingInstances(resp, d.Id())
-		}
-
-		if count != 0 {
-			return fmt.Errorf("EMR Cluster (%s) has (%d) Instances remaining", d.Id(), count)
-		}
-	}
+	log.Println("[INFO] Waiting for EMR Cluster to be terminated")
+	_, err = waitClusterDeleted(conn, d.Id())
 
 	if err != nil {
-		return fmt.Errorf("error waiting for EMR Cluster (%s) Instances to drain: %s", d.Id(), err)
+		return fmt.Errorf("error waiting for EMR Cluster (%s) to delete: %w", d.Id(), err)
 	}
 
 	return nil
 }
 
-func CountRemainingInstances(resp *emr.ListInstancesOutput, emrClusterId string) int {
-	if resp == nil {
-		log.Printf("[ERROR] response is nil")
-		return 0
-	}
-
-	instanceCount := len(resp.Instances)
-
-	if instanceCount == 0 {
-		log.Printf("[DEBUG] No instances found for EMR Cluster (%s)", emrClusterId)
-		return 0
-	}
-
-	// Collect instance status states, wait for all instances to be terminated
-	// before moving on
-	var terminated []string
-	for j, i := range resp.Instances {
-		instanceId := aws.StringValue(i.Ec2InstanceId)
-		if i.Status != nil {
-			if aws.StringValue(i.Status.State) == emr.InstanceStateTerminated {
-				terminated = append(terminated, instanceId)
-			}
-		} else {
-			log.Printf("[DEBUG] Cluster instance (%d : %s) has no status", j, instanceId)
-		}
-	}
-	if len(terminated) == instanceCount {
-		log.Printf("[DEBUG] All (%d) EMR Cluster (%s) Instances terminated", instanceCount, emrClusterId)
-		return 0
-	}
-	return len(resp.Instances)
-}
-
 func expandApplications(apps []interface{}) []*emr.Application {
 	appOut := make([]*emr.Application, 0, len(apps))
 
```
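Taken together, the `cluster.go` change replaces roughly 70 lines of instance-count polling with a single call to a shared waiter. `TerminateJobFlows` only starts termination, so returning as soon as `ListInstances` reported zero remaining instances could leave the cluster itself still `TERMINATING`, and presumably still holding its `aws_emr_security_configuration`, whose destroy would then fail; that is the issue named in the PR title. Below is a minimal, standalone sketch of the terminate-then-wait pattern: the plain polling loop stands in for the provider's `StateChangeConf`, and the cluster ID is a placeholder.

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/emr"
)

func main() {
	conn := emr.New(session.Must(session.NewSession()))
	id := "j-XXXXXXXXXXXXX" // placeholder cluster ID

	// Kick off termination; the API returns long before the cluster is gone.
	if _, err := conn.TerminateJobFlows(&emr.TerminateJobFlowsInput{
		JobFlowIds: []*string{aws.String(id)},
	}); err != nil {
		log.Fatalf("terminating EMR Cluster (%s): %s", id, err)
	}

	// Poll the cluster's own state until it is terminal, instead of
	// counting remaining instances as the old code did.
	deadline := time.Now().Add(20 * time.Minute)
	for {
		output, err := conn.DescribeCluster(&emr.DescribeClusterInput{
			ClusterId: aws.String(id),
		})
		if err != nil {
			log.Fatalf("describing EMR Cluster (%s): %s", id, err)
		}

		state := aws.StringValue(output.Cluster.Status.State)
		if state == emr.ClusterStateTerminated || state == emr.ClusterStateTerminatedWithErrors {
			fmt.Printf("EMR Cluster (%s) reached %s\n", id, state)
			return
		}
		if time.Now().After(deadline) {
			log.Fatalf("timed out waiting for EMR Cluster (%s) to terminate, last state: %s", id, state)
		}
		time.Sleep(30 * time.Second)
	}
}
```

Note that `TERMINATED_WITH_ERRORS` counts as success here: it is still a terminal state, so waiting for anything stricter would hang the destroy until timeout.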
43 changes: 14 additions & 29 deletions internal/service/emr/cluster_test.go
```diff
@@ -15,6 +15,7 @@ import (
 	"github.com/hashicorp/terraform-provider-aws/internal/conns"
 	tfec2 "github.com/hashicorp/terraform-provider-aws/internal/service/ec2"
 	tfemr "github.com/hashicorp/terraform-provider-aws/internal/service/emr"
+	"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
 )
 
 func TestAccEMRCluster_basic(t *testing.T) {
@@ -1545,24 +1546,17 @@ func testAccCheckDestroy(s *terraform.State) error {
 			continue
 		}
 
-		input := &emr.DescribeClusterInput{
-			ClusterId: aws.String(rs.Primary.ID),
-		}
+		_, err := tfemr.FindClusterByID(conn, rs.Primary.ID)
 
-		output, err := conn.DescribeCluster(input)
-		if err != nil {
-			return err
+		if tfresource.NotFound(err) {
+			continue
 		}
 
-		// if output.Cluster != nil &&
-		// 	*output.Cluster.Status.State == "WAITING" {
-		// 	return fmt.Errorf("EMR Cluster still exists")
-		// }
-		if output.Cluster == nil || output.Cluster.Status == nil || aws.StringValue(output.Cluster.Status.State) == emr.ClusterStateTerminated {
-			continue
-		}
-
-		return fmt.Errorf("EMR Cluster still exists")
+		if err != nil {
+			return err
+		}
+
+		return fmt.Errorf("EMR Cluster %s still exists", rs.Primary.ID)
 	}
 
 	return nil
@@ -1574,30 +1568,21 @@ func testAccCheckClusterExists(n string, v *emr.Cluster) resource.TestCheckFunc
 		if !ok {
 			return fmt.Errorf("Not found: %s", n)
 		}
+
 		if rs.Primary.ID == "" {
-			return fmt.Errorf("No cluster id set")
+			return fmt.Errorf("No EMR Cluster ID is set")
 		}
-		conn := acctest.Provider.Meta().(*conns.AWSClient).EMRConn
-		describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{
-			ClusterId: aws.String(rs.Primary.ID),
-		})
-		if err != nil {
-			return fmt.Errorf("EMR error: %w", err)
-		}
 
-		if describe.Cluster == nil || *describe.Cluster.Id != rs.Primary.ID {
-			return fmt.Errorf("EMR cluster %q not found", rs.Primary.ID)
-		}
+		conn := acctest.Provider.Meta().(*conns.AWSClient).EMRConn
 
-		*v = *describe.Cluster
+		output, err := tfemr.FindClusterByID(conn, rs.Primary.ID)
 
-		if describe.Cluster.Status != nil {
-			state := aws.StringValue(describe.Cluster.Status.State)
-			if state != emr.ClusterStateRunning && state != emr.ClusterStateWaiting {
-				return fmt.Errorf("EMR cluster %q is not RUNNING or WAITING, currently: %s", rs.Primary.ID, state)
-			}
+		if err != nil {
+			return err
 		}
 
+		*v = *output
+
 		return nil
 	}
 }
```
7 changes: 7 additions & 0 deletions internal/service/emr/find.go
```diff
@@ -40,6 +40,13 @@ func FindClusterByID(conn *emr.EMR, id string) (*emr.Cluster, error) {
 		return nil, err
 	}
 
+	// Eventual consistency check.
+	if aws.StringValue(output.Id) != id {
+		return nil, &resource.NotFoundError{
+			LastRequest: input,
+		}
+	}
+
 	if state := aws.StringValue(output.Status.State); state == emr.ClusterStateTerminated || state == emr.ClusterStateTerminatedWithErrors {
 		return nil, &resource.NotFoundError{
 			Message: state,
```
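`FindClusterByID` now signals "gone" by returning the Plugin SDK's `*resource.NotFoundError`, both for the eventual-consistency ID mismatch added here and for terminal cluster states. The `tfresource.NotFound` helper that the revised `testAccCheckDestroy` relies on is not part of this diff; a minimal sketch of what it is assumed to do:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

// notFound mirrors what internal/tfresource.NotFound is assumed to do:
// report whether err is (or wraps) a *resource.NotFoundError.
func notFound(err error) bool {
	var nfe *resource.NotFoundError
	return errors.As(err, &nfe)
}

func main() {
	fmt.Println(notFound(&resource.NotFoundError{Message: "TERMINATED"})) // true
	fmt.Println(notFound(errors.New("throttled")))                        // false
}
```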
37 changes: 29 additions & 8 deletions internal/service/emr/wait.go
```diff
@@ -14,18 +14,16 @@ const (
 	ClusterCreatedTimeout    = 75 * time.Minute
 	ClusterCreatedMinTimeout = 10 * time.Second
 	ClusterCreatedDelay      = 30 * time.Second
+
+	ClusterDeletedTimeout    = 20 * time.Minute
+	ClusterDeletedMinTimeout = 10 * time.Second
+	ClusterDeletedDelay      = 30 * time.Second
 )
 
 func waitClusterCreated(conn *emr.EMR, id string) (*emr.Cluster, error) {
 	stateConf := &resource.StateChangeConf{
-		Pending: []string{
-			emr.ClusterStateBootstrapping,
-			emr.ClusterStateStarting,
-		},
-		Target: []string{
-			emr.ClusterStateRunning,
-			emr.ClusterStateWaiting,
-		},
+		Pending:    []string{emr.ClusterStateBootstrapping, emr.ClusterStateStarting},
+		Target:     []string{emr.ClusterStateRunning, emr.ClusterStateWaiting},
 		Refresh:    statusCluster(conn, id),
 		Timeout:    ClusterCreatedTimeout,
 		MinTimeout: ClusterCreatedMinTimeout,
@@ -44,3 +42,26 @@ func waitClusterCreated(conn *emr.EMR, id string) (*emr.Cluster, error) {
 
 	return nil, err
 }
+
+func waitClusterDeleted(conn *emr.EMR, id string) (*emr.Cluster, error) {
+	stateConf := &resource.StateChangeConf{
+		Pending:    []string{emr.ClusterStateTerminating},
+		Target:     []string{emr.ClusterStateTerminated, emr.ClusterStateTerminatedWithErrors},
+		Refresh:    statusCluster(conn, id),
+		Timeout:    ClusterDeletedTimeout,
+		MinTimeout: ClusterDeletedMinTimeout,
+		Delay:      ClusterDeletedDelay,
+	}
+
+	outputRaw, err := stateConf.WaitForState()
+
+	if output, ok := outputRaw.(*emr.Cluster); ok {
+		if stateChangeReason := output.Status.StateChangeReason; stateChangeReason != nil {
+			tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(stateChangeReason.Code), aws.StringValue(stateChangeReason.Message)))
+		}
+
+		return output, err
+	}
+
+	return nil, err
+}
```
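Both waiters pass `statusCluster` as their `Refresh` function, but that function is not shown in this diff. It presumably cannot be built on `FindClusterByID`, since that helper converts `TERMINATED`/`TERMINATED_WITH_ERRORS` into a not-found error and `waitClusterDeleted` would then never observe its target states. A plausible reconstruction that reads the state directly (an assumption, not code from this PR):

```go
package emr

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/emr"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

// statusCluster reports the cluster's current state straight from
// DescribeCluster, so terminal states surface as states rather than
// as "not found".
func statusCluster(conn *emr.EMR, id string) resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		output, err := conn.DescribeCluster(&emr.DescribeClusterInput{
			ClusterId: aws.String(id),
		})

		if err != nil {
			return nil, "", err
		}

		if output == nil || output.Cluster == nil || output.Cluster.Status == nil {
			// An empty state tells StateChangeConf the resource was not
			// found; it keeps polling up to its NotFoundChecks limit.
			return nil, "", nil
		}

		return output.Cluster, aws.StringValue(output.Cluster.Status.State), nil
	}
}
```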