Skip to content

Commit

Permalink
Add support for setting Pub/Sub Cloud Storage subscription max_messag…
Browse files Browse the repository at this point in the history
…es and use_topic_schema (#11583) (#19338)

[upstream:692bae292bf8c33f8443e60cc53bab800b4821a2]

Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician authored Sep 2, 2024
1 parent 6e01e99 commit 0f957d6
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .changelog/11583.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: added `cloud_storage_config.max_messages` and `cloud_storage_config.avro_config.use_topic_schema` fields to `google_pubsub_subscription` resource
```
57 changes: 57 additions & 0 deletions google/services/pubsub/resource_pubsub_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ If all three are empty, then the subscriber will pull and ack messages using API
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"use_topic_schema": {
Type: schema.TypeBool,
Optional: true,
Description: `When true, the output Cloud Storage file will be serialized using the topic schema, if it exists.`,
},
"write_metadata": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -240,6 +245,11 @@ May not exceed the subscription's acknowledgement deadline.
A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".`,
Default: "300s",
},
"max_messages": {
Type: schema.TypeInt,
Optional: true,
Description: `The maximum messages that can be written to a Cloud Storage file before a new file is created. Min 1000 messages.`,
},
"service_account_email": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -1178,6 +1188,8 @@ func flattenPubsubSubscriptionCloudStorageConfig(v interface{}, d *schema.Resour
flattenPubsubSubscriptionCloudStorageConfigMaxDuration(original["maxDuration"], d, config)
transformed["max_bytes"] =
flattenPubsubSubscriptionCloudStorageConfigMaxBytes(original["maxBytes"], d, config)
transformed["max_messages"] =
flattenPubsubSubscriptionCloudStorageConfigMaxMessages(original["maxMessages"], d, config)
transformed["state"] =
flattenPubsubSubscriptionCloudStorageConfigState(original["state"], d, config)
transformed["avro_config"] =
Expand Down Expand Up @@ -1223,6 +1235,23 @@ func flattenPubsubSubscriptionCloudStorageConfigMaxBytes(v interface{}, d *schem
return v // let terraform core handle it otherwise
}

func flattenPubsubSubscriptionCloudStorageConfigMaxMessages(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
// Handles the string fixed64 format
if strVal, ok := v.(string); ok {
if intVal, err := tpgresource.StringToFixed64(strVal); err == nil {
return intVal
}
}

// number values are represented as float64
if floatVal, ok := v.(float64); ok {
intVal := int(floatVal)
return intVal
}

return v // let terraform core handle it otherwise
}

func flattenPubsubSubscriptionCloudStorageConfigState(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}
Expand All @@ -1238,12 +1267,18 @@ func flattenPubsubSubscriptionCloudStorageConfigAvroConfig(v interface{}, d *sch
transformed := make(map[string]interface{})
transformed["write_metadata"] =
flattenPubsubSubscriptionCloudStorageConfigAvroConfigWriteMetadata(original["writeMetadata"], d, config)
transformed["use_topic_schema"] =
flattenPubsubSubscriptionCloudStorageConfigAvroConfigUseTopicSchema(original["useTopicSchema"], d, config)
return []interface{}{transformed}
}
func flattenPubsubSubscriptionCloudStorageConfigAvroConfigWriteMetadata(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubSubscriptionCloudStorageConfigAvroConfigUseTopicSchema(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubSubscriptionCloudStorageConfigServiceAccountEmail(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}
Expand Down Expand Up @@ -1599,6 +1634,13 @@ func expandPubsubSubscriptionCloudStorageConfig(v interface{}, d tpgresource.Ter
transformed["maxBytes"] = transformedMaxBytes
}

transformedMaxMessages, err := expandPubsubSubscriptionCloudStorageConfigMaxMessages(original["max_messages"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMaxMessages); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["maxMessages"] = transformedMaxMessages
}

transformedState, err := expandPubsubSubscriptionCloudStorageConfigState(original["state"], d, config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1647,6 +1689,10 @@ func expandPubsubSubscriptionCloudStorageConfigMaxBytes(v interface{}, d tpgreso
return v, nil
}

func expandPubsubSubscriptionCloudStorageConfigMaxMessages(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionCloudStorageConfigState(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}
Expand All @@ -1667,13 +1713,24 @@ func expandPubsubSubscriptionCloudStorageConfigAvroConfig(v interface{}, d tpgre
transformed["writeMetadata"] = transformedWriteMetadata
}

transformedUseTopicSchema, err := expandPubsubSubscriptionCloudStorageConfigAvroConfigUseTopicSchema(original["use_topic_schema"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedUseTopicSchema); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["useTopicSchema"] = transformedUseTopicSchema
}

return transformed, nil
}

func expandPubsubSubscriptionCloudStorageConfigAvroConfigWriteMetadata(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionCloudStorageConfigAvroConfigUseTopicSchema(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionCloudStorageConfigServiceAccountEmail(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ resource "google_pubsub_subscription" "example" {
max_bytes = 1000
max_duration = "300s"
max_messages = 1000
}
depends_on = [
google_storage_bucket.example,
Expand Down Expand Up @@ -551,9 +552,11 @@ resource "google_pubsub_subscription" "example" {
max_bytes = 1000
max_duration = "300s"
max_messages = 1000
avro_config {
write_metadata = true
use_topic_schema = true
}
}
depends_on = [
Expand Down
89 changes: 69 additions & 20 deletions google/services/pubsub/resource_pubsub_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestAccPubsubSubscriptionBigQuery_serviceAccount(t *testing.T) {
})
}

func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
func TestAccPubsubSubscriptionCloudStorage_updateText(t *testing.T) {
t.Parallel()

bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10))
Expand All @@ -265,7 +265,7 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", ""),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -274,7 +274,41 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
ImportStateId: subscriptionShort,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccPubsubSubscriptionCloudStorage_updateAvro(t *testing.T) {
t.Parallel()

bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10))
topic := fmt.Sprintf("tf-test-topic-%s", acctest.RandString(t, 10))
subscriptionShort := fmt.Sprintf("tf-test-sub-%s", acctest.RandString(t, 10))

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "avro"),
},
{
ResourceName: "google_pubsub_subscription.foo",
ImportStateId: subscriptionShort,
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "avro"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -299,7 +333,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa"),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -308,7 +342,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -317,7 +351,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa2"),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa2", "avro"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand Down Expand Up @@ -599,10 +633,10 @@ resource "google_pubsub_subscription" "foo" {
}

func testAccPubsubSubscriptionBigQuery_basic(dataset, table, topic, subscription string, useTableSchema bool, serviceAccountId string) string {
serivceAccountEmailField := ""
serivceAccountResource := ""
serviceAccountEmailField := ""
serviceAccountResource := ""
if serviceAccountId != "" {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_service_account" "bq_write_service_account" {
account_id = "%s"
display_name = "BQ Write Service Account"
Expand All @@ -619,9 +653,9 @@ resource "google_project_iam_member" "editor" {
role = "roles/bigquery.dataEditor"
member = "serviceAccount:${google_service_account.bq_write_service_account.email}"
}`, serviceAccountId)
serivceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email"
serviceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email"
} else {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_project_iam_member" "viewer" {
project = data.google_project.project.project_id
role = "roles/bigquery.metadataViewer"
Expand Down Expand Up @@ -681,10 +715,10 @@ resource "google_pubsub_subscription" "foo" {
google_project_iam_member.editor
]
}
`, serivceAccountResource, dataset, table, topic, subscription, useTableSchema, serivceAccountEmailField)
`, serviceAccountResource, dataset, table, topic, subscription, useTableSchema, serviceAccountEmailField)
}

func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, serviceAccountId string) string {
func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, maxMessages int, serviceAccountId, outputFormat string) string {
filenamePrefixString := ""
if filenamePrefix != "" {
filenamePrefixString = fmt.Sprintf(`filename_prefix = "%s"`, filenamePrefix)
Expand All @@ -705,11 +739,15 @@ func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, fi
if maxDuration != "" {
maxDurationString = fmt.Sprintf(`max_duration = "%s"`, maxDuration)
}
maxMessagesString := ""
if maxMessages != 0 {
maxMessagesString = fmt.Sprintf(`max_messages = %d`, maxMessages)
}

serivceAccountEmailField := ""
serivceAccountResource := ""
serviceAccountEmailField := ""
serviceAccountResource := ""
if serviceAccountId != "" {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_service_account" "storage_write_service_account" {
account_id = "%s"
display_name = "Write Service Account"
Expand All @@ -726,14 +764,23 @@ resource "google_project_iam_member" "editor" {
role = "roles/bigquery.dataEditor"
member = "serviceAccount:${google_service_account.storage_write_service_account.email}"
}`, serviceAccountId)
serivceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email"
serviceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email"
} else {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_storage_bucket_iam_member" "admin" {
bucket = google_storage_bucket.test.name
role = "roles/storage.admin"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}`)
}
outputFormatString := ""
if outputFormat == "avro" {
outputFormatString = `
avro_config {
write_metadata = true
use_topic_schema = true
}
`
}
return fmt.Sprintf(`
data "google_project" "project" { }
Expand All @@ -760,15 +807,17 @@ resource "google_pubsub_subscription" "foo" {
%s
%s
%s
%s
%s
%s
%s
}
depends_on = [
google_storage_bucket.test,
google_storage_bucket_iam_member.admin,
]
}
`, bucket, serivceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, serivceAccountEmailField)
`, bucket, serviceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, maxMessagesString, serviceAccountEmailField, outputFormatString)
}

