Skip to content

Commit

Permalink
add ibm_event_streams_quota
Browse files Browse the repository at this point in the history
  • Loading branch information
kccox committed Sep 4, 2024
1 parent 7d735aa commit 7e3dfe8
Show file tree
Hide file tree
Showing 9 changed files with 703 additions and 3 deletions.
33 changes: 30 additions & 3 deletions examples/ibm-event-streams/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,34 @@ resource "ibm_resource_tag" "tag_example_on_es" {
}
```

#### Scenario 6: Connect to an existing Event Streams instance and its topics.
#### Scenario 6: Set default and user quotas on an existing Event Streams instance.

This code sets the default quota to 32768 bytes/second for producers and 16384 bytes/second for consumers.
It sets a quota for user `iam-ServiceId-00001111-2222-3333-4444-555566667777` to 65536 bytes/second for producers and no limit (-1) for consumers.
For more information on quotas, see [Setting Kafka quotas](https://cloud.ibm.com/docs/EventStreams?topic=EventStreams-enabling_kafka_quotas).

```terraform
data "ibm_resource_instance" "es_instance_6" {
name = "terraform-integration-6"
resource_group_id = data.ibm_resource_group.group.id
}
resource "ibm_event_streams_quota" "default_quota" {
resource_instance_id = data.ibm_resource_instance.es_instance_6.id
entity = "default"
producer_byte_rate = 32768
consumer_byte_rate = 16384
}
resource "ibm_event_streams_quota" "user00001111_quota" {
resource_instance_id = data.ibm_resource_instance.es_instance_6.id
entity = "iam-ServiceId-00001111-2222-3333-4444-555566667777"
producer_byte_rate = 65536
consumer_byte_rate = -1
}
```

#### Scenario 7: Connect to an existing Event Streams instance and its topics.

This scenario uses a fictitious `"kafka_consumer_app"` resource to demonstrate how a consumer application could be configured.
The resource uses three configuration properties:
Expand All @@ -177,8 +204,8 @@ The topic names can be provided as strings, or can be taken from topic data sour

```terraform
# Use an existing instance
data "ibm_resource_instance" "es_instance_6" {
name = "terraform-integration-6"
data "ibm_resource_instance" "es_instance_7" {
name = "terraform-integration-7"
resource_group_id = data.ibm_resource_group.group.id
}
Expand Down
23 changes: 23 additions & 0 deletions ibm/conns/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ import (
"github.com/IBM/continuous-delivery-go-sdk/cdtektonpipelinev2"
"github.com/IBM/continuous-delivery-go-sdk/cdtoolchainv2"
"github.com/IBM/event-notifications-go-admin-sdk/eventnotificationsv1"
"github.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
"github.com/IBM/eventstreams-go-sdk/pkg/schemaregistryv1"
"github.com/IBM/ibm-hpcs-uko-sdk/ukov4"
"github.com/IBM/logs-go-sdk/logsv0"
Expand Down Expand Up @@ -299,6 +300,7 @@ type ClientSession interface {
AtrackerV2() (*atrackerv2.AtrackerV2, error)
MetricsRouterV3() (*metricsrouterv3.MetricsRouterV3, error)
ESschemaRegistrySession() (*schemaregistryv1.SchemaregistryV1, error)
ESadminRestSession() (*adminrestv1.AdminrestV1, error)
ContextBasedRestrictionsV1() (*contextbasedrestrictionsv1.ContextBasedRestrictionsV1, error)
SecurityAndComplianceCenterV3() (*scc.SecurityAndComplianceCenterApiV3, error)
CdToolchainV2() (*cdtoolchainv2.CdToolchainV2, error)
Expand Down Expand Up @@ -614,6 +616,9 @@ type clientSession struct {
esSchemaRegistryClient *schemaregistryv1.SchemaregistryV1
esSchemaRegistryErr error

esAdminRestClient *adminrestv1.AdminrestV1
esAdminRestErr error

// Security and Compliance Center (SCC)
securityAndComplianceCenterClient *scc.SecurityAndComplianceCenterApiV3
securityAndComplianceCenterClientErr error
Expand Down Expand Up @@ -1212,6 +1217,10 @@ func (session clientSession) ESschemaRegistrySession() (*schemaregistryv1.Schema
return session.esSchemaRegistryClient, session.esSchemaRegistryErr
}

func (session clientSession) ESadminRestSession() (*adminrestv1.AdminrestV1, error) {
return session.esAdminRestClient, session.esAdminRestErr
}

// Security and Compliance center Admin API
func (session clientSession) SecurityAndComplianceCenterV3() (*scc.SecurityAndComplianceCenterApiV3, error) {
return session.securityAndComplianceCenterClient, session.securityAndComplianceCenterClientErr
Expand Down Expand Up @@ -3325,6 +3334,20 @@ func (c *Config) ClientSession() (interface{}, error) {
})
}

esAdminRestV1Options := &adminrestv1.AdminrestV1Options{
Authenticator: authenticator,
}
session.esAdminRestClient, err = adminrestv1.NewAdminrestV1(esAdminRestV1Options)
if err != nil {
session.esAdminRestErr = fmt.Errorf("[ERROR] Error occured while configuring Event Streams admin rest: %q", err)
}
if session.esAdminRestClient != nil && session.esAdminRestClient.Service != nil {
session.esAdminRestClient.Service.EnableRetries(c.RetryCount, c.RetryDelay)
session.esAdminRestClient.SetDefaultHeaders(gohttp.Header{
"X-Original-User-Agent": {fmt.Sprintf("terraform-provider-ibm/%s", version.Version)},
})
}

// Construct an "options" struct for creating the service client.
var cdToolchainClientURL string
if c.Visibility == "private" || c.Visibility == "public-and-private" {
Expand Down
2 changes: 2 additions & 0 deletions ibm/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ func Provider() *schema.Provider {
"ibm_dns_secondary": classicinfrastructure.DataSourceIBMDNSSecondary(),
"ibm_event_streams_topic": eventstreams.DataSourceIBMEventStreamsTopic(),
"ibm_event_streams_schema": eventstreams.DataSourceIBMEventStreamsSchema(),
"ibm_event_streams_quota": eventstreams.DataSourceIBMEventStreamsQuota(),
"ibm_hpcs": hpcs.DataSourceIBMHPCS(),
"ibm_hpcs_managed_key": hpcs.DataSourceIbmManagedKey(),
"ibm_hpcs_key_template": hpcs.DataSourceIbmKeyTemplate(),
Expand Down Expand Up @@ -1100,6 +1101,7 @@ func Provider() *schema.Provider {
"ibm_dns_record": classicinfrastructure.ResourceIBMDNSRecord(),
"ibm_event_streams_topic": eventstreams.ResourceIBMEventStreamsTopic(),
"ibm_event_streams_schema": eventstreams.ResourceIBMEventStreamsSchema(),
"ibm_event_streams_quota": eventstreams.ResourceIBMEventStreamsQuota(),
"ibm_firewall": classicinfrastructure.ResourceIBMFirewall(),
"ibm_firewall_policy": classicinfrastructure.ResourceIBMFirewallPolicy(),
"ibm_hpcs": hpcs.ResourceIBMHPCS(),
Expand Down
123 changes: 123 additions & 0 deletions ibm/service/eventstreams/data_source_ibm_event_streams_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright IBM Corp. 2024 All Rights Reserved.
// Licensed under the Mozilla Public License v2.0

package eventstreams

import (
"context"
"fmt"
"log"
"strings"

"github.com/IBM-Cloud/terraform-provider-ibm/ibm/conns"
"github.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// A quota in an Event Streams service instance.
// The ID is the CRN with the last two components "quota:entity".
func DataSourceIBMEventStreamsQuota() *schema.Resource {
return &schema.Resource{
ReadContext: dataSourceIBMEventStreamsQuotaRead,

Schema: map[string]*schema.Schema{
"resource_instance_id": {
Type: schema.TypeString,
Required: true,
Description: "The ID or CRN of the Event Streams service instance",
},
"entity": {
Type: schema.TypeString,
Required: true,
Description: "The entity for which the quota is set; 'default' or IAM ID",
},
"producer_byte_rate": {
Type: schema.TypeInt,
Computed: true,
Description: "The producer quota in bytes per second, -1 means no quota",
},
"consumer_byte_rate": {
Type: schema.TypeInt,
Computed: true,
Description: "The consumer quota in bytes per second, -1 means no quota",
},
},
}
}

// read quota properties using the admin-rest API
func dataSourceIBMEventStreamsQuotaRead(context context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
adminrestClient, instanceCRN, entity, err := getQuotaClientInstanceEntity(d, meta)
if err != nil {
return diag.FromErr(err)
}

getQuotaOptions := &adminrestv1.GetQuotaOptions{}
getQuotaOptions.SetEntityName(entity)
quota, response, err := adminrestClient.GetQuotaWithContext(context, getQuotaOptions)
if err != nil {
log.Printf("[DEBUG] GetQuota failed with error: %s and response: %s\n", err, response)
if response != nil && response.StatusCode == 404 {
return diag.FromErr(fmt.Errorf("Quota for '%s' does not exist", entity))
}
return diag.FromErr(fmt.Errorf("GetQuota for '%s' failed %s", entity, err))
}

d.Set("resource_instance_id", instanceCRN)
d.Set("entity", entity)
d.Set("producer_byte_rate", getQuotaValue(quota.ProducerByteRate))
d.Set("consumer_byte_rate", getQuotaValue(quota.ConsumerByteRate))
d.SetId(getQuotaID(instanceCRN, entity))

return nil
}

// Returns
// admin-rest client (set to use the service instance)
// CRN for the service instance
// entity name
// Any error that occurred
func getQuotaClientInstanceEntity(d *schema.ResourceData, meta interface{}) (*adminrestv1.AdminrestV1, string, string, error) {
adminrestClient, err := meta.(conns.ClientSession).ESadminRestSession()
if err != nil {
return nil, "", "", err
}
instanceCRN := d.Get("resource_instance_id").(string)
if instanceCRN == "" { // importing
id := d.Id()
crnSegments := strings.Split(id, ":")
if len(crnSegments) != 10 || crnSegments[8] != "quota" || crnSegments[9] == "" {
return nil, "", "", fmt.Errorf("ID '%s' is not a quota resource", id)
}
entity := crnSegments[9]
crnSegments[8] = ""
crnSegments[9] = ""
instanceCRN = strings.Join(crnSegments, ":")
d.Set("resource_instance_id", instanceCRN)
d.Set("entity", entity)
}

instance, err := getInstanceDetails(instanceCRN, meta)
if err != nil {
return nil, "", "", err
}
adminURL := instance.Extensions["kafka_http_url"].(string)
adminrestClient.SetServiceURL(adminURL)
return adminrestClient, instanceCRN, d.Get("entity").(string), nil
}

func getQuotaID(instanceCRN string, entity string) string {
crnSegments := strings.Split(instanceCRN, ":")
crnSegments[8] = "quota"
crnSegments[9] = entity
return strings.Join(crnSegments, ":")
}

// admin-rest API returns nil for undefined rate, convert that to -1
func getQuotaValue(v *int64) int {
if v == nil {
return -1
}
return int(*v)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright IBM Corp. 2017, 2021 All Rights Reserved.
// Licensed under the Mozilla Public License v2.0

package eventstreams_test

import (
"fmt"
"strings"
"testing"

acc "github.com/IBM-Cloud/terraform-provider-ibm/ibm/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
)

const (
// Data source test requires MZR instance have this quota with producer rate 10000, consumer rate 20000
testQuotaEntity1 = "iam-ServiceId-00001111-2222-3333-4444-555566667777"
// Data source test requires MZR instance have this quota with producer rate 4096, consumer rate not defined
testQuotaEntity2 = "iam-ServiceId-77776666-5555-4444-3333-222211110000"
// Resource test requires MZR instance NOT have a quota for this
testQuotaEntity3 = "iam-ServiceId-99998888-7777-6666-5555-444433332222"
// Resource test requires MZR instance NOT have a quota for this
testQuotaEntity4 = "default"
)

func TestAccIBMEventStreamsQuotaDataSource(t *testing.T) {
resource.Test(t, resource.TestCase{
Providers: acc.TestAccProviders,
Steps: []resource.TestStep{
{
Config: testAccCheckIBMEventStreamsQuotaDataSourceConfig(getTestInstanceName(mzrKey), testQuotaEntity1),
Check: resource.ComposeTestCheckFunc(
testAccCheckIBMEventStreamsQuotaDataSourceProperties("data.ibm_event_streams_quota.es_quota", testQuotaEntity1, "10000", "20000"),
),
},
{
Config: testAccCheckIBMEventStreamsQuotaDataSourceConfig(getTestInstanceName(mzrKey), testQuotaEntity2),
Check: resource.ComposeTestCheckFunc(
testAccCheckIBMEventStreamsQuotaDataSourceProperties("data.ibm_event_streams_quota.es_quota", testQuotaEntity2, "4096", "-1"),
),
},
},
})
}

func testAccCheckIBMEventStreamsQuotaDataSourceConfig(instanceName string, entity string) string {
return fmt.Sprintf(`
data "ibm_resource_group" "group" {
is_default=true
}
data "ibm_resource_instance" "es_instance" {
resource_group_id = data.ibm_resource_group.group.id
name = "%s"
}
data "ibm_event_streams_quota" "es_quota" {
resource_instance_id = data.ibm_resource_instance.es_instance.id
entity = "%s"
}`, instanceName, entity)
}

// check properties of the terraform data source object
func testAccCheckIBMEventStreamsQuotaDataSourceProperties(name string, entity string, producerRate string, consumerRate string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf("Not found: %s", name)
}
quotaID := rs.Primary.ID
if quotaID == "" {
return fmt.Errorf("[ERROR] Quota ID is not set")
}
if !strings.HasSuffix(quotaID, fmt.Sprintf(":quota:%s", entity)) {
return fmt.Errorf("[ERROR] Quota ID for %s not expected CRN", quotaID)
}
if producerRate != rs.Primary.Attributes["producer_byte_rate"] {
return fmt.Errorf("[ERROR] Quota for %s producer_byte_rate = %s, expected %s", entity, rs.Primary.Attributes["producer_byte_rate"], producerRate)
}
if consumerRate != rs.Primary.Attributes["consumer_byte_rate"] {
return fmt.Errorf("[ERROR] Quota for %s consumer_byte_rate = %s, expected %s", entity, rs.Primary.Attributes["consumer_byte_rate"], consumerRate)
}
return nil
}
}
Loading

0 comments on commit 7e3dfe8

Please sign in to comment.