From 3a8e662c11792f7eab8f068e635570f960aaa6a2 Mon Sep 17 00:00:00 2001 From: Sheikh Araf Date: Sun, 7 Jun 2020 20:00:36 +0530 Subject: [PATCH] resource/aws_msk_cluster: Support in-place Kafka version upgrade --- aws/resource_aws_msk_cluster.go | 40 +++++- aws/resource_aws_msk_cluster_test.go | 199 +++++++++++++++++++++++++++ 2 files changed, 236 insertions(+), 3 deletions(-) diff --git a/aws/resource_aws_msk_cluster.go b/aws/resource_aws_msk_cluster.go index d6688b6a86b..eb25c6ec419 100644 --- a/aws/resource_aws_msk_cluster.go +++ b/aws/resource_aws_msk_cluster.go @@ -1,12 +1,14 @@ package aws import ( + "context" "fmt" "log" "time" "" "" + "" "" "" "" @@ -22,6 +24,11 @@ func resourceAwsMskCluster() *schema.Resource { Importer: &schema.ResourceImporter{ State: schema.ImportStatePassthrough, }, + CustomizeDiff: customdiff.Sequence( + customdiff.ForceNewIfChange("kafka_version", func(_ context.Context, old, new, meta interface{}) bool { + return new.(string) < old.(string) + }), + ), Schema: map[string]*schema.Schema{ "arn": { Type: schema.TypeString, @@ -198,7 +205,6 @@ func resourceAwsMskCluster() *schema.Resource { "kafka_version": { Type: schema.TypeString, Required: true, - ForceNew: true, ValidateFunc: validation.StringLenBetween(1, 64), }, "number_of_broker_nodes": { @@ -563,7 +569,7 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error } } - if d.HasChange("configuration_info") { + if d.HasChange("configuration_info") && !d.HasChange("kafka_version") { input := &kafka.UpdateClusterConfigurationInput{ ClusterArn: aws.String(d.Id()), ConfigurationInfo: expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})), @@ -587,6 +593,34 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error } } + if d.HasChange("kafka_version") { + input := &kafka.UpdateClusterKafkaVersionInput{ + ClusterArn: aws.String(d.Id()), + CurrentVersion: aws.String(d.Get("current_version").(string)), + TargetKafkaVersion: aws.String(d.Get("kafka_version").(string)), + } + + if d.HasChange("configuration_info") { + input.ConfigurationInfo = expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})) + } + + output, err := conn.UpdateClusterKafkaVersion(input) + + if err != nil { + return fmt.Errorf("error updating MSK Cluster (%s) kafka version: %w", d.Id(), err) + } + + if output == nil { + return fmt.Errorf("error updating MSK Cluster (%s) kafka version: empty response", d.Id()) + } + + clusterOperationARN := aws.StringValue(output.ClusterOperationArn) + + if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil { + return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %w", d.Id(), clusterOperationARN, err) + } + } + if d.HasChange("tags") { o, n := d.GetChange("tags") @@ -1110,7 +1144,7 @@ func waitForMskClusterOperation(conn *kafka.Kafka, clusterOperationARN string) e Pending: []string{"PENDING", "UPDATE_IN_PROGRESS"}, Target: []string{"UPDATE_COMPLETE"}, Refresh: mskClusterOperationRefreshFunc(conn, clusterOperationARN), - Timeout: 60 * time.Minute, + Timeout: 2 * time.Hour, } log.Printf("[DEBUG] Waiting for MSK Cluster Operation (%s) completion", clusterOperationARN) diff --git a/aws/resource_aws_msk_cluster_test.go b/aws/resource_aws_msk_cluster_test.go index 0f7d33246f5..2b53f734352 100644 --- a/aws/resource_aws_msk_cluster_test.go +++ b/aws/resource_aws_msk_cluster_test.go @@ -514,6 +514,128 @@ func TestAccAWSMskCluster_LoggingInfo(t *testing.T) { }) } +func TestAccAWSMskCluster_KafkaVersionUpgrade(t *testing.T) { + var cluster1, cluster2 kafka.ClusterInfo + rName := acctest.RandomWithPrefix("tf-acc-test") + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + "bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return + }, + }, + { + Config: testAccMskClusterConfigKafkaVersion(rName, ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster2), + testAccCheckMskClusterNotRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "kafka_version", ""), + ), + }, + }, + }) +} + +func TestAccAWSMskCluster_KafkaVersionDowngrade(t *testing.T) { + var cluster1, cluster2 kafka.ClusterInfo + rName := acctest.RandomWithPrefix("tf-acc-test") + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfigKafkaVersion(rName, ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "kafka_version", ""), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + "bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return + }, + }, + { + Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster2), + testAccCheckMskClusterRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"), + ), + }, + }, + }) +} + +func TestAccAWSMskCluster_KafkaVersionUpgradeWithConfigurationInfo(t *testing.T) { + var cluster1, cluster2 kafka.ClusterInfo + rName := acctest.RandomWithPrefix("tf-acc-test") + configurationResourceName1 := "aws_msk_configuration.config1" + configurationResourceName2 := "aws_msk_configuration.config2" + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.2.1", "config1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"), + resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName1, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName1, "latest_revision"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + "bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return + }, + }, + { + Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "", "config2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster2), + testAccCheckMskClusterNotRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "kafka_version", ""), + resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName2, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName2, "latest_revision"), + ), + }, + }, + }) +} + func TestAccAWSMskCluster_Tags(t *testing.T) { var cluster kafka.ClusterInfo var td kafka.ListTagsForResourceOutput @@ -611,6 +733,16 @@ func testAccCheckMskClusterNotRecreated(i, j *kafka.ClusterInfo) resource.TestCh } } +func testAccCheckMskClusterRecreated(i, j *kafka.ClusterInfo) resource.TestCheckFunc { + return func(s *terraform.State) error { + if aws.StringValue(i.ClusterArn) == aws.StringValue(j.ClusterArn) { + return fmt.Errorf("MSK Cluster (%s) was not recreated", aws.StringValue(i.ClusterArn)) + } + + return nil + } +} + func testAccLoadMskTags(cluster *kafka.ClusterInfo, td *kafka.ListTagsForResourceOutput) resource.TestCheckFunc { return func(s *terraform.State) error { conn := testAccProvider.Meta().(*AWSClient).kafkaconn @@ -1103,6 +1235,73 @@ resource "aws_msk_cluster" "test" { `, rName, cloudwatchLogsEnabled, cloudwatchLogsLogGroup, firehoseEnabled, firehoseDeliveryStream, s3Enabled, s3Bucket) } +func testAccMskClusterConfigKafkaVersion(rName string, kafkaVersion string) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` +resource "aws_msk_cluster" "test" { + cluster_name = %[1]q + kafka_version = %[2]q + number_of_broker_nodes = 3 + + encryption_info { + encryption_in_transit { + client_broker = "TLS_PLAINTEXT" + } + } + + broker_node_group_info { + client_subnets = ["${}", "${}", "${}"] + ebs_volume_size = 10 + instance_type = "kafka.m5.large" + security_groups = ["${}"] + } +} +`, rName, kafkaVersion) +} + +func testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName string, kafkaVersion string, configResourceName string) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` +resource "aws_msk_configuration" "config1" { + kafka_versions = ["2.2.1"] + name = "%[1]s-1" + server_properties = <