diff --git a/aws/resource_aws_msk_cluster.go b/aws/resource_aws_msk_cluster.go index d6688b6a86b..eb25c6ec419 100644 --- a/aws/resource_aws_msk_cluster.go +++ b/aws/resource_aws_msk_cluster.go @@ -1,12 +1,14 @@ package aws import ( + "context" "fmt" "log" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kafka" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" @@ -22,6 +24,11 @@ func resourceAwsMskCluster() *schema.Resource { Importer: &schema.ResourceImporter{ State: schema.ImportStatePassthrough, }, + CustomizeDiff: customdiff.Sequence( + customdiff.ForceNewIfChange("kafka_version", func(_ context.Context, old, new, meta interface{}) bool { + return new.(string) < old.(string) + }), + ), Schema: map[string]*schema.Schema{ "arn": { Type: schema.TypeString, @@ -198,7 +205,6 @@ func resourceAwsMskCluster() *schema.Resource { "kafka_version": { Type: schema.TypeString, Required: true, - ForceNew: true, ValidateFunc: validation.StringLenBetween(1, 64), }, "number_of_broker_nodes": { @@ -563,7 +569,7 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error } } - if d.HasChange("configuration_info") { + if d.HasChange("configuration_info") && !d.HasChange("kafka_version") { input := &kafka.UpdateClusterConfigurationInput{ ClusterArn: aws.String(d.Id()), ConfigurationInfo: expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})), @@ -587,6 +593,34 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error } } + if d.HasChange("kafka_version") { + input := &kafka.UpdateClusterKafkaVersionInput{ + ClusterArn: aws.String(d.Id()), + CurrentVersion: aws.String(d.Get("current_version").(string)), + TargetKafkaVersion: aws.String(d.Get("kafka_version").(string)), + } + + if d.HasChange("configuration_info") { + input.ConfigurationInfo = expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})) + } + + output, err := conn.UpdateClusterKafkaVersion(input) + + if err != nil { + return fmt.Errorf("error updating MSK Cluster (%s) kafka version: %w", d.Id(), err) + } + + if output == nil { + return fmt.Errorf("error updating MSK Cluster (%s) kafka version: empty response", d.Id()) + } + + clusterOperationARN := aws.StringValue(output.ClusterOperationArn) + + if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil { + return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %w", d.Id(), clusterOperationARN, err) + } + } + if d.HasChange("tags") { o, n := d.GetChange("tags") @@ -1110,7 +1144,7 @@ func waitForMskClusterOperation(conn *kafka.Kafka, clusterOperationARN string) e Pending: []string{"PENDING", "UPDATE_IN_PROGRESS"}, Target: []string{"UPDATE_COMPLETE"}, Refresh: mskClusterOperationRefreshFunc(conn, clusterOperationARN), - Timeout: 60 * time.Minute, + Timeout: 2 * time.Hour, } log.Printf("[DEBUG] Waiting for MSK Cluster Operation (%s) completion", clusterOperationARN) diff --git a/aws/resource_aws_msk_cluster_test.go b/aws/resource_aws_msk_cluster_test.go index c4950d94c2a..4524538bb4c 100644 --- a/aws/resource_aws_msk_cluster_test.go +++ b/aws/resource_aws_msk_cluster_test.go @@ -514,6 +514,128 @@ func TestAccAWSMskCluster_LoggingInfo(t *testing.T) { }) } +func TestAccAWSMskCluster_KafkaVersionUpgrade(t *testing.T) { + var cluster1, cluster2 kafka.ClusterInfo + rName := acctest.RandomWithPrefix("tf-acc-test") + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + "bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return + }, + }, + { + Config: testAccMskClusterConfigKafkaVersion(rName, "2.4.1.1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster2), + testAccCheckMskClusterNotRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"), + ), + }, + }, + }) +} + +func TestAccAWSMskCluster_KafkaVersionDowngrade(t *testing.T) { + var cluster1, cluster2 kafka.ClusterInfo + rName := acctest.RandomWithPrefix("tf-acc-test") + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfigKafkaVersion(rName, "2.4.1.1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + "bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return + }, + }, + { + Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster2), + testAccCheckMskClusterRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"), + ), + }, + }, + }) +} + +func TestAccAWSMskCluster_KafkaVersionUpgradeWithConfigurationInfo(t *testing.T) { + var cluster1, cluster2 kafka.ClusterInfo + rName := acctest.RandomWithPrefix("tf-acc-test") + configurationResourceName1 := "aws_msk_configuration.config1" + configurationResourceName2 := "aws_msk_configuration.config2" + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.2.1", "config1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster1), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"), + resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName1, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName1, "latest_revision"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + "bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return + }, + }, + { + Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.4.1.1", "config2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster2), + testAccCheckMskClusterNotRecreated(&cluster1, &cluster2), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"), + resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName2, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName2, "latest_revision"), + ), + }, + }, + }) +} + func TestAccAWSMskCluster_Tags(t *testing.T) { var cluster kafka.ClusterInfo var td kafka.ListTagsForResourceOutput @@ -611,6 +733,16 @@ func testAccCheckMskClusterNotRecreated(i, j *kafka.ClusterInfo) resource.TestCh } } +func testAccCheckMskClusterRecreated(i, j *kafka.ClusterInfo) resource.TestCheckFunc { + return func(s *terraform.State) error { + if aws.StringValue(i.ClusterArn) == aws.StringValue(j.ClusterArn) { + return fmt.Errorf("MSK Cluster (%s) was not recreated", aws.StringValue(i.ClusterArn)) + } + + return nil + } +} + func testAccLoadMskTags(cluster *kafka.ClusterInfo, td *kafka.ListTagsForResourceOutput) resource.TestCheckFunc { return func(s *terraform.State) error { conn := testAccProvider.Meta().(*AWSClient).kafkaconn @@ -1106,6 +1238,73 @@ resource "aws_msk_cluster" "test" { `, rName, cloudwatchLogsEnabled, cloudwatchLogsLogGroup, firehoseEnabled, firehoseDeliveryStream, s3Enabled, s3Bucket) } +func testAccMskClusterConfigKafkaVersion(rName string, kafkaVersion string) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` +resource "aws_msk_cluster" "test" { + cluster_name = %[1]q + kafka_version = %[2]q + number_of_broker_nodes = 3 + + encryption_info { + encryption_in_transit { + client_broker = "TLS_PLAINTEXT" + } + } + + broker_node_group_info { + client_subnets = ["${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}"] + ebs_volume_size = 10 + instance_type = "kafka.m5.large" + security_groups = ["${aws_security_group.example_sg.id}"] + } +} +`, rName, kafkaVersion) +} + +func testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName string, kafkaVersion string, configResourceName string) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` +resource "aws_msk_configuration" "config1" { + kafka_versions = ["2.2.1"] + name = "%[1]s-1" + server_properties = <