Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ibm_event_streams_quota data source and resource #5610

Merged
merged 3 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions examples/ibm-event-streams/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,34 @@ resource "ibm_resource_tag" "tag_example_on_es" {
}
```

#### Scenario 6: Connect to an existing Event Streams instance and its topics.
#### Scenario 6: Set default and user quotas on an existing Event Streams instance.

This code sets the default quota to 32768 bytes/second for producers and 16384 bytes/second for consumers.
It sets a quota for user `iam-ServiceId-00001111-2222-3333-4444-555566667777` to 65536 bytes/second for producers and no limit (-1) for consumers.
For more information on quotas, see [Setting Kafka quotas](https://cloud.ibm.com/docs/EventStreams?topic=EventStreams-enabling_kafka_quotas).

```terraform
data "ibm_resource_instance" "es_instance_6" {
name = "terraform-integration-6"
resource_group_id = data.ibm_resource_group.group.id
}

resource "ibm_event_streams_quota" "default_quota" {
resource_instance_id = data.ibm_resource_instance.es_instance_6.id
entity = "default"
producer_byte_rate = 32768
consumer_byte_rate = 16384
}

resource "ibm_event_streams_quota" "user00001111_quota" {
resource_instance_id = data.ibm_resource_instance.es_instance_6.id
entity = "iam-ServiceId-00001111-2222-3333-4444-555566667777"
producer_byte_rate = 65536
consumer_byte_rate = -1
}
```

#### Scenario 7: Connect to an existing Event Streams instance and its topics.

This scenario uses a fictitious `"kafka_consumer_app"` resource to demonstrate how a consumer application could be configured.
The resource uses three configuration properties:
Expand All @@ -177,22 +204,22 @@ The topic names can be provided as strings, or can be taken from topic data sour

```terraform
# Use an existing instance
data "ibm_resource_instance" "es_instance_6" {
name = "terraform-integration-6"
data "ibm_resource_instance" "es_instance_7" {
name = "terraform-integration-7"
resource_group_id = data.ibm_resource_group.group.id
}

# Use an existing topic on that instance
data "ibm_event_streams_topic" "es_topic_6" {
resource_instance_id = data.ibm_resource_instance.es_instance_6.id
data "ibm_event_streams_topic" "es_topic_7" {
resource_instance_id = data.ibm_resource_instance.es_instance_7.id
name = "my-es-topic"
}

# The FICTITIOUS consumer application, configured with brokers, API key, and topics
resource "kafka_consumer_app" "es_kafka_app" {
bootstrap_server = lookup(data.ibm_resource_instance.es_instance_4.extensions, "kafka_brokers_sasl", [])
bootstrap_server = lookup(data.ibm_resource_instance.es_instance_7.extensions, "kafka_brokers_sasl", [])
apikey = var.es_reader_api_key
topics = [data.ibm_event_streams_topic.es_topic_4.name]
topics = [data.ibm_event_streams_topic.es_topic_7.name]
}
```

Expand Down
23 changes: 23 additions & 0 deletions ibm/conns/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ import (
"github.com/IBM/continuous-delivery-go-sdk/cdtektonpipelinev2"
"github.com/IBM/continuous-delivery-go-sdk/cdtoolchainv2"
"github.com/IBM/event-notifications-go-admin-sdk/eventnotificationsv1"
"github.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
"github.com/IBM/eventstreams-go-sdk/pkg/schemaregistryv1"
"github.com/IBM/ibm-hpcs-uko-sdk/ukov4"
"github.com/IBM/logs-go-sdk/logsv0"
Expand Down Expand Up @@ -299,6 +300,7 @@ type ClientSession interface {
AtrackerV2() (*atrackerv2.AtrackerV2, error)
MetricsRouterV3() (*metricsrouterv3.MetricsRouterV3, error)
ESschemaRegistrySession() (*schemaregistryv1.SchemaregistryV1, error)
ESadminRestSession() (*adminrestv1.AdminrestV1, error)
ContextBasedRestrictionsV1() (*contextbasedrestrictionsv1.ContextBasedRestrictionsV1, error)
SecurityAndComplianceCenterV3() (*scc.SecurityAndComplianceCenterApiV3, error)
CdToolchainV2() (*cdtoolchainv2.CdToolchainV2, error)
Expand Down Expand Up @@ -614,6 +616,9 @@ type clientSession struct {
esSchemaRegistryClient *schemaregistryv1.SchemaregistryV1
esSchemaRegistryErr error

esAdminRestClient *adminrestv1.AdminrestV1
esAdminRestErr error

// Security and Compliance Center (SCC)
securityAndComplianceCenterClient *scc.SecurityAndComplianceCenterApiV3
securityAndComplianceCenterClientErr error
Expand Down Expand Up @@ -1212,6 +1217,10 @@ func (session clientSession) ESschemaRegistrySession() (*schemaregistryv1.Schema
return session.esSchemaRegistryClient, session.esSchemaRegistryErr
}

func (session clientSession) ESadminRestSession() (*adminrestv1.AdminrestV1, error) {
return session.esAdminRestClient, session.esAdminRestErr
}

// Security and Compliance center Admin API
func (session clientSession) SecurityAndComplianceCenterV3() (*scc.SecurityAndComplianceCenterApiV3, error) {
return session.securityAndComplianceCenterClient, session.securityAndComplianceCenterClientErr
Expand Down Expand Up @@ -3325,6 +3334,20 @@ func (c *Config) ClientSession() (interface{}, error) {
})
}

esAdminRestV1Options := &adminrestv1.AdminrestV1Options{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check other client session creation for visibility . If visbility is private then we construct service endpoint with private does your serivce support private endpoints

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Visibility doesn't apply in our case, we do not set visibility in the client session. The code interacts with an instance of the Event Streams service which the user has provisioned. The endpoint is determined by a URL exposed in the instance metadata.

Authenticator: authenticator,
}
session.esAdminRestClient, err = adminrestv1.NewAdminrestV1(esAdminRestV1Options)
if err != nil {
session.esAdminRestErr = fmt.Errorf("[ERROR] Error occured while configuring Event Streams admin rest: %q", err)
}
if session.esAdminRestClient != nil && session.esAdminRestClient.Service != nil {
session.esAdminRestClient.Service.EnableRetries(c.RetryCount, c.RetryDelay)
session.esAdminRestClient.SetDefaultHeaders(gohttp.Header{
"X-Original-User-Agent": {fmt.Sprintf("terraform-provider-ibm/%s", version.Version)},
})
}

// Construct an "options" struct for creating the service client.
var cdToolchainClientURL string
if c.Visibility == "private" || c.Visibility == "public-and-private" {
Expand Down
2 changes: 2 additions & 0 deletions ibm/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ func Provider() *schema.Provider {
"ibm_dns_secondary": classicinfrastructure.DataSourceIBMDNSSecondary(),
"ibm_event_streams_topic": eventstreams.DataSourceIBMEventStreamsTopic(),
"ibm_event_streams_schema": eventstreams.DataSourceIBMEventStreamsSchema(),
"ibm_event_streams_quota": eventstreams.DataSourceIBMEventStreamsQuota(),
"ibm_hpcs": hpcs.DataSourceIBMHPCS(),
"ibm_hpcs_managed_key": hpcs.DataSourceIbmManagedKey(),
"ibm_hpcs_key_template": hpcs.DataSourceIbmKeyTemplate(),
Expand Down Expand Up @@ -1100,6 +1101,7 @@ func Provider() *schema.Provider {
"ibm_dns_record": classicinfrastructure.ResourceIBMDNSRecord(),
"ibm_event_streams_topic": eventstreams.ResourceIBMEventStreamsTopic(),
"ibm_event_streams_schema": eventstreams.ResourceIBMEventStreamsSchema(),
"ibm_event_streams_quota": eventstreams.ResourceIBMEventStreamsQuota(),
"ibm_firewall": classicinfrastructure.ResourceIBMFirewall(),
"ibm_firewall_policy": classicinfrastructure.ResourceIBMFirewallPolicy(),
"ibm_hpcs": hpcs.ResourceIBMHPCS(),
Expand Down
123 changes: 123 additions & 0 deletions ibm/service/eventstreams/data_source_ibm_event_streams_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright IBM Corp. 2024 All Rights Reserved.
// Licensed under the Mozilla Public License v2.0

package eventstreams

import (
"context"
"fmt"
"log"
"strings"

"github.com/IBM-Cloud/terraform-provider-ibm/ibm/conns"
"github.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// A quota in an Event Streams service instance.
// The ID is the CRN with the last two components "quota:entity".
func DataSourceIBMEventStreamsQuota() *schema.Resource {
return &schema.Resource{
ReadContext: dataSourceIBMEventStreamsQuotaRead,

Schema: map[string]*schema.Schema{
"resource_instance_id": {
Type: schema.TypeString,
Required: true,
Description: "The ID or CRN of the Event Streams service instance",
},
"entity": {
Type: schema.TypeString,
Required: true,
Description: "The entity for which the quota is set; 'default' or IAM ID",
},
"producer_byte_rate": {
Type: schema.TypeInt,
Computed: true,
Description: "The producer quota in bytes per second, -1 means no quota",
},
"consumer_byte_rate": {
Type: schema.TypeInt,
Computed: true,
Description: "The consumer quota in bytes per second, -1 means no quota",
},
},
}
}

// read quota properties using the admin-rest API
func dataSourceIBMEventStreamsQuotaRead(context context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
adminrestClient, instanceCRN, entity, err := getQuotaClientInstanceEntity(d, meta)
if err != nil {
return diag.FromErr(err)
}

getQuotaOptions := &adminrestv1.GetQuotaOptions{}
getQuotaOptions.SetEntityName(entity)
quota, response, err := adminrestClient.GetQuotaWithContext(context, getQuotaOptions)
if err != nil {
log.Printf("[DEBUG] GetQuota failed with error: %s and response: %s\n", err, response)
if response != nil && response.StatusCode == 404 {
return diag.FromErr(fmt.Errorf("Quota for '%s' does not exist", entity))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you migrate to new error framework as announced in different slack channels

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have migrated to the flex framework, and also updated #5621.

}
return diag.FromErr(fmt.Errorf("GetQuota for '%s' failed %s", entity, err))
}

d.Set("resource_instance_id", instanceCRN)
d.Set("entity", entity)
d.Set("producer_byte_rate", getQuotaValue(quota.ProducerByteRate))
d.Set("consumer_byte_rate", getQuotaValue(quota.ConsumerByteRate))
d.SetId(getQuotaID(instanceCRN, entity))

return nil
}

// Returns
// admin-rest client (set to use the service instance)
// CRN for the service instance
// entity name
// Any error that occurred
func getQuotaClientInstanceEntity(d *schema.ResourceData, meta interface{}) (*adminrestv1.AdminrestV1, string, string, error) {
adminrestClient, err := meta.(conns.ClientSession).ESadminRestSession()
if err != nil {
return nil, "", "", err
}
instanceCRN := d.Get("resource_instance_id").(string)
if instanceCRN == "" { // importing
id := d.Id()
crnSegments := strings.Split(id, ":")
if len(crnSegments) != 10 || crnSegments[8] != "quota" || crnSegments[9] == "" {
return nil, "", "", fmt.Errorf("ID '%s' is not a quota resource", id)
}
entity := crnSegments[9]
crnSegments[8] = ""
crnSegments[9] = ""
instanceCRN = strings.Join(crnSegments, ":")
d.Set("resource_instance_id", instanceCRN)
d.Set("entity", entity)
}

instance, err := getInstanceDetails(instanceCRN, meta)
if err != nil {
return nil, "", "", err
}
adminURL := instance.Extensions["kafka_http_url"].(string)
adminrestClient.SetServiceURL(adminURL)
return adminrestClient, instanceCRN, d.Get("entity").(string), nil
}

func getQuotaID(instanceCRN string, entity string) string {
crnSegments := strings.Split(instanceCRN, ":")
crnSegments[8] = "quota"
crnSegments[9] = entity
return strings.Join(crnSegments, ":")
}

// admin-rest API returns nil for undefined rate, convert that to -1
func getQuotaValue(v *int64) int {
if v == nil {
return -1
}
return int(*v)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright IBM Corp. 2024 All Rights Reserved.
// Licensed under the Mozilla Public License v2.0

package eventstreams_test

import (
"fmt"
"strings"
"testing"

acc "github.com/IBM-Cloud/terraform-provider-ibm/ibm/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
)

const (
// Data source test requires MZR instance have this quota with producer rate 10000, consumer rate 20000
testQuotaEntity1 = "iam-ServiceId-00001111-2222-3333-4444-555566667777"
// Data source test requires MZR instance have this quota with producer rate 4096, consumer rate not defined
testQuotaEntity2 = "iam-ServiceId-77776666-5555-4444-3333-222211110000"
// Resource test requires MZR instance NOT have a quota for this
testQuotaEntity3 = "iam-ServiceId-99998888-7777-6666-5555-444433332222"
// Resource test requires MZR instance NOT have a quota for this
testQuotaEntity4 = "default"
)

func TestAccIBMEventStreamsQuotaDataSource(t *testing.T) {
resource.Test(t, resource.TestCase{
Providers: acc.TestAccProviders,
Steps: []resource.TestStep{
{
Config: testAccCheckIBMEventStreamsQuotaDataSourceConfig(getTestInstanceName(mzrKey), testQuotaEntity1),
Check: resource.ComposeTestCheckFunc(
testAccCheckIBMEventStreamsQuotaDataSourceProperties("data.ibm_event_streams_quota.es_quota", testQuotaEntity1, "10000", "20000"),
),
},
{
Config: testAccCheckIBMEventStreamsQuotaDataSourceConfig(getTestInstanceName(mzrKey), testQuotaEntity2),
Check: resource.ComposeTestCheckFunc(
testAccCheckIBMEventStreamsQuotaDataSourceProperties("data.ibm_event_streams_quota.es_quota", testQuotaEntity2, "4096", "-1"),
),
},
},
})
}

func testAccCheckIBMEventStreamsQuotaDataSourceConfig(instanceName string, entity string) string {
return fmt.Sprintf(`
data "ibm_resource_group" "group" {
is_default=true
}
data "ibm_resource_instance" "es_instance" {
resource_group_id = data.ibm_resource_group.group.id
name = "%s"
}
data "ibm_event_streams_quota" "es_quota" {
resource_instance_id = data.ibm_resource_instance.es_instance.id
entity = "%s"
}`, instanceName, entity)
}

// check properties of the terraform data source object
func testAccCheckIBMEventStreamsQuotaDataSourceProperties(name string, entity string, producerRate string, consumerRate string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf("Not found: %s", name)
}
quotaID := rs.Primary.ID
if quotaID == "" {
return fmt.Errorf("[ERROR] Quota ID is not set")
}
if !strings.HasSuffix(quotaID, fmt.Sprintf(":quota:%s", entity)) {
return fmt.Errorf("[ERROR] Quota ID for %s not expected CRN", quotaID)
}
if producerRate != rs.Primary.Attributes["producer_byte_rate"] {
return fmt.Errorf("[ERROR] Quota for %s producer_byte_rate = %s, expected %s", entity, rs.Primary.Attributes["producer_byte_rate"], producerRate)
}
if consumerRate != rs.Primary.Attributes["consumer_byte_rate"] {
return fmt.Errorf("[ERROR] Quota for %s consumer_byte_rate = %s, expected %s", entity, rs.Primary.Attributes["consumer_byte_rate"], consumerRate)
}
return nil
}
}
Loading
Loading