Skip to content

Commit

Permalink
resource/aws_msk_cluster: Support in-place Kafka version upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
arafsheikh committed Nov 4, 2020
1 parent c29aa2b commit 585f2ff
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 3 deletions.
40 changes: 37 additions & 3 deletions aws/resource_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func resourceAwsMskCluster() *schema.Resource {
"kafka_version": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.StringLenBetween(1, 64),
},
"number_of_broker_nodes": {
Expand Down Expand Up @@ -563,7 +562,7 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if d.HasChange("configuration_info") {
if d.HasChange("configuration_info") && !d.HasChange("kafka_version") {
input := &kafka.UpdateClusterConfigurationInput{
ClusterArn: aws.String(d.Id()),
ConfigurationInfo: expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})),
Expand All @@ -587,6 +586,41 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if d.HasChange("kafka_version") {
var input *kafka.UpdateClusterKafkaVersionInput

if d.HasChange("configuration_info") {
input = &kafka.UpdateClusterKafkaVersionInput{
ClusterArn: aws.String(d.Id()),
ConfigurationInfo: expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})),
CurrentVersion: aws.String(d.Get("current_version").(string)),
TargetKafkaVersion: aws.String(d.Get("kafka_version").(string)),
}
} else {
input = &kafka.UpdateClusterKafkaVersionInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
TargetKafkaVersion: aws.String(d.Get("kafka_version").(string)),
}
}

output, err := conn.UpdateClusterKafkaVersion(input)

if err != nil {
return fmt.Errorf("error updating MSK Cluster (%s) kafka version: %s", d.Id(), err)
}

if output == nil {
return fmt.Errorf("error updating MSK Cluster (%s) kafka version: empty response", d.Id())
}

clusterOperationARN := aws.StringValue(output.ClusterOperationArn)

if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil {
return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %s", d.Id(), clusterOperationARN, err)
}
}

if d.HasChange("tags") {
o, n := d.GetChange("tags")

Expand Down Expand Up @@ -1110,7 +1144,7 @@ func waitForMskClusterOperation(conn *kafka.Kafka, clusterOperationARN string) e
Pending: []string{"PENDING", "UPDATE_IN_PROGRESS"},
Target: []string{"UPDATE_COMPLETE"},
Refresh: mskClusterOperationRefreshFunc(conn, clusterOperationARN),
Timeout: 60 * time.Minute,
Timeout: 2 * time.Hour,
}

log.Printf("[DEBUG] Waiting for MSK Cluster Operation (%s) completion", clusterOperationARN)
Expand Down
151 changes: 151 additions & 0 deletions aws/resource_aws_msk_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,90 @@ func TestAccAWSMskCluster_LoggingInfo(t *testing.T) {
})
}

func TestAccAWSMskCluster_KafkaVersionUpdate(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster1),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
},
},
{
Config: testAccMskClusterConfigKafkaVersion(rName, "2.4.1.1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster2),
testAccCheckMskClusterNotRecreated(&cluster1, &cluster2),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
),
},
},
})
}

func TestAccAWSMskCluster_KafkaVersionUpdateWithConfigurationInfo(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := acctest.RandomWithPrefix("tf-acc-test")
configurationResourceName1 := "aws_msk_configuration.config1"
configurationResourceName2 := "aws_msk_configuration.config2"
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.2.1", "config1"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster1),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName1, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName1, "latest_revision"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
},
},
{
Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.4.1.1", "config2"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster2),
testAccCheckMskClusterNotRecreated(&cluster1, &cluster2),
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName2, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName2, "latest_revision"),
),
},
},
})
}

func TestAccAWSMskCluster_Tags(t *testing.T) {
var cluster kafka.ClusterInfo
var td kafka.ListTagsForResourceOutput
Expand Down Expand Up @@ -1103,6 +1187,73 @@ resource "aws_msk_cluster" "test" {
`, rName, cloudwatchLogsEnabled, cloudwatchLogsLogGroup, firehoseEnabled, firehoseDeliveryStream, s3Enabled, s3Bucket)
}

func testAccMskClusterConfigKafkaVersion(rName string, kafkaVersion string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = %[2]q
number_of_broker_nodes = 3
encryption_info {
encryption_in_transit {
client_broker = "TLS_PLAINTEXT"
}
}
broker_node_group_info {
client_subnets = ["${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}"]
ebs_volume_size = 10
instance_type = "kafka.m5.large"
security_groups = ["${aws_security_group.example_sg.id}"]
}
}
`, rName, kafkaVersion)
}

func testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName string, kafkaVersion string, configResourceName string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_msk_configuration" "config1" {
kafka_versions = ["2.2.1"]
name = "%[1]s-1"
server_properties = <<PROPERTIES
log.cleaner.delete.retention.ms = 86400000
PROPERTIES
}
resource "aws_msk_configuration" "config2" {
kafka_versions = ["2.4.1.1"]
name = "%[1]s-2"
server_properties = <<PROPERTIES
log.cleaner.delete.retention.ms = 86400001
PROPERTIES
}
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = %[2]q
number_of_broker_nodes = 3
encryption_info {
encryption_in_transit {
client_broker = "TLS_PLAINTEXT"
}
}
broker_node_group_info {
client_subnets = ["${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}"]
ebs_volume_size = 10
instance_type = "kafka.m5.large"
security_groups = ["${aws_security_group.example_sg.id}"]
}
configuration_info {
arn = aws_msk_configuration.%[3]s.arn
revision = aws_msk_configuration.%[3]s.latest_revision
}
}
`, rName, kafkaVersion, configResourceName)
}

func testAccMskClusterConfigTags1(rName string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
Expand Down

0 comments on commit 585f2ff

Please sign in to comment.