Skip to content

Commit

Permalink
Merge pull request #13654 from arafsheikh/master
Browse files Browse the repository at this point in the history
aws_msk_cluster: support in-place Kafka version upgrade
  • Loading branch information
anGie44 authored Nov 17, 2020
2 parents 87dfe39 + 3a8e662 commit a1ea7ce
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 3 deletions.
40 changes: 37 additions & 3 deletions aws/resource_aws_msk_cluster.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package aws

import (
"context"
"fmt"
"log"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
Expand All @@ -22,6 +24,11 @@ func resourceAwsMskCluster() *schema.Resource {
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
CustomizeDiff: customdiff.Sequence(
customdiff.ForceNewIfChange("kafka_version", func(_ context.Context, old, new, meta interface{}) bool {
return new.(string) < old.(string)
}),
),
Schema: map[string]*schema.Schema{
"arn": {
Type: schema.TypeString,
Expand Down Expand Up @@ -198,7 +205,6 @@ func resourceAwsMskCluster() *schema.Resource {
"kafka_version": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.StringLenBetween(1, 64),
},
"number_of_broker_nodes": {
Expand Down Expand Up @@ -563,7 +569,7 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if d.HasChange("configuration_info") {
if d.HasChange("configuration_info") && !d.HasChange("kafka_version") {
input := &kafka.UpdateClusterConfigurationInput{
ClusterArn: aws.String(d.Id()),
ConfigurationInfo: expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})),
Expand All @@ -587,6 +593,34 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if d.HasChange("kafka_version") {
input := &kafka.UpdateClusterKafkaVersionInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
TargetKafkaVersion: aws.String(d.Get("kafka_version").(string)),
}

if d.HasChange("configuration_info") {
input.ConfigurationInfo = expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{}))
}

output, err := conn.UpdateClusterKafkaVersion(input)

if err != nil {
return fmt.Errorf("error updating MSK Cluster (%s) kafka version: %w", d.Id(), err)
}

if output == nil {
return fmt.Errorf("error updating MSK Cluster (%s) kafka version: empty response", d.Id())
}

clusterOperationARN := aws.StringValue(output.ClusterOperationArn)

if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil {
return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %w", d.Id(), clusterOperationARN, err)
}
}

if d.HasChange("tags") {
o, n := d.GetChange("tags")

Expand Down Expand Up @@ -1110,7 +1144,7 @@ func waitForMskClusterOperation(conn *kafka.Kafka, clusterOperationARN string) e
Pending: []string{"PENDING", "UPDATE_IN_PROGRESS"},
Target: []string{"UPDATE_COMPLETE"},
Refresh: mskClusterOperationRefreshFunc(conn, clusterOperationARN),
Timeout: 60 * time.Minute,
Timeout: 2 * time.Hour,
}

log.Printf("[DEBUG] Waiting for MSK Cluster Operation (%s) completion", clusterOperationARN)
Expand Down
199 changes: 199 additions & 0 deletions aws/resource_aws_msk_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,128 @@ func TestAccAWSMskCluster_LoggingInfo(t *testing.T) {
})
}

func TestAccAWSMskCluster_KafkaVersionUpgrade(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster1),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
},
},
{
Config: testAccMskClusterConfigKafkaVersion(rName, "2.4.1.1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster2),
testAccCheckMskClusterNotRecreated(&cluster1, &cluster2),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
),
},
},
})
}

func TestAccAWSMskCluster_KafkaVersionDowngrade(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskClusterConfigKafkaVersion(rName, "2.4.1.1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster1),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
},
},
{
Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster2),
testAccCheckMskClusterRecreated(&cluster1, &cluster2),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
),
},
},
})
}

func TestAccAWSMskCluster_KafkaVersionUpgradeWithConfigurationInfo(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := acctest.RandomWithPrefix("tf-acc-test")
configurationResourceName1 := "aws_msk_configuration.config1"
configurationResourceName2 := "aws_msk_configuration.config2"
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.2.1", "config1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster1),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName1, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName1, "latest_revision"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
},
},
{
Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.4.1.1", "config2"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster2),
testAccCheckMskClusterNotRecreated(&cluster1, &cluster2),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName2, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName2, "latest_revision"),
),
},
},
})
}

func TestAccAWSMskCluster_Tags(t *testing.T) {
var cluster kafka.ClusterInfo
var td kafka.ListTagsForResourceOutput
Expand Down Expand Up @@ -611,6 +733,16 @@ func testAccCheckMskClusterNotRecreated(i, j *kafka.ClusterInfo) resource.TestCh
}
}

func testAccCheckMskClusterRecreated(i, j *kafka.ClusterInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
if aws.StringValue(i.ClusterArn) == aws.StringValue(j.ClusterArn) {
return fmt.Errorf("MSK Cluster (%s) was not recreated", aws.StringValue(i.ClusterArn))
}

return nil
}
}

func testAccLoadMskTags(cluster *kafka.ClusterInfo, td *kafka.ListTagsForResourceOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).kafkaconn
Expand Down Expand Up @@ -1106,6 +1238,73 @@ resource "aws_msk_cluster" "test" {
`, rName, cloudwatchLogsEnabled, cloudwatchLogsLogGroup, firehoseEnabled, firehoseDeliveryStream, s3Enabled, s3Bucket)
}

func testAccMskClusterConfigKafkaVersion(rName string, kafkaVersion string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = %[2]q
number_of_broker_nodes = 3
encryption_info {
encryption_in_transit {
client_broker = "TLS_PLAINTEXT"
}
}
broker_node_group_info {
client_subnets = ["${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}"]
ebs_volume_size = 10
instance_type = "kafka.m5.large"
security_groups = ["${aws_security_group.example_sg.id}"]
}
}
`, rName, kafkaVersion)
}

func testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName string, kafkaVersion string, configResourceName string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_msk_configuration" "config1" {
kafka_versions = ["2.2.1"]
name = "%[1]s-1"
server_properties = <<PROPERTIES
log.cleaner.delete.retention.ms = 86400000
PROPERTIES
}
resource "aws_msk_configuration" "config2" {
kafka_versions = ["2.4.1.1"]
name = "%[1]s-2"
server_properties = <<PROPERTIES
log.cleaner.delete.retention.ms = 86400001
PROPERTIES
}
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = %[2]q
number_of_broker_nodes = 3
encryption_info {
encryption_in_transit {
client_broker = "TLS_PLAINTEXT"
}
}
broker_node_group_info {
client_subnets = ["${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}"]
ebs_volume_size = 10
instance_type = "kafka.m5.large"
security_groups = ["${aws_security_group.example_sg.id}"]
}
configuration_info {
arn = aws_msk_configuration.%[3]s.arn
revision = aws_msk_configuration.%[3]s.latest_revision
}
}
`, rName, kafkaVersion, configResourceName)
}

func testAccMskClusterConfigTags1(rName string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
Expand Down

0 comments on commit a1ea7ce

Please sign in to comment.