diff --git a/.changelog/38098.txt b/.changelog/38098.txt
new file mode 100644
index 00000000000..f8800ab363b
--- /dev/null
+++ b/.changelog/38098.txt
@@ -0,0 +1,3 @@
+```release-note:enhancement
+resource/aws_dynamodb_kinesis_streaming_destination: Add `approximate_creation_date_time_precision` argument
+```
\ No newline at end of file
diff --git a/.changelog/39844.txt b/.changelog/39844.txt
new file mode 100644
index 00000000000..3c78467b6f7
--- /dev/null
+++ b/.changelog/39844.txt
@@ -0,0 +1,3 @@
+```release-note:enhancement
+resource/aws_kinesis_firehose_delivery_stream: Add `iceberg_configuration` argument
+```
\ No newline at end of file
diff --git a/internal/service/dynamodb/kinesis_streaming_destination.go b/internal/service/dynamodb/kinesis_streaming_destination.go
index 537670c300b..9269681b3d0 100644
--- a/internal/service/dynamodb/kinesis_streaming_destination.go
+++ b/internal/service/dynamodb/kinesis_streaming_destination.go
@@ -42,6 +42,13 @@ func resourceKinesisStreamingDestination() *schema.Resource {
 		},
 
 		Schema: map[string]*schema.Schema{
+			"approximate_creation_date_time_precision": {
+				Type:             schema.TypeString,
+				Optional:         true,
+				Computed:         true,
+				ForceNew:         true,
+				ValidateDiagFunc: enum.Validate[awstypes.ApproximateCreationDateTimePrecision](),
+			},
 			names.AttrStreamARN: {
 				Type:         schema.TypeString,
 				Required:     true,
@@ -73,6 +80,12 @@ func resourceKinesisStreamingDestinationCreate(ctx context.Context, d *schema.Re
 		TableName: aws.String(tableName),
 	}
 
+	if v, ok := d.GetOk("approximate_creation_date_time_precision"); ok {
+		input.EnableKinesisStreamingConfiguration = &awstypes.EnableKinesisStreamingConfiguration{
+			ApproximateCreationDateTimePrecision: awstypes.ApproximateCreationDateTimePrecision(v.(string)),
+		}
+	}
+
 	if _, err := conn.EnableKinesisStreamingDestination(ctx, input); err != nil {
 		return sdkdiag.AppendErrorf(diags, "enabling DynamoDB Kinesis Streaming Destination (%s): %s", id, err)
 	}
@@ -108,6 +121,7 @@ func resourceKinesisStreamingDestinationRead(ctx context.Context, d *schema.Reso
 		return sdkdiag.AppendErrorf(diags, "reading DynamoDB Kinesis Streaming Destination (%s): %s", d.Id(), err)
 	}
 
+	d.Set("approximate_creation_date_time_precision", output.ApproximateCreationDateTimePrecision)
 	d.Set(names.AttrStreamARN, output.StreamArn)
 	d.Set(names.AttrTableName, tableName)
 
@@ -131,7 +145,7 @@
 	}
 
 	if err != nil {
-		return sdkdiag.AppendErrorf(diags, "disabling DynamoDB Kinesis Streaming Destination (%s): %s", d.Id(), err)
+		return sdkdiag.AppendErrorf(diags, "reading DynamoDB Kinesis Streaming Destination (%s): %s", d.Id(), err)
 	}
 
 	log.Printf("[DEBUG] Deleting DynamoDB Kinesis Streaming Destination: %s", d.Id())
@@ -243,6 +257,7 @@ func waitKinesisStreamingDestinationActive(ctx context.Context, conn *dynamodb.C
 
 	if output, ok := outputRaw.(*awstypes.KinesisDataStreamDestination); ok {
 		tfresource.SetLastError(err, errors.New(aws.ToString(output.DestinationStatusDescription)))
+
 		return output, err
 	}
 
@@ -264,6 +279,7 @@ func waitKinesisStreamingDestinationDisabled(ctx context.Context, conn *dynamodb
 
 	if output, ok := outputRaw.(*awstypes.KinesisDataStreamDestination); ok {
 		tfresource.SetLastError(err, errors.New(aws.ToString(output.DestinationStatusDescription)))
+
 		return output, err
 	}
 
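Reviewer note: a minimal configuration exercising the new argument is sketched below (resource names are illustrative). Per the DynamoDB API, the accepted precision values are `MILLISECOND` and `MICROSECOND`; because the attribute is both `Computed` and `ForceNew`, omitting it lets AWS choose the default, while changing it recreates the streaming destination.

```hcl
resource "aws_dynamodb_kinesis_streaming_destination" "example" {
  table_name = aws_dynamodb_table.example.name
  stream_arn = aws_kinesis_stream.example.arn

  # ForceNew: changing the precision disables and re-enables the destination.
  approximate_creation_date_time_precision = "MICROSECOND"
}
```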
diff --git a/internal/service/dynamodb/kinesis_streaming_destination_test.go b/internal/service/dynamodb/kinesis_streaming_destination_test.go
index 091a7a0f05a..216db131560 100644
--- a/internal/service/dynamodb/kinesis_streaming_destination_test.go
+++ b/internal/service/dynamodb/kinesis_streaming_destination_test.go
@@ -32,8 +32,38 @@ func TestAccDynamoDBKinesisStreamingDestination_basic(t *testing.T) {
 		Steps: []resource.TestStep{
 			{
 				Config: testAccKinesisStreamingDestinationConfig_basic(rName),
+				Check: resource.ComposeAggregateTestCheckFunc(
+					testAccCheckKinesisStreamingDestinationExists(ctx, resourceName),
+					resource.TestCheckResourceAttr(resourceName, "approximate_creation_date_time_precision", ""),
+					acctest.MatchResourceAttrRegionalARN(resourceName, names.AttrStreamARN, "kinesis", regexache.MustCompile(fmt.Sprintf("stream/%s", rName))),
+					resource.TestCheckResourceAttr(resourceName, names.AttrTableName, rName),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+		},
+	})
+}
+
+func TestAccDynamoDBKinesisStreamingDestination_approximateCreationDateTimePrecision(t *testing.T) {
+	ctx := acctest.Context(t)
+	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+	resourceName := "aws_dynamodb_kinesis_streaming_destination.test"
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:                 func() { acctest.PreCheck(ctx, t) },
+		ErrorCheck:               acctest.ErrorCheck(t, names.DynamoDBServiceID),
+		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
+		CheckDestroy:             testAccCheckKinesisStreamingDestinationDestroy(ctx),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccKinesisStreamingDestinationConfig_approximateCreationDateTimePrecision(rName, "MICROSECOND"),
 				Check: resource.ComposeTestCheckFunc(
 					testAccCheckKinesisStreamingDestinationExists(ctx, resourceName),
+					resource.TestCheckResourceAttr(resourceName, "approximate_creation_date_time_precision", "MICROSECOND"),
 					acctest.MatchResourceAttrRegionalARN(resourceName, names.AttrStreamARN, "kinesis", regexache.MustCompile(fmt.Sprintf("stream/%s", rName))),
 					resource.TestCheckResourceAttr(resourceName, names.AttrTableName, rName),
 				),
@@ -95,7 +125,7 @@ func TestAccDynamoDBKinesisStreamingDestination_Disappears_dynamoDBTable(t *test
 	})
 }
 
-func testAccKinesisStreamingDestinationConfig_basic(rName string) string {
+func testAccKinesisStreamingDestinationConfig_base(rName string) string {
 	return fmt.Sprintf(`
 resource "aws_dynamodb_table" "test" {
   name           = %[1]q
@@ -113,12 +143,26 @@ resource "aws_kinesis_stream" "test" {
   name        = %[1]q
   shard_count = 2
 }
+`, rName)
+}
 
+func testAccKinesisStreamingDestinationConfig_basic(rName string) string {
+	return acctest.ConfigCompose(testAccKinesisStreamingDestinationConfig_base(rName), `
 resource "aws_dynamodb_kinesis_streaming_destination" "test" {
   table_name = aws_dynamodb_table.test.name
   stream_arn = aws_kinesis_stream.test.arn
 }
-`, rName)
+`)
+}
+
+func testAccKinesisStreamingDestinationConfig_approximateCreationDateTimePrecision(rName, precision string) string {
+	return acctest.ConfigCompose(testAccKinesisStreamingDestinationConfig_base(rName), fmt.Sprintf(`
+resource "aws_dynamodb_kinesis_streaming_destination" "test" {
+  table_name                                = aws_dynamodb_table.test.name
+  stream_arn                                = aws_kinesis_stream.test.arn
+  approximate_creation_date_time_precision = %[1]q
+}
+`, precision))
 }
 
 func testAccCheckKinesisStreamingDestinationExists(ctx context.Context, n string) resource.TestCheckFunc {
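Reviewer note: for reference, composing `testAccKinesisStreamingDestinationConfig_base` with the new variant renders roughly the following configuration (the `aws_dynamodb_table` body is elided; it is unchanged from the existing base config, and the literal name stands in for the test's generated `rName`):

```hcl
resource "aws_kinesis_stream" "test" {
  name        = "tf-acc-test-example"
  shard_count = 2
}

resource "aws_dynamodb_kinesis_streaming_destination" "test" {
  table_name                                = aws_dynamodb_table.test.name
  stream_arn                                = aws_kinesis_stream.test.arn
  approximate_creation_date_time_precision = "MICROSECOND"
}
```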
diff --git a/internal/service/firehose/delivery_stream.go b/internal/service/firehose/delivery_stream.go
index 510eacfb766..fd33aff98bd 100644
--- a/internal/service/firehose/delivery_stream.go
+++ b/internal/service/firehose/delivery_stream.go
@@ -38,6 +38,7 @@ const (
 	destinationTypeElasticsearch        destinationType = "elasticsearch"
 	destinationTypeExtendedS3           destinationType = "extended_s3"
 	destinationTypeHTTPEndpoint         destinationType = "http_endpoint"
+	destinationTypeIceberg              destinationType = "iceberg"
 	destinationTypeOpenSearch           destinationType = "opensearch"
 	destinationTypeOpenSearchServerless destinationType = "opensearchserverless"
 	destinationTypeRedshift             destinationType = "redshift"
@@ -50,6 +51,7 @@ func (destinationType) Values() []destinationType {
 		destinationTypeElasticsearch,
 		destinationTypeExtendedS3,
 		destinationTypeHTTPEndpoint,
+		destinationTypeIceberg,
 		destinationTypeOpenSearch,
 		destinationTypeOpenSearchServerless,
 		destinationTypeRedshift,
@@ -118,6 +120,35 @@ func resourceDeliveryStream() *schema.Resource {
 			},
 		}
 	}
+	destinationTableConfigurationSchema := func() *schema.Schema {
+		return &schema.Schema{
+			Type:     schema.TypeList,
+			Optional: true,
+			ForceNew: true,
+			Elem: &schema.Resource{
+				Schema: map[string]*schema.Schema{
+					names.AttrDatabaseName: {
+						Type:     schema.TypeString,
+						Required: true,
+					},
+					names.AttrTableName: {
+						Type:     schema.TypeString,
+						Required: true,
+					},
+					"s3_error_output_prefix": {
+						Type:         schema.TypeString,
+						Optional:     true,
+						ValidateFunc: validation.StringLenBetween(0, 1024),
+					},
+					"unique_keys": {
+						Type:     schema.TypeList,
+						Optional: true,
+						Elem:     &schema.Schema{Type: schema.TypeString},
+					},
+				},
+			},
+		}
+	}
 	dynamicPartitioningConfigurationSchema := func() *schema.Schema {
 		return &schema.Schema{
 			Type:     schema.TypeList,
@@ -821,6 +852,52 @@ func resourceDeliveryStream() *schema.Resource {
 					},
 				},
 			},
+			"iceberg_configuration": {
+				Type:     schema.TypeList,
+				Optional: true,
+				MaxItems: 1,
+				Elem: &schema.Resource{
+					Schema: map[string]*schema.Schema{
+						"buffering_interval": {
+							Type:     schema.TypeInt,
+							Optional: true,
+							Default:  300,
+						},
+						"buffering_size": {
+							Type:     schema.TypeInt,
+							Optional: true,
+							Default:  5,
+						},
+						"catalog_arn": {
+							Type:         schema.TypeString,
+							Required:     true,
+							ForceNew:     true,
+							ValidateFunc: verify.ValidARN,
+						},
+						"cloudwatch_logging_options":      cloudWatchLoggingOptionsSchema(),
+						"destination_table_configuration": destinationTableConfigurationSchema(),
+						"processing_configuration":        processingConfigurationSchema(),
+						"retry_duration": {
+							Type:         schema.TypeInt,
+							Optional:     true,
+							Default:      300,
+							ValidateFunc: validation.IntBetween(0, 7200),
+						},
+						names.AttrRoleARN: {
+							Type:         schema.TypeString,
+							Required:     true,
+							ValidateFunc: verify.ValidARN,
+						},
+						"s3_backup_mode": {
+							Type:             schema.TypeString,
+							Optional:         true,
+							Default:          types.IcebergS3BackupModeFailedDataOnly,
+							ValidateDiagFunc: enum.Validate[types.IcebergS3BackupMode](),
+						},
+						"s3_configuration": s3ConfigurationSchema(),
+					},
+				},
+			},
 			"kinesis_source_configuration": {
 				Type:     schema.TypeList,
 				ForceNew: true,
@@ -1376,6 +1453,7 @@ func resourceDeliveryStream() *schema.Resource {
 		destinationTypeElasticsearch:        "elasticsearch_configuration",
 		destinationTypeExtendedS3:           "extended_s3_configuration",
 		destinationTypeHTTPEndpoint:         "http_endpoint_configuration",
+		destinationTypeIceberg:              "iceberg_configuration",
 		destinationTypeOpenSearch:           "opensearch_configuration",
 		destinationTypeOpenSearchServerless: "opensearchserverless_configuration",
 		destinationTypeRedshift:             "redshift_configuration",
"opensearch_configuration", destinationTypeOpenSearchServerless: "opensearchserverless_configuration", destinationTypeRedshift: "redshift_configuration", @@ -1425,6 +1503,10 @@ func resourceDeliveryStreamCreate(ctx context.Context, d *schema.ResourceData, m if v, ok := d.GetOk("http_endpoint_configuration"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.HttpEndpointDestinationConfiguration = expandHTTPEndpointDestinationConfiguration(v.([]interface{})[0].(map[string]interface{})) } + case destinationTypeIceberg: + if v, ok := d.GetOk("iceberg_configuration"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.IcebergDestinationConfiguration = expandIcebergDestinationConfiguration(v.([]interface{})[0].(map[string]interface{})) + } case destinationTypeOpenSearch: if v, ok := d.GetOk("opensearch_configuration"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.AmazonopensearchserviceDestinationConfiguration = expandAmazonopensearchserviceDestinationConfiguration(v.([]interface{})[0].(map[string]interface{})) @@ -1546,6 +1628,11 @@ func resourceDeliveryStreamRead(ctx context.Context, d *schema.ResourceData, met if err := d.Set("http_endpoint_configuration", flattenHTTPEndpointDestinationDescription(destination.HttpEndpointDestinationDescription, configuredAccessKey)); err != nil { return sdkdiag.AppendErrorf(diags, "setting http_endpoint_configuration: %s", err) } + case destination.IcebergDestinationDescription != nil: + d.Set(names.AttrDestination, destinationTypeIceberg) + if err := d.Set("iceberg_configuration", flattenIcebergDestinationDescription(destination.IcebergDestinationDescription)); err != nil { + return sdkdiag.AppendErrorf(diags, "setting iceberg_configuration: %s", err) + } case destination.AmazonopensearchserviceDestinationDescription != nil: d.Set(names.AttrDestination, destinationTypeOpenSearch) if err := d.Set("opensearch_configuration", flattenAmazonopensearchserviceDestinationDescription(destination.AmazonopensearchserviceDestinationDescription)); err != nil { @@ -1612,6 +1699,10 @@ func resourceDeliveryStreamUpdate(ctx context.Context, d *schema.ResourceData, m if v, ok := d.GetOk("http_endpoint_configuration"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.HttpEndpointDestinationUpdate = expandHTTPEndpointDestinationUpdate(v.([]interface{})[0].(map[string]interface{})) } + case destinationTypeIceberg: + if v, ok := d.GetOk("iceberg_configuration"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.IcebergDestinationUpdate = expandIcebergDestinationUpdate(v.([]interface{})[0].(map[string]interface{})) + } case destinationTypeOpenSearch: if v, ok := d.GetOk("opensearch_configuration"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.AmazonopensearchserviceDestinationUpdate = expandAmazonopensearchserviceDestinationUpdate(v.([]interface{})[0].(map[string]interface{})) @@ -1817,7 +1908,6 @@ func waitDeliveryStreamDeleted(ctx context.Context, conn *firehose.Client, name func findDeliveryStreamEncryptionConfigurationByName(ctx context.Context, conn *firehose.Client, name string) (*types.DeliveryStreamEncryptionConfiguration, error) { output, err := findDeliveryStreamByName(ctx, conn, name) - if err != nil { return nil, err } @@ -2434,6 +2524,86 @@ func expandPrefix(s3 map[string]interface{}) *string { return nil } +func expandIcebergDestinationConfiguration(tfMap map[string]interface{}) *types.IcebergDestinationConfiguration { + 
+	roleARN := tfMap[names.AttrRoleARN].(string)
+	apiObject := &types.IcebergDestinationConfiguration{
+		BufferingHints: &types.BufferingHints{
+			IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))),
+			SizeInMBs:         aws.Int32(int32(tfMap["buffering_size"].(int))),
+		},
+		CatalogConfiguration: &types.CatalogConfiguration{
+			CatalogARN: aws.String(tfMap["catalog_arn"].(string)),
+		},
+		RoleARN:         aws.String(roleARN),
+		S3Configuration: expandS3DestinationConfiguration(tfMap["s3_configuration"].([]interface{})),
+	}
+
+	if _, ok := tfMap["cloudwatch_logging_options"]; ok {
+		apiObject.CloudWatchLoggingOptions = expandCloudWatchLoggingOptions(tfMap)
+	}
+
+	if _, ok := tfMap["destination_table_configuration"]; ok {
+		apiObject.DestinationTableConfigurationList = expandDestinationTableConfigurationList(tfMap)
+	}
+
+	if _, ok := tfMap["processing_configuration"]; ok {
+		apiObject.ProcessingConfiguration = expandProcessingConfiguration(tfMap, destinationTypeIceberg, roleARN)
+	}
+
+	if _, ok := tfMap["retry_duration"]; ok {
+		apiObject.RetryOptions = expandIcebergRetryOptions(tfMap)
+	}
+
+	if v, ok := tfMap["s3_backup_mode"]; ok {
+		apiObject.S3BackupMode = types.IcebergS3BackupMode(v.(string))
+	}
+
+	return apiObject
+}
+
+func expandIcebergDestinationUpdate(tfMap map[string]interface{}) *types.IcebergDestinationUpdate {
+	roleARN := tfMap[names.AttrRoleARN].(string)
+	apiObject := &types.IcebergDestinationUpdate{
+		BufferingHints: &types.BufferingHints{
+			IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))),
+			SizeInMBs:         aws.Int32(int32(tfMap["buffering_size"].(int))),
+		},
+		RoleARN: aws.String(roleARN),
+	}
+
+	if catalogARN, ok := tfMap["catalog_arn"].(string); ok {
+		apiObject.CatalogConfiguration = &types.CatalogConfiguration{
+			CatalogARN: aws.String(catalogARN),
+		}
+	}
+
+	if _, ok := tfMap["cloudwatch_logging_options"]; ok {
+		apiObject.CloudWatchLoggingOptions = expandCloudWatchLoggingOptions(tfMap)
+	}
+
+	if _, ok := tfMap["destination_table_configuration"]; ok {
+		apiObject.DestinationTableConfigurationList = expandDestinationTableConfigurationList(tfMap)
+	}
+
+	if _, ok := tfMap["processing_configuration"]; ok {
+		apiObject.ProcessingConfiguration = expandProcessingConfiguration(tfMap, destinationTypeIceberg, roleARN)
+	}
+
+	if _, ok := tfMap["retry_duration"]; ok {
+		apiObject.RetryOptions = expandIcebergRetryOptions(tfMap)
+	}
+
+	if v, ok := tfMap["s3_backup_mode"]; ok {
+		apiObject.S3BackupMode = types.IcebergS3BackupMode(v.(string))
+	}
+
+	if v, ok := tfMap["s3_configuration"]; ok {
+		apiObject.S3Configuration = expandS3DestinationConfiguration(v.([]interface{}))
+	}
+
+	return apiObject
+}
+
 func expandRedshiftDestinationConfiguration(tfMap map[string]interface{}) *types.RedshiftDestinationConfiguration {
 	roleARN := tfMap[names.AttrRoleARN].(string)
 	apiObject := &types.RedshiftDestinationConfiguration{
@@ -3123,6 +3293,16 @@ func expandAmazonOpenSearchServerlessBufferingHints(es map[string]interface{}) *
 	return bufferingHints
 }
 
+func expandIcebergRetryOptions(tfMap map[string]interface{}) *types.RetryOptions {
+	apiObject := &types.RetryOptions{}
+
+	if v, ok := tfMap["retry_duration"].(int); ok {
+		apiObject.DurationInSeconds = aws.Int32(int32(v))
+	}
+
+	return apiObject
+}
+
 func expandElasticsearchRetryOptions(es map[string]interface{}) *types.ElasticsearchRetryOptions {
 	retryOptions := &types.ElasticsearchRetryOptions{}
 
@@ -3230,6 +3410,37 @@ func expandSplunkRetryOptions(splunk map[string]interface{}) *types.SplunkRetryO
 	return retryOptions
 }
 
+func expandDestinationTableConfigurationList(tfMap map[string]interface{}) []types.DestinationTableConfiguration {
+	tfList := tfMap["destination_table_configuration"].([]interface{})
+	if len(tfList) == 0 {
+		return nil
+	}
+
+	apiObjects := make([]types.DestinationTableConfiguration, 0, len(tfList))
+	for _, table := range tfList {
+		apiObjects = append(apiObjects, expandDestinationTableConfiguration(table.(map[string]interface{})))
+	}
+
+	return apiObjects
+}
+
+func expandDestinationTableConfiguration(tfMap map[string]interface{}) types.DestinationTableConfiguration {
+	apiObject := types.DestinationTableConfiguration{
+		DestinationDatabaseName: aws.String(tfMap[names.AttrDatabaseName].(string)),
+		DestinationTableName:    aws.String(tfMap[names.AttrTableName].(string)),
+	}
+
+	if v, ok := tfMap["s3_error_output_prefix"].(string); ok {
+		apiObject.S3ErrorOutputPrefix = aws.String(v)
+	}
+
+	if v, ok := tfMap["unique_keys"].([]interface{}); ok {
+		apiObject.UniqueKeys = flex.ExpandStringValueList(v)
+	}
+
+	return apiObject
+}
+
 func expandCopyCommand(redshift map[string]interface{}) *types.CopyCommand {
 	cmd := &types.CopyCommand{
 		DataTableName: aws.String(redshift["data_table_name"].(string)),
@@ -4012,6 +4223,54 @@ func flattenHTTPEndpointDestinationDescription(apiObject *types.HttpEndpointDest
 	return []interface{}{tfMap}
 }
 
+func flattenIcebergDestinationDescription(apiObject *types.IcebergDestinationDescription) []interface{} {
+	if apiObject == nil {
+		return []interface{}{}
+	}
+
+	tfMap := map[string]interface{}{
+		"catalog_arn":      aws.ToString(apiObject.CatalogConfiguration.CatalogARN),
+		"s3_configuration": flattenS3DestinationDescription(apiObject.S3DestinationDescription),
+		names.AttrRoleARN:  aws.ToString(apiObject.RoleARN),
+	}
+
+	if apiObject.BufferingHints != nil {
+		tfMap["buffering_interval"] = int(aws.ToInt32(apiObject.BufferingHints.IntervalInSeconds))
+		tfMap["buffering_size"] = int(aws.ToInt32(apiObject.BufferingHints.SizeInMBs))
+	}
+
+	if apiObject.CloudWatchLoggingOptions != nil {
+		tfMap["cloudwatch_logging_options"] = flattenCloudWatchLoggingOptions(apiObject.CloudWatchLoggingOptions)
+	}
+
+	if apiObject.DestinationTableConfigurationList != nil {
+		tableConfigurations := make([]map[string]interface{}, 0, len(apiObject.DestinationTableConfigurationList))
+		for _, table := range apiObject.DestinationTableConfigurationList {
+			tableConfigurations = append(tableConfigurations, map[string]interface{}{
+				names.AttrDatabaseName:   aws.ToString(table.DestinationDatabaseName),
+				names.AttrTableName:      aws.ToString(table.DestinationTableName),
+				"s3_error_output_prefix": aws.ToString(table.S3ErrorOutputPrefix),
+				"unique_keys":            table.UniqueKeys,
+			})
+		}
+		tfMap["destination_table_configuration"] = tableConfigurations
+	}
+
+	if apiObject.ProcessingConfiguration != nil {
+		tfMap["processing_configuration"] = flattenProcessingConfiguration(apiObject.ProcessingConfiguration, destinationTypeIceberg, aws.ToString(apiObject.RoleARN))
+	}
+
+	if apiObject.RetryOptions != nil {
+		tfMap["retry_duration"] = int(aws.ToInt32(apiObject.RetryOptions.DurationInSeconds))
+	}
+
+	if apiObject.S3BackupMode != "" {
+		tfMap["s3_backup_mode"] = apiObject.S3BackupMode
+	}
+
+	return []interface{}{tfMap}
+}
+
 func expandDocumentIDOptions(tfMap map[string]interface{}) *types.DocumentIdOptions {
 	if tfMap == nil {
 		return nil
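Reviewer note: taken together, the schema, expanders, and flattener above support a delivery stream along the lines of the sketch below. This is illustrative only — the IAM role, bucket, and Glue resources are assumed to exist elsewhere, and the `catalog_arn` follows the `arn:<partition>:glue:<region>:<account-id>:catalog` form used by the acceptance-test config.

```hcl
resource "aws_kinesis_firehose_delivery_stream" "example" {
  name        = "example-iceberg-stream"
  destination = "iceberg"

  iceberg_configuration {
    role_arn    = aws_iam_role.firehose.arn
    catalog_arn = "arn:${data.aws_partition.current.partition}:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog"

    # Schema defaults: 300-second interval, 5 MB buffer, FailedDataOnly backup.
    buffering_interval = 300
    buffering_size     = 5

    destination_table_configuration {
      database_name = aws_glue_catalog_database.example.name
      table_name    = aws_glue_catalog_table.example.name
    }

    s3_configuration {
      role_arn   = aws_iam_role.firehose.arn
      bucket_arn = aws_s3_bucket.example.arn
    }
  }
}
```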
diff --git a/internal/service/firehose/delivery_stream_test.go b/internal/service/firehose/delivery_stream_test.go
index 443bb0f2d8b..18e33a8491d 100644
--- a/internal/service/firehose/delivery_stream_test.go
+++ b/internal/service/firehose/delivery_stream_test.go
@@ -80,6 +80,7 @@ func TestAccFirehoseDeliveryStream_basic(t *testing.T) {
 					resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_configuration.#", acctest.Ct0),
 					resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_mode", "Disabled"),
 					resource.TestCheckResourceAttr(resourceName, "http_endpoint_configuration.#", acctest.Ct0),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.#", acctest.Ct0),
 					resource.TestCheckResourceAttr(resourceName, "kinesis_source_configuration.#", acctest.Ct0),
 					resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.#", acctest.Ct0),
 					resource.TestCheckResourceAttr(resourceName, names.AttrName, rName),
@@ -1005,6 +1006,141 @@ func TestAccFirehoseDeliveryStream_ExtendedS3_mskClusterSource(t *testing.T) {
 	})
 }
 
+func TestAccFirehoseDeliveryStream_icebergUpdates(t *testing.T) {
+	// "InvalidArgumentException: Role ... is not authorized to perform: glue:GetTable for the given table or the table does not exist."
+	acctest.Skip(t, "Unresolvable Glue permission issue")
+
+	ctx := acctest.Context(t)
+	var stream types.DeliveryStreamDescription
+	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+	resourceName := "aws_kinesis_firehose_delivery_stream.test"
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:                 func() { acctest.PreCheck(ctx, t) },
+		ErrorCheck:               acctest.ErrorCheck(t, names.FirehoseServiceID),
+		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
+		CheckDestroy:             testAccCheckDeliveryStreamDestroy(ctx),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccDeliveryStream_iceberg(rName),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckDeliveryStreamExists(ctx, resourceName, &stream),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.#", acctest.Ct1),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.role_arn"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.s3_configuration.0.bucket_arn"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.s3_configuration.0.role_arn"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_interval", "300"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_size", "5"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_stream_name", ""),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.destination_table_configuration.#", acctest.Ct1),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.destination_table_configuration.0.database_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.destination_table_configuration.0.table_name"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.0.enabled", acctest.CtFalse),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.retry_options.#", acctest.Ct0),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.s3_backup_mode", "FailedDataOnly"),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+			{
+				Config: testAccDeliveryStream_icebergUpdates(rName),
+				Check: resource.ComposeAggregateTestCheckFunc(
+					testAccCheckDeliveryStreamExists(ctx, resourceName, &stream),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.#", acctest.Ct1),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.role_arn"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.s3_configuration.0.bucket_arn"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.s3_configuration.0.role_arn"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_interval", "900"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_size", "100"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_stream_name", ""),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.destination_table_configuration.#", acctest.Ct1),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.destination_table_configuration.0.database_name"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.destination_table_configuration.0.table_name"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.0.enabled", acctest.CtFalse),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.s3_backup_mode.#", acctest.Ct0),
+				),
+			},
+			{
+				Config: testAccDeliveryStream_icebergUpdatesMetadataProcessor(rName),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckDeliveryStreamExists(ctx, resourceName, &stream),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.#", acctest.Ct1),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.role_arn"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.s3_configuration.0.bucket_arn"),
+					resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.s3_configuration.0.role_arn"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_interval", "300"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_size", "5"),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
+					resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_stream_name", ""),
resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.destination_table_configuration.#", acctest.Ct1), + resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.destination_table_configuration.0.database_name"), + resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.destination_table_configuration.0.table_name"), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.destination_table_configuration.0.s3_error_output_prefix", "error"), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.destination_table_configuration.0.unique_keys.#", acctest.Ct1), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.destination_table_configuration.0.unique_keys.0", "my_column_1"), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.#", acctest.Ct1), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.type", "MetadataExtraction"), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.parameters.#", acctest.Ct2), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.parameters.*", map[string]string{ + "parameter_name": "MetadataExtractionQuery", + "parameter_value": "{destinationDatabaseName: .databaseName, destinationTableName: .tableName, operation: .operation}", + }), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.parameters.*", map[string]string{ + "parameter_name": "JsonParsingEngine", + "parameter_value": "JQ-1.6", + }), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.retry_options.#", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.s3_backup_mode", "FailedDataOnly"), + ), + }, + { + Config: testAccDeliveryStream_icebergUpdatesLambdaProcessor(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckDeliveryStreamExists(ctx, resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.#", acctest.Ct1), + resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.role_arn"), + resource.TestCheckResourceAttrSet(resourceName, "iceberg_configuration.0.s3_configuration.0.bucket_arn"), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_interval", "300"), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.buffering_size", "5"), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.#", acctest.Ct1), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_group_name", ""), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.cloudwatch_logging_options.0.log_stream_name", ""), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.destination_table_configuration.#", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.#", acctest.Ct1), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.type", "Lambda"), + resource.TestCheckResourceAttr(resourceName, 
"iceberg_configuration.0.processing_configuration.0.processors.0.parameters.#", acctest.Ct3), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.parameters.*", map[string]string{ + "parameter_name": "LambdaArn", + }), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.parameters.*", map[string]string{ + "parameter_name": "RoleArn", + }), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "iceberg_configuration.0.processing_configuration.0.processors.0.parameters.*", map[string]string{ + "parameter_name": "NumberOfRetries", + "parameter_value": "5", + }), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.retry_options.#", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.0.s3_backup_mode.#", acctest.Ct0), + ), + }, + }, + }) +} + func TestAccFirehoseDeliveryStream_redshiftUpdates(t *testing.T) { ctx := acctest.Context(t) var stream types.DeliveryStreamDescription @@ -2119,7 +2255,9 @@ func TestAccFirehoseDeliveryStream_openSearchServerlessUpdates(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "elasticsearch_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "http_endpoint_configuration.#", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "kinesis_source_configuration.#", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, names.AttrName, rName), resource.TestCheckResourceAttr(resourceName, "opensearch_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "opensearchserverless_configuration.#", acctest.Ct1), @@ -2177,6 +2315,7 @@ func TestAccFirehoseDeliveryStream_openSearchServerlessUpdates(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "elasticsearch_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "http_endpoint_configuration.#", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "iceberg_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "kinesis_source_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.#", acctest.Ct0), resource.TestCheckResourceAttr(resourceName, names.AttrName, rName), @@ -2277,7 +2416,6 @@ func testAccCheckDeliveryStreamExists(ctx context.Context, n string, v *types.De conn := acctest.Provider.Meta().(*conns.AWSClient).FirehoseClient(ctx) output, err := tffirehose.FindDeliveryStreamByName(ctx, conn, rs.Primary.Attributes[names.AttrName]) - if err != nil { return err } @@ -3830,6 +3968,276 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { `, rName)) } +func testAccDeliveryStreamConfig_baseIceberg(rName string) string { + return acctest.ConfigCompose( + testAccDeliveryStreamConfig_baseLambda(rName), + fmt.Sprintf(` +data "aws_caller_identity" "current" {} +data "aws_partition" "current" {} +data "aws_region" "current" {} + +resource "aws_iam_role" "firehose" { + name = %[1]q + + assume_role_policy = <