Skip to content

Commit

Permalink
Merge pull request #37799 from nikhil-goenka/r/aws_dynamodb_table
Browse files Browse the repository at this point in the history
r/aws_dynamodb_table: add `on_demand_throughput` attribute
  • Loading branch information
johnsonaj authored Oct 14, 2024
2 parents ae86f76 + bfedf00 commit f01c7f5
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .changelog/37799.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dynamodb_table: Add `on_demand_throughput` and `global_secondary_index.on_demand_throughput` arguments
```
13 changes: 13 additions & 0 deletions internal/service/dynamodb/forge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,16 @@ func stripNonKeyAttributes(in map[string]interface{}) (map[string]interface{}, e

return m, nil
}

func stripOnDemandThroughputAttributes(in map[string]interface{}) (map[string]interface{}, error) {
mapCopy, err := copystructure.Copy(in)
if err != nil {
return nil, err
}

m := mapCopy.(map[string]interface{})

delete(m, "on_demand_throughput")

return m, nil
}
159 changes: 152 additions & 7 deletions internal/service/dynamodb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func resourceTable() *schema.Resource {
Optional: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"on_demand_throughput": onDemandThroughputSchema(),
"projection_type": {
Type: schema.TypeString,
Required: true,
Expand All @@ -197,10 +198,12 @@ func resourceTable() *schema.Resource {
"read_capacity": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
},
"write_capacity": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
},
},
},
Expand Down Expand Up @@ -248,6 +251,7 @@ func resourceTable() *schema.Resource {
Required: true,
ForceNew: true,
},
"on_demand_throughput": onDemandThroughputSchema(),
"point_in_time_recovery": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -487,6 +491,34 @@ func resourceTable() *schema.Resource {
}
}

func onDemandThroughputSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"max_read_request_units": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
return old == "0" && new == "-1"
},
},
"max_write_request_units": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
return old == "0" && new == "-1"
},
},
},
},
}
}

func resourceTableCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).DynamoDBClient(ctx)
Expand Down Expand Up @@ -623,6 +655,10 @@ func resourceTableCreate(ctx context.Context, d *schema.ResourceData, meta inter
tcp.GlobalSecondaryIndexes = globalSecondaryIndexes
}

if v, ok := d.GetOk("on_demand_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
tcp.OnDemandThroughput = expandOnDemandThroughput(v.([]interface{})[0].(map[string]interface{}))
}

input.TableCreationParameters = tcp

importTableOutput, err := tfresource.RetryWhen(ctx, createTableTimeout, func() (interface{}, error) {
Expand Down Expand Up @@ -697,6 +733,10 @@ func resourceTableCreate(ctx context.Context, d *schema.ResourceData, meta inter
input.GlobalSecondaryIndexes = globalSecondaryIndexes
}

if v, ok := d.GetOk("on_demand_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.OnDemandThroughput = expandOnDemandThroughput(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("stream_enabled"); ok {
input.StreamSpecification = &awstypes.StreamSpecification{
StreamEnabled: aws.Bool(v.(bool)),
Expand Down Expand Up @@ -832,6 +872,10 @@ func resourceTableRead(ctx context.Context, d *schema.ResourceData, meta interfa
return create.AppendDiagSettingError(diags, names.DynamoDB, resNameTable, d.Id(), "global_secondary_index", err)
}

if err := d.Set("on_demand_throughput", flattenOnDemandThroughput(table.OnDemandThroughput)); err != nil {
return create.AppendDiagSettingError(diags, names.DynamoDB, resNameTable, d.Id(), "on_demand_throughput", err)
}

if table.StreamSpecification != nil {
d.Set("stream_enabled", table.StreamSpecification.StreamEnabled)
d.Set("stream_view_type", table.StreamSpecification.StreamViewType)
Expand Down Expand Up @@ -985,6 +1029,13 @@ func resourceTableUpdate(ctx context.Context, d *schema.ResourceData, meta inter
input.DeletionProtectionEnabled = aws.Bool(d.Get("deletion_protection_enabled").(bool))
}

if d.HasChange("on_demand_throughput") {
hasTableUpdate = true
if v, ok := d.GetOk("on_demand_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.OnDemandThroughput = expandOnDemandThroughput(v.([]interface{})[0].(map[string]interface{}))
}
}

// make change when
// stream_enabled has change (below) OR
// stream_view_type has change and stream_enabled is true (special case)
Expand Down Expand Up @@ -1025,6 +1076,18 @@ func resourceTableUpdate(ctx context.Context, d *schema.ResourceData, meta inter
}
}

// update only on-demand throughput indexes when switching to PAY_PER_REQUEST
if newBillingMode == awstypes.BillingModePayPerRequest {
for _, gsiUpdate := range gsiUpdates {
if gsiUpdate.Update.OnDemandThroughput == nil {
continue
}

hasTableUpdate = true
input.GlobalSecondaryIndexUpdates = append(input.GlobalSecondaryIndexUpdates, gsiUpdate)
}
}

if hasTableUpdate {
_, err := conn.UpdateTable(ctx, input)

Expand Down Expand Up @@ -1599,13 +1662,19 @@ func updateDiffGSI(oldGsi, newGsi []interface{}, billingMode awstypes.BillingMod
m := data.(map[string]interface{})
idxName := m[names.AttrName].(string)

c := awstypes.CreateGlobalSecondaryIndexAction{
IndexName: aws.String(idxName),
KeySchema: expandKeySchema(m),
ProvisionedThroughput: expandProvisionedThroughput(m, billingMode),
Projection: expandProjection(m),
}

if v, ok := m["on_demand_throughput"].([]any); ok && len(v) > 0 && v[0] != nil {
c.OnDemandThroughput = expandOnDemandThroughput(v[0].(map[string]any))
}

ops = append(ops, awstypes.GlobalSecondaryIndexUpdate{
Create: &awstypes.CreateGlobalSecondaryIndexAction{
IndexName: aws.String(idxName),
KeySchema: expandKeySchema(m),
ProvisionedThroughput: expandProvisionedThroughput(m, billingMode),
Projection: expandProjection(m),
},
Create: &c,
})
}
}
Expand All @@ -1623,6 +1692,20 @@ func updateDiffGSI(oldGsi, newGsi []interface{}, billingMode awstypes.BillingMod
newWriteCapacity, newReadCapacity := newMap["write_capacity"].(int), newMap["read_capacity"].(int)
capacityChanged := (oldWriteCapacity != newWriteCapacity || oldReadCapacity != newReadCapacity)

oldOnDemandThroughput := &awstypes.OnDemandThroughput{}
newOnDemandThroughput := &awstypes.OnDemandThroughput{}
if v, ok := oldMap["on_demand_throughput"].([]any); ok && len(v) > 0 && v[0] != nil {
oldOnDemandThroughput = expandOnDemandThroughput(v[0].(map[string]any))
}

if v, ok := newMap["on_demand_throughput"].([]any); ok && len(v) > 0 && v[0] != nil {
newOnDemandThroughput = expandOnDemandThroughput(v[0].(map[string]any))
}
var onDemandThroughputChanged bool
if !reflect.DeepEqual(oldOnDemandThroughput, newOnDemandThroughput) {
onDemandThroughputChanged = true
}

// pluck non_key_attributes from oldAttributes and newAttributes as reflect.DeepEquals will compare
// ordinal of elements in its equality (which we actually don't care about)
nonKeyAttributesChanged := checkIfNonKeyAttributesChanged(oldMap, newMap)
Expand All @@ -1635,6 +1718,10 @@ func updateDiffGSI(oldGsi, newGsi []interface{}, billingMode awstypes.BillingMod
if err != nil {
return ops, err
}
oldAttributes, err = stripOnDemandThroughputAttributes(oldAttributes)
if err != nil {
return ops, err
}
newAttributes, err := stripCapacityAttributes(newMap)
if err != nil {
return ops, err
Expand All @@ -1643,6 +1730,10 @@ func updateDiffGSI(oldGsi, newGsi []interface{}, billingMode awstypes.BillingMod
if err != nil {
return ops, err
}
newAttributes, err = stripOnDemandThroughputAttributes(newAttributes)
if err != nil {
return ops, err
}
otherAttributesChanged := nonKeyAttributesChanged || !reflect.DeepEqual(oldAttributes, newAttributes)

if capacityChanged && !otherAttributesChanged {
Expand All @@ -1653,6 +1744,14 @@ func updateDiffGSI(oldGsi, newGsi []interface{}, billingMode awstypes.BillingMod
},
}
ops = append(ops, update)
} else if onDemandThroughputChanged && !otherAttributesChanged {
update := awstypes.GlobalSecondaryIndexUpdate{
Update: &awstypes.UpdateGlobalSecondaryIndexAction{
IndexName: aws.String(idxName),
OnDemandThroughput: newOnDemandThroughput,
},
}
ops = append(ops, update)
} else if otherAttributesChanged {
// Other attributes cannot be updated
ops = append(ops, awstypes.GlobalSecondaryIndexUpdate{
Expand Down Expand Up @@ -2039,6 +2138,10 @@ func flattenTableGlobalSecondaryIndex(gsi []awstypes.GlobalSecondaryIndexDescrip
gsi["non_key_attributes"] = g.Projection.NonKeyAttributes
}

if g.OnDemandThroughput != nil {
gsi["on_demand_throughput"] = flattenOnDemandThroughput(g.OnDemandThroughput)
}

output = append(output, gsi)
}

Expand Down Expand Up @@ -2070,6 +2173,24 @@ func expandAttributes(cfg []interface{}) []awstypes.AttributeDefinition {
return attributes
}

func flattenOnDemandThroughput(apiObject *awstypes.OnDemandThroughput) []interface{} {
if apiObject == nil {
return []interface{}{}
}

m := map[string]interface{}{}

if v := apiObject.MaxReadRequestUnits; v != nil {
m["max_read_request_units"] = aws.ToInt64(v)
}

if v := apiObject.MaxWriteRequestUnits; v != nil {
m["max_write_request_units"] = aws.ToInt64(v)
}

return []interface{}{m}
}

func flattenReplicaDescription(apiObject *awstypes.ReplicaDescription) map[string]interface{} {
if apiObject == nil {
return nil
Expand Down Expand Up @@ -2186,12 +2307,18 @@ func expandImportTable(data map[string]interface{}) *dynamodb.ImportTableInput {
}

func expandGlobalSecondaryIndex(data map[string]interface{}, billingMode awstypes.BillingMode) *awstypes.GlobalSecondaryIndex {
return &awstypes.GlobalSecondaryIndex{
output := awstypes.GlobalSecondaryIndex{
IndexName: aws.String(data[names.AttrName].(string)),
KeySchema: expandKeySchema(data),
Projection: expandProjection(data),
ProvisionedThroughput: expandProvisionedThroughput(data, billingMode),
}

if v, ok := data["on_demand_throughput"].([]any); ok && len(v) > 0 && v[0] != nil {
output.OnDemandThroughput = expandOnDemandThroughput(v[0].(map[string]any))
}

return &output
}

func expandProvisionedThroughput(data map[string]interface{}, billingMode awstypes.BillingMode) *awstypes.ProvisionedThroughput {
Expand Down Expand Up @@ -2300,6 +2427,24 @@ func expandInputFormatOptions(data []interface{}) *awstypes.InputFormatOptions {
return a
}

func expandOnDemandThroughput(tfMap map[string]interface{}) *awstypes.OnDemandThroughput {
if tfMap == nil {
return nil
}

apiObject := &awstypes.OnDemandThroughput{}

if v, ok := tfMap["max_read_request_units"].(int); ok {
apiObject.MaxReadRequestUnits = aws.Int64(int64(v))
}

if v, ok := tfMap["max_write_request_units"].(int); ok {
apiObject.MaxWriteRequestUnits = aws.Int64(int64(v))
}

return apiObject
}

func expandS3BucketSource(data map[string]interface{}) *awstypes.S3BucketSource {
if data == nil {
return nil
Expand Down
Loading

0 comments on commit f01c7f5

Please sign in to comment.