func testAccPubsubSubscription_topicOnly(topic string) string {
Expand Down
11 changes: 11 additions & 0 deletions website/docs/r/pubsub_subscription.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ resource "google_pubsub_subscription" "example" {
max_bytes = 1000
max_duration = "300s"
max_messages = 1000
}
depends_on = [
google_storage_bucket.example,
Expand Down Expand Up @@ -396,9 +397,11 @@ resource "google_pubsub_subscription" "example" {
max_bytes = 1000
max_duration = "300s"
max_messages = 1000
avro_config {
write_metadata = true
use_topic_schema = true
}
}
depends_on = [
Expand Down Expand Up @@ -673,6 +676,10 @@ The following arguments are supported:
The maximum bytes that can be written to a Cloud Storage file before a new file is created. Min 1 KB, max 10 GiB.
The maxBytes limit may be exceeded in cases where messages are larger than the limit.

* `max_messages` -
(Optional)
The maximum messages that can be written to a Cloud Storage file before a new file is created. Min 1000 messages.

* `state` -
(Output)
An output-only field that indicates whether or not the subscription can receive messages.
Expand All @@ -695,6 +702,10 @@ The following arguments are supported:
(Optional)
When true, write the subscription name, messageId, publishTime, attributes, and orderingKey as additional fields in the output.

* `use_topic_schema` -
(Optional)
When true, the output Cloud Storage file will be serialized using the topic schema, if it exists.

<a name="nested_push_config"></a>The `push_config` block supports:

* `oidc_token` -
Expand Down

0 comments on commit 0f957d6

Please sign in to comment.