Skip to content

Commit

Permalink
r/aws_emr_cluster: Reapply changes from #12578 in new style.
Browse files Browse the repository at this point in the history
  • Loading branch information
ewbankkit committed Dec 16, 2021
1 parent 959df07 commit e73bc96
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 108 deletions.
79 changes: 8 additions & 71 deletions internal/service/emr/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func resourceClusterCreate(d *schema.ResourceData, meta interface{}) error {
cluster, err := waitClusterCreated(conn, d.Id())

if err != nil {
return fmt.Errorf("error waiting for EMR Cluster (%s) to be created: %w", d.Id(), err)
return fmt.Errorf("error waiting for EMR Cluster (%s) to create: %w", d.Id(), err)
}

// For multiple master nodes, EMR automatically enables
Expand Down Expand Up @@ -1377,90 +1377,27 @@ func resourceClusterUpdate(d *schema.ResourceData, meta interface{}) error {
func resourceClusterDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*conns.AWSClient).EMRConn

req := &emr.TerminateJobFlowsInput{
log.Printf("[DEBUG] Deleting EMR Cluster: (%s)", d.Id())
_, err := conn.TerminateJobFlows(&emr.TerminateJobFlowsInput{
JobFlowIds: []*string{
aws.String(d.Id()),
},
}

_, err := conn.TerminateJobFlows(req)
if err != nil {
log.Printf("[ERROR], %s", err)
return err
}

input := &emr.ListInstancesInput{
ClusterId: aws.String(d.Id()),
}
var resp *emr.ListInstancesOutput
var count int
err = resource.Retry(20*time.Minute, func() *resource.RetryError {
var err error
resp, err = conn.ListInstances(input)

if err != nil {
return resource.NonRetryableError(err)
}

count = CountRemainingInstances(resp, d.Id())
if count != 0 {
return resource.RetryableError(fmt.Errorf("EMR Cluster (%s) has (%d) Instances remaining", d.Id(), count))
}
return nil
})

if tfresource.TimedOut(err) {
resp, err = conn.ListInstances(input)

if err == nil {
count = CountRemainingInstances(resp, d.Id())
}
if err != nil {
return fmt.Errorf("error terminating EMR Cluster (%s): %w", d.Id(), err)
}

if count != 0 {
return fmt.Errorf("EMR Cluster (%s) has (%d) Instances remaining", d.Id(), count)
}
log.Println("[INFO] Waiting for EMR Cluster to be terminated")
_, err = waitClusterDeleted(conn, d.Id())

if err != nil {
return fmt.Errorf("error waiting for EMR Cluster (%s) Instances to drain: %s", d.Id(), err)
return fmt.Errorf("error waiting for EMR Cluster (%s) to delete: %w", d.Id(), err)
}

return nil
}

func CountRemainingInstances(resp *emr.ListInstancesOutput, emrClusterId string) int {
if resp == nil {
log.Printf("[ERROR] response is nil")
return 0
}

instanceCount := len(resp.Instances)

if instanceCount == 0 {
log.Printf("[DEBUG] No instances found for EMR Cluster (%s)", emrClusterId)
return 0
}

// Collect instance status states, wait for all instances to be terminated
// before moving on
var terminated []string
for j, i := range resp.Instances {
instanceId := aws.StringValue(i.Ec2InstanceId)
if i.Status != nil {
if aws.StringValue(i.Status.State) == emr.InstanceStateTerminated {
terminated = append(terminated, instanceId)
}
} else {
log.Printf("[DEBUG] Cluster instance (%d : %s) has no status", j, instanceId)
}
}
if len(terminated) == instanceCount {
log.Printf("[DEBUG] All (%d) EMR Cluster (%s) Instances terminated", instanceCount, emrClusterId)
return 0
}
return len(resp.Instances)
}

func expandApplications(apps []interface{}) []*emr.Application {
appOut := make([]*emr.Application, 0, len(apps))

Expand Down
43 changes: 14 additions & 29 deletions internal/service/emr/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hashicorp/terraform-provider-aws/internal/conns"
tfec2 "github.com/hashicorp/terraform-provider-aws/internal/service/ec2"
tfemr "github.com/hashicorp/terraform-provider-aws/internal/service/emr"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
)

func TestAccEMRCluster_basic(t *testing.T) {
Expand Down Expand Up @@ -1545,24 +1546,17 @@ func testAccCheckDestroy(s *terraform.State) error {
continue
}

input := &emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
_, err := tfemr.FindClusterByID(conn, rs.Primary.ID)

if tfresource.NotFound(err) {
continue
}

output, err := conn.DescribeCluster(input)
if err != nil {
return err
}

// if output.Cluster != nil &&
// *output.Cluster.Status.State == "WAITING" {
// return fmt.Errorf("EMR Cluster still exists")
// }
if output.Cluster == nil || output.Cluster.Status == nil || aws.StringValue(output.Cluster.Status.State) == emr.ClusterStateTerminated {
continue
}

return fmt.Errorf("EMR Cluster still exists")
return fmt.Errorf("EMR Cluster %s still exists", rs.Primary.ID)
}

return nil
Expand All @@ -1574,30 +1568,21 @@ func testAccCheckClusterExists(n string, v *emr.Cluster) resource.TestCheckFunc
if !ok {
return fmt.Errorf("Not found: %s", n)
}

if rs.Primary.ID == "" {
return fmt.Errorf("No cluster id set")
}
conn := acctest.Provider.Meta().(*conns.AWSClient).EMRConn
describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{
ClusterId: aws.String(rs.Primary.ID),
})
if err != nil {
return fmt.Errorf("EMR error: %w", err)
return fmt.Errorf("No EMR Cluster ID is set")
}

if describe.Cluster == nil || *describe.Cluster.Id != rs.Primary.ID {
return fmt.Errorf("EMR cluster %q not found", rs.Primary.ID)
}
conn := acctest.Provider.Meta().(*conns.AWSClient).EMRConn

*v = *describe.Cluster
output, err := tfemr.FindClusterByID(conn, rs.Primary.ID)

if describe.Cluster.Status != nil {
state := aws.StringValue(describe.Cluster.Status.State)
if state != emr.ClusterStateRunning && state != emr.ClusterStateWaiting {
return fmt.Errorf("EMR cluster %q is not RUNNING or WAITING, currently: %s", rs.Primary.ID, state)
}
if err != nil {
return err
}

*v = *output

return nil
}
}
Expand Down
7 changes: 7 additions & 0 deletions internal/service/emr/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func FindClusterByID(conn *emr.EMR, id string) (*emr.Cluster, error) {
return nil, err
}

// Eventual consistency check.
if aws.StringValue(output.Id) != id {
return nil, &resource.NotFoundError{
LastRequest: input,
}
}

if state := aws.StringValue(output.Status.State); state == emr.ClusterStateTerminated || state == emr.ClusterStateTerminatedWithErrors {
return nil, &resource.NotFoundError{
Message: state,
Expand Down
37 changes: 29 additions & 8 deletions internal/service/emr/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@ const (
ClusterCreatedTimeout = 75 * time.Minute
ClusterCreatedMinTimeout = 10 * time.Second
ClusterCreatedDelay = 30 * time.Second

ClusterDeletedTimeout = 20 * time.Minute
ClusterDeletedMinTimeout = 10 * time.Second
ClusterDeletedDelay = 30 * time.Second
)

func waitClusterCreated(conn *emr.EMR, id string) (*emr.Cluster, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{
emr.ClusterStateBootstrapping,
emr.ClusterStateStarting,
},
Target: []string{
emr.ClusterStateRunning,
emr.ClusterStateWaiting,
},
Pending: []string{emr.ClusterStateBootstrapping, emr.ClusterStateStarting},
Target: []string{emr.ClusterStateRunning, emr.ClusterStateWaiting},
Refresh: statusCluster(conn, id),
Timeout: ClusterCreatedTimeout,
MinTimeout: ClusterCreatedMinTimeout,
Expand All @@ -44,3 +42,26 @@ func waitClusterCreated(conn *emr.EMR, id string) (*emr.Cluster, error) {

return nil, err
}

func waitClusterDeleted(conn *emr.EMR, id string) (*emr.Cluster, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{emr.ClusterStateTerminating},
Target: []string{emr.ClusterStateTerminated, emr.ClusterStateTerminatedWithErrors},
Refresh: statusCluster(conn, id),
Timeout: ClusterDeletedTimeout,
MinTimeout: ClusterDeletedMinTimeout,
Delay: ClusterDeletedDelay,
}

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*emr.Cluster); ok {
if stateChangeReason := output.Status.StateChangeReason; stateChangeReason != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(stateChangeReason.Code), aws.StringValue(stateChangeReason.Message)))
}

return output, err
}

return nil, err
}

0 comments on commit e73bc96

Please sign in to comment.