diff --git a/.changelog/20904.txt b/.changelog/20904.txt new file mode 100644 index 000000000000..61729cc2a53a --- /dev/null +++ b/.changelog/20904.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_dms_endpoint: Add `message_format`, `include_transaction_details`, `include_partition_value`, `partition_include_schema_table`, `include_table_alter_operations`, `include_control_details`, `message_max_bytes`, `include_null_and_empty`, `security_protocol`, `ssl_client_certificate_arn`, `ssl_client_key_arn`, `ssl_client_key_password`, `ssl_ca_certificate_arn`, `sasl_username`, `sasl_password` and `no_hex_prefix` arguments to `kafka_settings` configuration block +``` \ No newline at end of file diff --git a/aws/internal/service/dms/enum.go b/aws/internal/service/dms/enum.go index 61960847a280..559122fe2b2c 100644 --- a/aws/internal/service/dms/enum.go +++ b/aws/internal/service/dms/enum.go @@ -1,5 +1,9 @@ package dms +const ( + EndpointStatusDeleting = "deleting" +) + const ( EngineNameAurora = "aurora" EngineNameAuroraPostgresql = "aurora-postgresql" diff --git a/aws/internal/service/dms/finder/finder.go b/aws/internal/service/dms/finder/finder.go new file mode 100644 index 000000000000..5c7b423cab8d --- /dev/null +++ b/aws/internal/service/dms/finder/finder.go @@ -0,0 +1,43 @@ +package finder + +import ( + "github.com/aws/aws-sdk-go/aws" + dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" +) + +func EndpointByID(conn *dms.DatabaseMigrationService, id string) (*dms.Endpoint, error) { + input := &dms.DescribeEndpointsInput{ + Filters: []*dms.Filter{ + { + Name: aws.String("endpoint-id"), + Values: aws.StringSlice([]string{id}), + }, + }, + } + + output, err := conn.DescribeEndpoints(input) + + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil, &resource.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + + if err != nil { + return nil, err + } + + if output == nil || len(output.Endpoints) == 0 || output.Endpoints[0] == nil { + return nil, tfresource.NewEmptyResultError(input) + } + + if count := len(output.Endpoints); count > 1 { + return nil, tfresource.NewTooManyResultsError(count, input) + } + + return output.Endpoints[0], nil +} diff --git a/aws/internal/service/dms/waiter/status.go b/aws/internal/service/dms/waiter/status.go new file mode 100644 index 000000000000..145b3494c4c8 --- /dev/null +++ b/aws/internal/service/dms/waiter/status.go @@ -0,0 +1,25 @@ +package waiter + +import ( + "github.com/aws/aws-sdk-go/aws" + dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" +) + +func EndpointStatus(conn *dms.DatabaseMigrationService, id string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + output, err := finder.EndpointByID(conn, id) + + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } + + return output, aws.StringValue(output.Status), nil + } +} diff --git a/aws/internal/service/dms/waiter/waiter.go b/aws/internal/service/dms/waiter/waiter.go new file mode 100644 index 000000000000..6227b5d1baa4 --- /dev/null +++ b/aws/internal/service/dms/waiter/waiter.go @@ -0,0 +1,30 @@ +package waiter + +import ( + "time" + + dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + tfdms "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms" +) + +const ( + EndpointDeletedTimeout = 5 * time.Minute +) + +func EndpointDeleted(conn *dms.DatabaseMigrationService, id string) (*dms.Endpoint, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{tfdms.EndpointStatusDeleting}, + Target: []string{}, + Refresh: EndpointStatus(conn, id), + Timeout: EndpointDeletedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if output, ok := outputRaw.(*dms.Endpoint); ok { + return output, err + } + + return nil, err +} diff --git a/aws/provider_test.go b/aws/provider_test.go index bae5d844fce8..8d0ff54808cb 100644 --- a/aws/provider_test.go +++ b/aws/provider_test.go @@ -356,6 +356,7 @@ func testAccCheckResourceAttrRegionalReverseDnsService(resourceName, attributeNa } } +/* // testAccCheckResourceAttrHostnameWithPort ensures the Terraform state regexp matches a formatted DNS hostname with prefix, partition DNS suffix, and given port func testAccCheckResourceAttrHostnameWithPort(resourceName, attributeName, serviceName, hostnamePrefix string, port int) resource.TestCheckFunc { return func(s *terraform.State) error { @@ -365,6 +366,7 @@ func testAccCheckResourceAttrHostnameWithPort(resourceName, attributeName, servi return resource.TestCheckResourceAttr(resourceName, attributeName, hostname)(s) } } +*/ // testAccCheckResourceAttrPrivateDnsName ensures the Terraform state exactly matches a private DNS name // diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index 324c0a479744..d2a29d2f3c25 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -1,6 +1,7 @@ package aws import ( + "context" "fmt" "log" "regexp" @@ -8,13 +9,17 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags" tfdms "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/waiter" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) func resourceAwsDmsEndpoint() *schema.Resource { @@ -120,6 +125,85 @@ func resourceAwsDmsEndpoint() *schema.Resource { Required: true, ValidateFunc: validation.NoZeroValues, }, + "include_control_details": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_null_and_empty": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_partition_value": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_table_alter_operations": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_transaction_details": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "message_format": { + Type: schema.TypeString, + Optional: true, + Default: dms.MessageFormatValueJson, + ValidateFunc: validation.StringInSlice(dms.MessageFormatValue_Values(), false), + }, + "message_max_bytes": { + Type: schema.TypeInt, + Optional: true, + Default: 1000000, + }, + "no_hex_prefix": { + Type: schema.TypeBool, + Optional: true, + }, + "partition_include_schema_table": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "sasl_password": { + Type: schema.TypeString, + Optional: true, + Sensitive: true, + }, + "sasl_username": { + Type: schema.TypeString, + Optional: true, + }, + "security_protocol": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringInSlice(dms.KafkaSecurityProtocol_Values(), false), + }, + "ssl_ca_certificate_arn": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validateArn, + }, + "ssl_client_certificate_arn": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validateArn, + }, + "ssl_client_key_arn": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validateArn, + }, + "ssl_client_key_password": { + Type: schema.TypeString, + Optional: true, + Sensitive: true, + }, "topic": { Type: schema.TypeString, Optional: true, @@ -221,9 +305,10 @@ func resourceAwsDmsEndpoint() *schema.Resource { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "service_access_role_arn": { - Type: schema.TypeString, - Optional: true, - Default: "", + Type: schema.TypeString, + Optional: true, + Default: "", + ValidateFunc: validateArn, }, "external_table_definition": { Type: schema.TypeString, @@ -313,7 +398,10 @@ func resourceAwsDmsEndpoint() *schema.Resource { }, }, - CustomizeDiff: SetTagsDiff, + CustomizeDiff: customdiff.All( + resourceAwsDmsEndpointCustomizeDiff, + SetTagsDiff, + ), } } @@ -330,30 +418,26 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro } switch d.Get("engine_name").(string) { - // if dynamodb then add required params - case "dynamodb": + case tfdms.EngineNameDynamodb: request.DynamoDbSettings = &dms.DynamoDbSettings{ ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)), } - case "elasticsearch": + case tfdms.EngineNameElasticsearch: request.ElasticsearchSettings = &dms.ElasticsearchSettings{ ServiceAccessRoleArn: aws.String(d.Get("elasticsearch_settings.0.service_access_role_arn").(string)), EndpointUri: aws.String(d.Get("elasticsearch_settings.0.endpoint_uri").(string)), ErrorRetryDuration: aws.Int64(int64(d.Get("elasticsearch_settings.0.error_retry_duration").(int))), FullLoadErrorPercentage: aws.Int64(int64(d.Get("elasticsearch_settings.0.full_load_error_percentage").(int))), } - case "kafka": - request.KafkaSettings = &dms.KafkaSettings{ - Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), - } - case "kinesis": + case tfdms.EngineNameKafka: + request.KafkaSettings = expandDmsKafkaSettings(d.Get("kafka_settings").([]interface{})[0].(map[string]interface{})) + case tfdms.EngineNameKinesis: request.KinesisSettings = &dms.KinesisSettings{ MessageFormat: aws.String(d.Get("kinesis_settings.0.message_format").(string)), ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)), StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)), } - case "mongodb": + case tfdms.EngineNameMongodb: request.MongoDbSettings = &dms.MongoDbSettings{ Username: aws.String(d.Get("username").(string)), Password: aws.String(d.Get("password").(string)), @@ -376,7 +460,7 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro request.ServerName = aws.String(d.Get("server_name").(string)) request.Port = aws.Int64(int64(d.Get("port").(int))) request.DatabaseName = aws.String(d.Get("database_name").(string)) - case "s3": + case tfdms.EngineNameS3: request.S3Settings = &dms.S3Settings{ BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), @@ -451,24 +535,20 @@ func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error defaultTagsConfig := meta.(*AWSClient).DefaultTagsConfig ignoreTagsConfig := meta.(*AWSClient).IgnoreTagsConfig - response, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("endpoint-id"), - Values: []*string{aws.String(d.Id())}, // Must use d.Id() to work with import. - }, - }, - }) + endpoint, err := finder.EndpointByID(conn, d.Id()) + + if !d.IsNewResource() && tfresource.NotFound(err) { + log.Printf("[WARN] DMS Endpoint (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + if err != nil { - if dmserr, ok := err.(awserr.Error); ok && dmserr.Code() == "ResourceNotFoundFault" { - log.Printf("[DEBUG] DMS Replication Endpoint %q Not Found", d.Id()) - d.SetId("") - return nil - } - return err + return fmt.Errorf("error reading DMS Endpoint (%s): %w", d.Id(), err) } - err = resourceAwsDmsEndpointSetState(d, response.Endpoints[0]) + err = resourceAwsDmsEndpointSetState(d, endpoint) + if err != nil { return err } @@ -476,7 +556,7 @@ func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error tags, err := keyvaluetags.DatabasemigrationserviceListTags(conn, d.Get("endpoint_arn").(string)) if err != nil { - return fmt.Errorf("error listing tags for DMS Endpoint (%s): %s", d.Get("endpoint_arn").(string), err) + return fmt.Errorf("error listing tags for DMS Endpoint (%s): %w", d.Get("endpoint_arn").(string), err) } tags = tags.IgnoreAws().IgnoreConfig(ignoreTagsConfig) @@ -547,15 +627,15 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro } } - switch d.Get("engine_name").(string) { - case "dynamodb": + switch engineName := d.Get("engine_name").(string); engineName { + case tfdms.EngineNameDynamodb: if d.HasChange("service_access_role") { request.DynamoDbSettings = &dms.DynamoDbSettings{ ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)), } hasChanges = true } - case "elasticsearch": + case tfdms.EngineNameElasticsearch: if d.HasChanges( "elasticsearch_settings.0.endpoint_uri", "elasticsearch_settings.0.error_retry_duration", @@ -567,21 +647,16 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro ErrorRetryDuration: aws.Int64(int64(d.Get("elasticsearch_settings.0.error_retry_duration").(int))), FullLoadErrorPercentage: aws.Int64(int64(d.Get("elasticsearch_settings.0.full_load_error_percentage").(int))), } - request.EngineName = aws.String(d.Get("engine_name").(string)) + request.EngineName = aws.String(engineName) hasChanges = true } - case "kafka": - if d.HasChanges( - "kafka_settings.0.broker", - "kafka_settings.0.topic") { - request.KafkaSettings = &dms.KafkaSettings{ - Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), - } - request.EngineName = aws.String(d.Get("engine_name").(string)) + case tfdms.EngineNameKafka: + if d.HasChange("kafka_settings") { + request.KafkaSettings = expandDmsKafkaSettings(d.Get("kafka_settings").([]interface{})[0].(map[string]interface{})) + request.EngineName = aws.String(engineName) hasChanges = true } - case "kinesis": + case tfdms.EngineNameKinesis: if d.HasChanges( "kinesis_settings.0.service_access_role_arn", "kinesis_settings.0.stream_arn") { @@ -592,10 +667,10 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)), StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)), } - request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 'kinesis') + request.EngineName = aws.String(engineName) hasChanges = true } - case "mongodb": + case tfdms.EngineNameMongodb: if d.HasChanges( "username", "password", "server_name", "port", "database_name", "mongodb_settings.0.auth_type", "mongodb_settings.0.auth_mechanism", "mongodb_settings.0.nesting_level", "mongodb_settings.0.extract_doc_id", @@ -615,7 +690,7 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro DocsToInvestigate: aws.String(d.Get("mongodb_settings.0.docs_to_investigate").(string)), AuthSource: aws.String(d.Get("mongodb_settings.0.auth_source").(string)), } - request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 'mongodb') + request.EngineName = aws.String(engineName) // Update connection info in top-level namespace as well request.Username = aws.String(d.Get("username").(string)) @@ -626,7 +701,7 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro hasChanges = true } - case "s3": + case tfdms.EngineNameS3: if d.HasChanges( "s3_settings.0.service_access_role_arn", "s3_settings.0.external_table_definition", "s3_settings.0.csv_row_delimiter", "s3_settings.0.csv_delimiter", "s3_settings.0.bucket_folder", @@ -647,7 +722,7 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)), ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), } - request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3') + request.EngineName = aws.String(engineName) hasChanges = true } default: @@ -694,16 +769,55 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).dmsconn - request := &dms.DeleteEndpointInput{ + log.Printf("[DEBUG] Deleting DMS Endpoint: (%s)", d.Id()) + _, err := conn.DeleteEndpoint(&dms.DeleteEndpointInput{ EndpointArn: aws.String(d.Get("endpoint_arn").(string)), + }) + + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil } - log.Printf("[DEBUG] DMS delete endpoint: %#v", request) + if err != nil { + return fmt.Errorf("error deleting DMS Endpoint (%s): %w", d.Id(), err) + } + + _, err = waiter.EndpointDeleted(conn, d.Id()) + + if err != nil { + return fmt.Errorf("error waiting for DMS Endpoint (%s) delete: %w", d.Id(), err) + } - _, err := conn.DeleteEndpoint(request) return err } +func resourceAwsDmsEndpointCustomizeDiff(_ context.Context, diff *schema.ResourceDiff, v interface{}) error { + switch engineName := diff.Get("engine_name").(string); engineName { + case tfdms.EngineNameElasticsearch: + if v, ok := diff.GetOk("elasticsearch_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("elasticsearch_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameKafka: + if v, ok := diff.GetOk("kafka_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("kafka_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameKinesis: + if v, ok := diff.GetOk("kinesis_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("kinesis_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameMongodb: + if v, ok := diff.GetOk("mongodb_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("mongodb_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameS3: + if v, ok := diff.GetOk("s3_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("s3_settings must be set when engine_name = %q", engineName) + } + } + + return nil +} + func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) error { d.SetId(aws.StringValue(endpoint.EndpointIdentifier)) @@ -727,8 +841,16 @@ func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoi return fmt.Errorf("Error setting elasticsearch for DMS: %s", err) } case "kafka": - if err := d.Set("kafka_settings", flattenDmsKafkaSettings(endpoint.KafkaSettings)); err != nil { - return fmt.Errorf("Error setting kafka_settings for DMS: %s", err) + if endpoint.KafkaSettings != nil { + // SASL password isn't returned in API. Propagate state value. + tfMap := flattenDmsKafkaSettings(endpoint.KafkaSettings) + tfMap["sasl_password"] = d.Get("kafka_settings.0.sasl_password").(string) + + if err := d.Set("kafka_settings", []interface{}{tfMap}); err != nil { + return fmt.Errorf("error setting kafka_settings: %w", err) + } + } else { + d.Set("kafka_settings", nil) } case "kinesis": if err := d.Set("kinesis_settings", flattenDmsKinesisSettings(endpoint.KinesisSettings)); err != nil { @@ -782,17 +904,168 @@ func flattenDmsElasticsearchSettings(settings *dms.ElasticsearchSettings) []map[ return []map[string]interface{}{m} } -func flattenDmsKafkaSettings(settings *dms.KafkaSettings) []map[string]interface{} { - if settings == nil { - return []map[string]interface{}{} +func expandDmsKafkaSettings(tfMap map[string]interface{}) *dms.KafkaSettings { + if tfMap == nil { + return nil } - m := map[string]interface{}{ - "broker": aws.StringValue(settings.Broker), - "topic": aws.StringValue(settings.Topic), + apiObject := &dms.KafkaSettings{} + + if v, ok := tfMap["broker"].(string); ok && v != "" { + apiObject.Broker = aws.String(v) } - return []map[string]interface{}{m} + if v, ok := tfMap["include_control_details"].(bool); ok { + apiObject.IncludeControlDetails = aws.Bool(v) + } + + if v, ok := tfMap["include_null_and_empty"].(bool); ok { + apiObject.IncludeNullAndEmpty = aws.Bool(v) + } + + if v, ok := tfMap["include_partition_value"].(bool); ok { + apiObject.IncludePartitionValue = aws.Bool(v) + } + + if v, ok := tfMap["include_table_alter_operations"].(bool); ok { + apiObject.IncludeTableAlterOperations = aws.Bool(v) + } + + if v, ok := tfMap["include_transaction_details"].(bool); ok { + apiObject.IncludeTransactionDetails = aws.Bool(v) + } + + if v, ok := tfMap["message_format"].(string); ok && v != "" { + apiObject.MessageFormat = aws.String(v) + } + + if v, ok := tfMap["message_max_bytes"].(int); ok && v != 0 { + apiObject.MessageMaxBytes = aws.Int64(int64(v)) + } + + if v, ok := tfMap["no_hex_prefix"].(bool); ok { + apiObject.NoHexPrefix = aws.Bool(v) + } + + if v, ok := tfMap["partition_include_schema_table"].(bool); ok { + apiObject.PartitionIncludeSchemaTable = aws.Bool(v) + } + + if v, ok := tfMap["sasl_password"].(string); ok && v != "" { + apiObject.SaslPassword = aws.String(v) + } + + if v, ok := tfMap["sasl_username"].(string); ok && v != "" { + apiObject.SaslUsername = aws.String(v) + } + + if v, ok := tfMap["security_protocol"].(string); ok && v != "" { + apiObject.SecurityProtocol = aws.String(v) + } + + if v, ok := tfMap["ssl_ca_certificate_arn"].(string); ok && v != "" { + apiObject.SslCaCertificateArn = aws.String(v) + } + + if v, ok := tfMap["ssl_client_certificate_arn"].(string); ok && v != "" { + apiObject.SslClientCertificateArn = aws.String(v) + } + + if v, ok := tfMap["ssl_client_key_arn"].(string); ok && v != "" { + apiObject.SslClientKeyArn = aws.String(v) + } + + if v, ok := tfMap["ssl_client_key_password"].(string); ok && v != "" { + apiObject.SslClientKeyPassword = aws.String(v) + } + + if v, ok := tfMap["topic"].(string); ok && v != "" { + apiObject.Topic = aws.String(v) + } + + return apiObject +} + +func flattenDmsKafkaSettings(apiObject *dms.KafkaSettings) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.Broker; v != nil { + tfMap["broker"] = aws.StringValue(v) + } + + if v := apiObject.IncludeControlDetails; v != nil { + tfMap["include_control_details"] = aws.BoolValue(v) + } + + if v := apiObject.IncludeNullAndEmpty; v != nil { + tfMap["include_null_and_empty"] = aws.BoolValue(v) + } + + if v := apiObject.IncludePartitionValue; v != nil { + tfMap["include_partition_value"] = aws.BoolValue(v) + } + + if v := apiObject.IncludeTableAlterOperations; v != nil { + tfMap["include_table_alter_operations"] = aws.BoolValue(v) + } + + if v := apiObject.IncludeTransactionDetails; v != nil { + tfMap["include_transaction_details"] = aws.BoolValue(v) + } + + if v := apiObject.MessageFormat; v != nil { + tfMap["message_format"] = aws.StringValue(v) + } + + if v := apiObject.MessageMaxBytes; v != nil { + tfMap["message_max_bytes"] = aws.Int64Value(v) + } + + if v := apiObject.NoHexPrefix; v != nil { + tfMap["no_hex_prefix"] = aws.BoolValue(v) + } + + if v := apiObject.PartitionIncludeSchemaTable; v != nil { + tfMap["partition_include_schema_table"] = aws.BoolValue(v) + } + + if v := apiObject.SaslPassword; v != nil { + tfMap["sasl_password"] = aws.StringValue(v) + } + + if v := apiObject.SaslUsername; v != nil { + tfMap["sasl_username"] = aws.StringValue(v) + } + + if v := apiObject.SecurityProtocol; v != nil { + tfMap["security_protocol"] = aws.StringValue(v) + } + + if v := apiObject.SslCaCertificateArn; v != nil { + tfMap["ssl_ca_certificate_arn"] = aws.StringValue(v) + } + + if v := apiObject.SslClientCertificateArn; v != nil { + tfMap["ssl_client_certificate_arn"] = aws.StringValue(v) + } + + if v := apiObject.SslClientKeyArn; v != nil { + tfMap["ssl_client_key_arn"] = aws.StringValue(v) + } + + if v := apiObject.SslClientKeyPassword; v != nil { + tfMap["ssl_client_key_password"] = aws.StringValue(v) + } + + if v := apiObject.Topic; v != nil { + tfMap["topic"] = aws.StringValue(v) + } + + return tfMap } func flattenDmsKinesisSettings(settings *dms.KinesisSettings) []map[string]interface{} { diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index 50dec074b846..3881b827cb25 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -5,11 +5,12 @@ import ( "regexp" "testing" - "github.com/aws/aws-sdk-go/aws" dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) func TestAccAwsDmsEndpoint_basic(t *testing.T) { @@ -20,12 +21,12 @@ func TestAccAwsDmsEndpoint_basic(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointBasicConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -38,7 +39,7 @@ func TestAccAwsDmsEndpoint_basic(t *testing.T) { { Config: dmsEndpointBasicConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "database_name", "tf-test-dms-db-updated"), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "extra"), resource.TestCheckResourceAttr(resourceName, "password", "tftestupdate"), @@ -60,12 +61,12 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointS3Config(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", ""), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\n"), @@ -89,7 +90,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { { Config: dmsEndpointS3ConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestMatchResourceAttr(resourceName, "extra_connection_attributes", regexp.MustCompile(`key=value;`)), resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"), @@ -113,12 +114,12 @@ func TestAccAwsDmsEndpoint_S3_ExtraConnectionAttributes(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointS3ExtraConnectionAttributesConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestMatchResourceAttr(resourceName, "extra_connection_attributes", regexp.MustCompile(`dataFormat=parquet;`)), ), }, @@ -140,12 +141,12 @@ func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointDynamoDbConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -158,7 +159,7 @@ func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { { Config: dmsEndpointDynamoDbConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), ), }, }, @@ -173,12 +174,12 @@ func TestAccAwsDmsEndpoint_Elasticsearch(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointElasticsearchConfig(rName), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), testAccCheckResourceAttrRegionalHostname(resourceName, "elasticsearch_settings.0.endpoint_uri", "es", "search-estest"), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.full_load_error_percentage", "10"), @@ -207,12 +208,12 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ExtraConnectionAttributes(t *testing.T) PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointElasticsearchExtraConnectionAttributesConfig(rName), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "errorRetryDuration=400;"), ), }, @@ -234,12 +235,12 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ErrorRetryDuration(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointElasticsearchConfigErrorRetryDuration(rName, 60), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.error_retry_duration", "60"), ), @@ -255,7 +256,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ErrorRetryDuration(t *testing.T) { // { // Config: dmsEndpointElasticsearchConfigErrorRetryDuration(rName, 120), // Check: resource.ComposeTestCheckFunc( - // checkDmsEndpointExists(resourceName), + // testAccCheckAWSDmsEndpointExists(resourceName), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.error_retry_duration", "120"), // ), @@ -272,12 +273,12 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointElasticsearchConfigFullLoadErrorPercentage(rName, 1), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.full_load_error_percentage", "1"), ), @@ -293,7 +294,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { // { // Config: dmsEndpointElasticsearchConfigFullLoadErrorPercentage(rName, 2), // Check: resource.ComposeTestCheckFunc( - // checkDmsEndpointExists(resourceName), + // testAccCheckAWSDmsEndpointExists(resourceName), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.full_load_error_percentage", "2"), // ), @@ -302,26 +303,38 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { }) } -func TestAccAwsDmsEndpoint_Kafka_Broker(t *testing.T) { - resourceName := "aws_dms_endpoint.test" +func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { + domainName := testAccRandomSubdomain() rName := acctest.RandomWithPrefix("tf-acc-test") - brokerPrefix := "ec2-12-345-678-901" - brokerService := "compute-1" - brokerPort1 := 2345 - brokerPort2 := 3456 + resourceName := "aws_dms_endpoint.test" resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { - Config: dmsEndpointKafkaConfigBroker(rName, brokerPrefix, brokerService, brokerPort1), + Config: dmsEndpointKafkaConfig(rName, domainName), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - testAccCheckResourceAttrHostnameWithPort(resourceName, "kafka_settings.0.broker", brokerService, brokerPrefix, brokerPort1), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_partition_value", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_table_alter_operations", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_transaction_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "json"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_max_bytes", "1000000"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.no_hex_prefix", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "plaintext"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_ca_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_password", ""), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "kafka-default-topic"), ), }, @@ -332,50 +345,29 @@ func TestAccAwsDmsEndpoint_Kafka_Broker(t *testing.T) { ImportStateVerifyIgnore: []string{"password"}, }, { - Config: dmsEndpointKafkaConfigBroker(rName, brokerPrefix, brokerService, brokerPort2), + Config: dmsEndpointKafkaConfigUpdate(rName, domainName), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - testAccCheckResourceAttrHostnameWithPort(resourceName, "kafka_settings.0.broker", brokerService, brokerPrefix, brokerPort2), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "kafka-default-topic"), - ), - }, - }, - }) -} - -func TestAccAwsDmsEndpoint_Kafka_Topic(t *testing.T) { - resourceName := "aws_dms_endpoint.test" - rName := acctest.RandomWithPrefix("tf-acc-test") - - resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { testAccPreCheck(t) }, - ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), - Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, - Steps: []resource.TestStep{ - { - Config: dmsEndpointKafkaConfigTopic(rName, "topic1"), - Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_partition_value", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_table_alter_operations", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_transaction_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "json-unformatted"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_max_bytes", "500000"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.no_hex_prefix", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", "tftest-new"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", "tftest-new"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "sasl-ssl"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_ca_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_password", ""), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "topic1"), ), }, - { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, - ImportStateVerifyIgnore: []string{"password"}, - }, - { - Config: dmsEndpointKafkaConfigTopic(rName, "topic2"), - Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "topic2"), - ), - }, }, }) } @@ -388,12 +380,12 @@ func TestAccAwsDmsEndpoint_Kinesis(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointKinesisConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.message_format", "json"), resource.TestCheckResourceAttrPair(resourceName, "kinesis_settings.0.stream_arn", "aws_kinesis_stream.stream1", "arn"), @@ -408,7 +400,7 @@ func TestAccAwsDmsEndpoint_Kinesis(t *testing.T) { { Config: dmsEndpointKinesisConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.message_format", "json"), resource.TestCheckResourceAttrPair(resourceName, "kinesis_settings.0.stream_arn", "aws_kinesis_stream.stream2", "arn"), @@ -426,12 +418,12 @@ func TestAccAwsDmsEndpoint_MongoDb(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointMongoDbConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -456,19 +448,19 @@ func TestAccAwsDmsEndpoint_MongoDb_Update(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointMongoDbConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, { Config: dmsEndpointMongoDbConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "server_name", "tftest-new-server_name"), resource.TestCheckResourceAttr(resourceName, "port", "27018"), resource.TestCheckResourceAttr(resourceName, "username", "tftest-new-username"), @@ -501,12 +493,12 @@ func TestAccAwsDmsEndpoint_DocDB(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointDocDBConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -519,7 +511,7 @@ func TestAccAwsDmsEndpoint_DocDB(t *testing.T) { { Config: dmsEndpointDocDBConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "database_name", "tf-test-dms-db-updated"), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "extra"), resource.TestCheckResourceAttr(resourceName, "password", "tftestupdate"), @@ -541,12 +533,12 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointDb2Config(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -559,7 +551,7 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { { Config: dmsEndpointDb2ConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "database_name", "tf-test-dms-db-updated"), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "extra"), resource.TestCheckResourceAttr(resourceName, "password", "tftestupdate"), @@ -573,22 +565,31 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { }) } -func dmsEndpointDestroy(s *terraform.State) error { +func testAccCheckAWSDmsEndpointDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).dmsconn + for _, rs := range s.RootModule().Resources { if rs.Type != "aws_dms_endpoint" { continue } - err := checkDmsEndpointExists(rs.Primary.ID) - if err == nil { - return fmt.Errorf("Found an endpoint that was not destroyed: %s", rs.Primary.ID) + _, err := finder.EndpointByID(conn, rs.Primary.ID) + + if tfresource.NotFound(err) { + continue } + + if err != nil { + return err + } + + return fmt.Errorf("DMS Endpoint %s still exists", rs.Primary.ID) } return nil } -func checkDmsEndpointExists(n string) resource.TestCheckFunc { +func testAccCheckAWSDmsEndpointExists(n string) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[n] if !ok { @@ -596,25 +597,15 @@ func checkDmsEndpointExists(n string) resource.TestCheckFunc { } if rs.Primary.ID == "" { - return fmt.Errorf("No ID is set") + return fmt.Errorf("No DMS Endpoint ID is set") } conn := testAccProvider.Meta().(*AWSClient).dmsconn - resp, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("endpoint-id"), - Values: []*string{aws.String(rs.Primary.ID)}, - }, - }, - }) - if err != nil { - return fmt.Errorf("DMS endpoint error: %v", err) - } + _, err := finder.EndpointByID(conn, rs.Primary.ID) - if resp.Endpoints == nil { - return fmt.Errorf("DMS endpoint not found") + if err != nil { + return err } return nil @@ -1162,38 +1153,50 @@ resource "aws_dms_endpoint" "test" { `, rName, fullLoadErrorPercentage)) } -func dmsEndpointKafkaConfigBroker(rName, brokerPrefix, brokerServiceName string, brokerPort int) string { +func dmsEndpointKafkaConfig(rName, domainName string) string { return fmt.Sprintf(` -data "aws_partition" "current" {} - resource "aws_dms_endpoint" "test" { endpoint_id = %[1]q endpoint_type = "target" engine_name = "kafka" + ssl_mode = "none" kafka_settings { - # example kafka broker: "ec2-12-345-678-901.compute-1.amazonaws.com:2345" - broker = "%[2]s.%[3]s.${data.aws_partition.current.dns_suffix}:%[4]d" + broker = "%[2]s:2345" + include_null_and_empty = false + security_protocol = "plaintext" + no_hex_prefix = false } } -`, rName, brokerPrefix, brokerServiceName, brokerPort) +`, rName, domainName) } -func dmsEndpointKafkaConfigTopic(rName string, topic string) string { +func dmsEndpointKafkaConfigUpdate(rName, domainName string) string { return fmt.Sprintf(` -data "aws_partition" "current" {} - resource "aws_dms_endpoint" "test" { endpoint_id = %[1]q endpoint_type = "target" engine_name = "kafka" + ssl_mode = "none" kafka_settings { - broker = "ec2-12-345-678-901.compute-1.${data.aws_partition.current.dns_suffix}:2345" - topic = %[2]q + broker = "%[2]s:2345" + topic = "topic1" + message_format = "json-unformatted" + include_transaction_details = true + include_partition_value = true + partition_include_schema_table = true + include_table_alter_operations = true + include_control_details = true + message_max_bytes = 500000 + include_null_and_empty = true + security_protocol = "sasl-ssl" + sasl_username = "tftest-new" + sasl_password = "tftest-new" + no_hex_prefix = true } } -`, rName, topic) +`, rName, domainName) } func dmsEndpointKinesisConfig(randId string) string { diff --git a/website/docs/r/dms_endpoint.html.markdown b/website/docs/r/dms_endpoint.html.markdown index 16f1cf32f763..f823968fec20 100644 --- a/website/docs/r/dms_endpoint.html.markdown +++ b/website/docs/r/dms_endpoint.html.markdown @@ -87,6 +87,22 @@ The `elasticsearch_settings` configuration block supports the following argument The `kafka_settings` configuration block supports the following arguments: * `broker` - (Required) Kafka broker location. Specify in the form broker-hostname-or-ip:port. +* `include_control_details` - (Optional) Shows detailed control information for table definition, column definition, and table and column changes in the Kafka message output. The default is `false`. +* `include_null_and_empty` - (Optional) Include NULL and empty columns for records migrated to the endpoint. The default is `false`. +* `include_partition_value` - (Optional) Shows the partition value within the Kafka message output unless the partition type is `schema-table-type`. The default is `false`. +* `include_table_alter_operations` - (Optional) Includes any data definition language (DDL) operations that change the table in the control data, such as `rename-table`, `drop-table`, `add-column`, `drop-column`, and `rename-column`. The default is `false`. +* `include_transaction_details` - (Optional) Provides detailed transaction information from the source database. This information includes a commit timestamp, a log position, and values for `transaction_id`, previous `transaction_id`, and `transaction_record_id` (the record offset within a transaction). The default is `false`. +* `message_format` - (Optional) The output format for the records created on the endpoint. The message format is `JSON` (default) or `JSON_UNFORMATTED` (a single line with no tab). +* `message_max_bytes` - (Optional) The maximum size in bytes for records created on the endpoint The default is `1,000,000`. +* `no_hex_prefix` - (Optional) Set this optional parameter to true to avoid adding a '0x' prefix to raw data in hexadecimal format. For example, by default, AWS DMS adds a '0x' prefix to the LOB column type in hexadecimal format moving from an Oracle source to a Kafka target. Use the `no_hex_prefix` endpoint setting to enable migration of RAW data type columns without adding the `'0x'` prefix. +* `partition_include_schema_table` - (Optional) Prefixes schema and table names to partition values, when the partition type is `primary-key-type`. Doing this increases data distribution among Kafka partitions. For example, suppose that a SysBench schema has thousands of tables and each table has only limited range for a primary key. In this case, the same primary key is sent from thousands of tables to the same partition, which causes throttling. The default is `false`. +* `sasl_password` - (Optional) The secure password you created when you first set up your MSK cluster to validate a client identity and make an encrypted connection between server and client using SASL-SSL authentication. +* `sasl_username` - (Optional) The secure user name you created when you first set up your MSK cluster to validate a client identity and make an encrypted connection between server and client using SASL-SSL authentication. +* `security_protocol` - (Optional) Set secure connection to a Kafka target endpoint using Transport Layer Security (TLS). Options include `ssl-encryption`, `ssl-authentication`, and `sasl-ssl`. `sasl-ssl` requires `sasl_username` and `sasl_password`. +* `ssl_ca_certificate_arn` - (Optional) The Amazon Resource Name (ARN) for the private certificate authority (CA) cert that AWS DMS uses to securely connect to your Kafka target endpoint. +* `ssl_client_certificate_arn` - (Optional) The Amazon Resource Name (ARN) of the client certificate used to securely connect to a Kafka target endpoint. +* `ssl_client_key_arn` - (Optional) The Amazon Resource Name (ARN) for the client private key used to securely connect to a Kafka target endpoint. +* `ssl_client_key_password` - (Optional) The password for the client private key used to securely connect to a Kafka target endpoint. * `topic` - (Optional) Kafka topic for migration. Defaults to `kafka-default-topic`. ### kinesis_settings Arguments