diff --git a/.changelog/2497.txt b/.changelog/2497.txt new file mode 100644 index 0000000000..6b9788e416 --- /dev/null +++ b/.changelog/2497.txt @@ -0,0 +1,3 @@ +```release-note:new-datasource +data-source/mongodbatlas_stream_processor +``` diff --git a/.changelog/2501.txt b/.changelog/2501.txt new file mode 100644 index 0000000000..b358b319c4 --- /dev/null +++ b/.changelog/2501.txt @@ -0,0 +1,3 @@ +```release-note:new-resource +mongodbatlas_stream_processor +``` diff --git a/.changelog/2566.txt b/.changelog/2566.txt new file mode 100644 index 0000000000..a6347324a2 --- /dev/null +++ b/.changelog/2566.txt @@ -0,0 +1,3 @@ +```release-note:new-datasource +data-source/mongodbatlas_stream_processors +``` diff --git a/.github/workflows/acceptance-tests-runner.yml b/.github/workflows/acceptance-tests-runner.yml index 8a51636934..114b7737f8 100644 --- a/.github/workflows/acceptance-tests-runner.yml +++ b/.github/workflows/acceptance-tests-runner.yml @@ -302,6 +302,7 @@ jobs: stream: - 'internal/service/streamconnection/*.go' - 'internal/service/streaminstance/*.go' + - 'internal/service/streamprocessor/*.go' control_plane_ip_addresses: - 'internal/service/controlplaneipaddresses/*.go' @@ -871,6 +872,7 @@ jobs: ACCTEST_PACKAGES: | ./internal/service/streamconnection ./internal/service/streaminstance + ./internal/service/streamprocessor run: make testacc control_plane_ip_addresses: diff --git a/.github/workflows/code-health.yml b/.github/workflows/code-health.yml index 652ecacdc4..9fe2183f12 100644 --- a/.github/workflows/code-health.yml +++ b/.github/workflows/code-health.yml @@ -81,6 +81,8 @@ jobs: run: make generate-doc resource_name=encryption_at_rest_private_endpoint - name: Doc for project_ip_addresses run: make generate-doc resource_name=project_ip_addresses + - name: Doc for stream_processor + run: make generate-doc resource_name=stream_processor - name: Find mutations id: self_mutation run: |- diff --git a/docs/data-sources/stream_processor.md b/docs/data-sources/stream_processor.md new file mode 100644 index 0000000000..6d4f960f60 --- /dev/null +++ b/docs/data-sources/stream_processor.md @@ -0,0 +1,141 @@ +# Data Source: mongodbatlas_stream_processor + +`mongodbatlas_stream_processor` describes a stream processor. 
+ +## Example Usages +```terraform +resource "mongodbatlas_stream_instance" "example" { + project_id = var.project_id + instance_name = "InstanceName" + data_process_region = { + region = "VIRGINIA_USA" + cloud_provider = "AWS" + } +} + +resource "mongodbatlas_stream_connection" "example-sample" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "sample_stream_solar" + type = "Sample" +} + +resource "mongodbatlas_stream_connection" "example-cluster" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "ClusterConnection" + type = "Cluster" + cluster_name = var.cluster_name + db_role_to_execute = { + role = "atlasAdmin" + type = "BUILT_IN" + } +} + +resource "mongodbatlas_stream_connection" "example-kafka" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "KafkaPlaintextConnection" + type = "Kafka" + authentication = { + mechanism = "PLAIN" + username = var.kafka_username + password = var.kafka_password + } + bootstrap_servers = "localhost:9092,localhost:9092" + config = { + "auto.offset.reset" : "earliest" + } + security = { + protocol = "PLAINTEXT" + } +} + +resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "sampleProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-sample.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "CREATED" +} + +resource "mongodbatlas_stream_processor" "stream-processor-cluster-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "clusterProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "STARTED" +} + +resource "mongodbatlas_stream_processor" "stream-processor-kafka-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "kafkaProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-kafka.connection_name, "topic" : "example_topic" } }]) + state = "CREATED" + options = { + dlq = { + coll = "exampleColumn" + connection_name = resource.mongodbatlas_stream_connection.example-cluster.connection_name + db = "exampleDb" + } + } +} + +data "mongodbatlas_stream_processors" "example-stream-processors" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name +} + +data "mongodbatlas_stream_processor" "example-stream-processor" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = mongodbatlas_stream_processor.stream-processor-sample-example.processor_name +} + +# example making use of data sources +output "stream_processors_state" { + value = data.mongodbatlas_stream_processor.example-stream-processor.state +} + +output "stream_processors_results" { + value = data.mongodbatlas_stream_processors.example-stream-processors.results +} +``` + + 
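+Because `pipeline` and `stats` are exposed as JSON-encoded strings, they can be decoded with Terraform's [`jsondecode`](https://developer.hashicorp.com/terraform/language/functions/jsondecode) function when individual fields are needed. The outputs below are an illustrative sketch only; the keys available under `stats` (such as `inputMessageCount`) depend on the processor and are only populated once it has run.
+
+```terraform
+# Illustrative sketch: decode the JSON-encoded attributes of the data source defined above
+output "stream_processor_pipeline_stages" {
+  value = jsondecode(data.mongodbatlas_stream_processor.example-stream-processor.pipeline)
+}
+
+output "stream_processor_input_message_count" {
+  # stats may be empty until the processor has started, so guard the lookup with try()
+  value = try(jsondecode(data.mongodbatlas_stream_processor.example-stream-processor.stats).inputMessageCount, null)
+}
+```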
+## Schema + +### Required + +- `instance_name` (String) Human-readable label that identifies the stream instance. +- `processor_name` (String) Human-readable label that identifies the stream processor. +- `project_id` (String) Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access. + +**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups. + +### Read-Only + +- `id` (String) Unique 24-hexadecimal character string that identifies the stream processor. +- `options` (Attributes) Optional configuration for the stream processor. (see [below for nested schema](#nestedatt--options)) +- `pipeline` (String) Stream aggregation pipeline you want to apply to your streaming data. +- `state` (String) The state of the stream processor. +- `stats` (String) The stats associated with the stream processor. + + +### Nested Schema for `options` + +Read-Only: + +- `dlq` (Attributes) Dead letter queue for the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/reference/glossary/#std-term-dead-letter-queue) for more information. (see [below for nested schema](#nestedatt--options--dlq)) + + +### Nested Schema for `options.dlq` + +Read-Only: + +- `coll` (String) Name of the collection to use for the DLQ. +- `connection_name` (String) Name of the connection to write DLQ messages to. Must be an Atlas connection. +- `db` (String) Name of the database to use for the DLQ. + +For more information see: [MongoDB Atlas API - Stream Processor](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Streams/operation/createStreamProcessor) Documentation. diff --git a/docs/data-sources/stream_processors.md b/docs/data-sources/stream_processors.md new file mode 100644 index 0000000000..b65f30755b --- /dev/null +++ b/docs/data-sources/stream_processors.md @@ -0,0 +1,156 @@ +# Data Source: mongodbatlas_stream_processors + +`mongodbatlas_stream_processors` returns all stream processors in a stream instance. 
+ +## Example Usages +```terraform +resource "mongodbatlas_stream_instance" "example" { + project_id = var.project_id + instance_name = "InstanceName" + data_process_region = { + region = "VIRGINIA_USA" + cloud_provider = "AWS" + } +} + +resource "mongodbatlas_stream_connection" "example-sample" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "sample_stream_solar" + type = "Sample" +} + +resource "mongodbatlas_stream_connection" "example-cluster" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "ClusterConnection" + type = "Cluster" + cluster_name = var.cluster_name + db_role_to_execute = { + role = "atlasAdmin" + type = "BUILT_IN" + } +} + +resource "mongodbatlas_stream_connection" "example-kafka" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "KafkaPlaintextConnection" + type = "Kafka" + authentication = { + mechanism = "PLAIN" + username = var.kafka_username + password = var.kafka_password + } + bootstrap_servers = "localhost:9092,localhost:9092" + config = { + "auto.offset.reset" : "earliest" + } + security = { + protocol = "PLAINTEXT" + } +} + +resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "sampleProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-sample.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "CREATED" +} + +resource "mongodbatlas_stream_processor" "stream-processor-cluster-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "clusterProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "STARTED" +} + +resource "mongodbatlas_stream_processor" "stream-processor-kafka-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "kafkaProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-kafka.connection_name, "topic" : "example_topic" } }]) + state = "CREATED" + options = { + dlq = { + coll = "exampleColumn" + connection_name = resource.mongodbatlas_stream_connection.example-cluster.connection_name + db = "exampleDb" + } + } +} + +data "mongodbatlas_stream_processors" "example-stream-processors" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name +} + +data "mongodbatlas_stream_processor" "example-stream-processor" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = mongodbatlas_stream_processor.stream-processor-sample-example.processor_name +} + +# example making use of data sources +output "stream_processors_state" { + value = data.mongodbatlas_stream_processor.example-stream-processor.state +} + +output "stream_processors_results" { + value = data.mongodbatlas_stream_processors.example-stream-processors.results +} +``` + + 
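+The `results` attribute is a list of stream processor objects, so standard Terraform expressions can be applied to it. The output below is an illustrative sketch that builds a map from processor name to state using the data source defined above; the attribute names match the schema documented later on this page.
+
+```terraform
+# Illustrative sketch: index the returned processors by name
+output "stream_processor_states_by_name" {
+  value = {
+    for processor in data.mongodbatlas_stream_processors.example-stream-processors.results :
+    processor.processor_name => processor.state
+  }
+}
+```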
+## Schema + +### Required + +- `instance_name` (String) Human-readable label that identifies the stream instance. +- `project_id` (String) Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access. + +**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups. + +### Read-Only + +- `results` (Attributes List) Returns all Stream Processors within the specified stream instance. + +To use this resource, the requesting API Key must have the Project Owner + +role or Project Stream Processing Owner role. (see [below for nested schema](#nestedatt--results)) + + +### Nested Schema for `results` + +Read-Only: + +- `id` (String) Unique 24-hexadecimal character string that identifies the stream processor. +- `instance_name` (String) Human-readable label that identifies the stream instance. +- `options` (Attributes) Optional configuration for the stream processor. (see [below for nested schema](#nestedatt--results--options)) +- `pipeline` (String) Stream aggregation pipeline you want to apply to your streaming data. +- `processor_name` (String) Human-readable label that identifies the stream processor. +- `project_id` (String) Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access. + +**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups. +- `state` (String) The state of the stream processor. +- `stats` (String) The stats associated with the stream processor. + + +### Nested Schema for `results.options` + +Read-Only: + +- `dlq` (Attributes) Dead letter queue for the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/reference/glossary/#std-term-dead-letter-queue) for more information. (see [below for nested schema](#nestedatt--results--options--dlq)) + + +### Nested Schema for `results.options.dlq` + +Read-Only: + +- `coll` (String) Name of the collection to use for the DLQ. +- `connection_name` (String) Name of the connection to write DLQ messages to. Must be an Atlas connection. +- `db` (String) Name of the database to use for the DLQ. + +For more information see: [MongoDB Atlas API - Stream Processor](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Streams/operation/createStreamProcessor) Documentation. diff --git a/docs/resources/stream_processor.md b/docs/resources/stream_processor.md new file mode 100644 index 0000000000..68b66f978d --- /dev/null +++ b/docs/resources/stream_processor.md @@ -0,0 +1,166 @@ +# Resource: mongodbatlas_stream_processor + +`mongodbatlas_stream_processor` provides a Stream Processor resource. The resource lets you create, delete, import, start and stop a stream processor in a stream instance. + +**NOTE**: Updating an Atlas Stream Processor is currently not supported. As a result, the following steps are needed to be able to change an Atlas Stream Processor with an Atlas Change Stream Source: +1. 
Retrieve the value of Change Stream Source Token `changeStreamState` from the computed `stats` attribute in `mongodbatlas_stream_processor` resource or datasource or from the Terraform state file. This takes the form of a [resume token](https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens-from-change-events). The Stream Processor has to be running in the state `STARTED` for the `stats` attribute to be available. However, before you retrieve the value, you should set the `state` to `STOPPED` to get the latest `changeStreamState`. + - Example: + ``` + {\"changeStreamState\":{\"_data\":\"8266C71670000000012B0429296E1404\"} + ``` +2. Update the `pipeline` argument setting `config.StartAfter` with the value retrieved in the previous step. More details in the [MongoDB Collection Change Stream](https://www.mongodb.com/docs/atlas/atlas-stream-processing/sp-agg-source/#mongodb-collection-change-stream) documentation. + - Example: + ``` + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name, "config" = { "startAfter" = { "_data" : "8266C71562000000012B0429296E1404" } } } }, { "$emit" = { "connectionName" : "__testLog" } }]) + ``` +3. Delete the existing Atlas Stream Processor and then create a new Atlas Stream Processor with updated pipeline parameter and the updated values. + +## Example Usages + +```terraform +resource "mongodbatlas_stream_instance" "example" { + project_id = var.project_id + instance_name = "InstanceName" + data_process_region = { + region = "VIRGINIA_USA" + cloud_provider = "AWS" + } +} + +resource "mongodbatlas_stream_connection" "example-sample" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "sample_stream_solar" + type = "Sample" +} + +resource "mongodbatlas_stream_connection" "example-cluster" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "ClusterConnection" + type = "Cluster" + cluster_name = var.cluster_name + db_role_to_execute = { + role = "atlasAdmin" + type = "BUILT_IN" + } +} + +resource "mongodbatlas_stream_connection" "example-kafka" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "KafkaPlaintextConnection" + type = "Kafka" + authentication = { + mechanism = "PLAIN" + username = var.kafka_username + password = var.kafka_password + } + bootstrap_servers = "localhost:9092,localhost:9092" + config = { + "auto.offset.reset" : "earliest" + } + security = { + protocol = "PLAINTEXT" + } +} + +resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "sampleProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-sample.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "CREATED" +} + +resource "mongodbatlas_stream_processor" "stream-processor-cluster-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "clusterProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "STARTED" +} + +resource 
"mongodbatlas_stream_processor" "stream-processor-kafka-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "kafkaProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-kafka.connection_name, "topic" : "example_topic" } }]) + state = "CREATED" + options = { + dlq = { + coll = "exampleColumn" + connection_name = resource.mongodbatlas_stream_connection.example-cluster.connection_name + db = "exampleDb" + } + } +} + +data "mongodbatlas_stream_processors" "example-stream-processors" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name +} + +data "mongodbatlas_stream_processor" "example-stream-processor" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = mongodbatlas_stream_processor.stream-processor-sample-example.processor_name +} + +# example making use of data sources +output "stream_processors_state" { + value = data.mongodbatlas_stream_processor.example-stream-processor.state +} + +output "stream_processors_results" { + value = data.mongodbatlas_stream_processors.example-stream-processors.results +} +``` + + +## Schema + +### Required + +- `instance_name` (String) Human-readable label that identifies the stream instance. +- `pipeline` (String) Stream aggregation pipeline you want to apply to your streaming data. [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/#std-label-stream-aggregation) contain more information. Using [jsonencode](https://developer.hashicorp.com/terraform/language/functions/jsonencode) is recommended when settig this attribute. For more details see [Aggregation Pipelines Documentation](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/) +- `processor_name` (String) Human-readable label that identifies the stream processor. +- `project_id` (String) Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access. + +**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups. + +### Optional + +- `options` (Attributes) Optional configuration for the stream processor. (see [below for nested schema](#nestedatt--options)) +- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. + +**NOTE** When a stream processor is created, the only valid states are CREATED or STARTED. A stream processor can be automatically started when creating it if the state is set to STARTED. + +### Read-Only + +- `id` (String) Unique 24-hexadecimal character string that identifies the stream processor. +- `stats` (String) The stats associated with the stream processor. 
Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information. + + +### Nested Schema for `options` + +Required: + +- `dlq` (Attributes) Dead letter queue for the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/reference/glossary/#std-term-dead-letter-queue) for more information. (see [below for nested schema](#nestedatt--options--dlq)) + + +### Nested Schema for `options.dlq` + +Required: + +- `coll` (String) Name of the collection to use for the DLQ. +- `connection_name` (String) Name of the connection to write DLQ messages to. Must be an Atlas connection. +- `db` (String) Name of the database to use for the DLQ. + +# Import +Stream Processor resource can be imported using the Project ID, Stream Instance name and Stream Processor name, in the format `INSTANCE_NAME-PROJECT_ID-PROCESSOR_NAME`, e.g. +``` +$ terraform import mongodbatlas_stream_processor.test yourInstanceName-6117ac2fe2a3d04ed27a987v-yourProcessorName +``` + +For more information see: [MongoDB Atlas API - Stream Processor](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Streams/operation/createStreamProcessor) Documentation. diff --git a/examples/mongodbatlas_stream_instance/atlas-streams-user-journey.md b/examples/mongodbatlas_stream_instance/atlas-streams-user-journey.md deleted file mode 100644 index a5b4e25d14..0000000000 --- a/examples/mongodbatlas_stream_instance/atlas-streams-user-journey.md +++ /dev/null @@ -1,22 +0,0 @@ -# MongoDB Atlas Provider - Atlas Streams with Terraform - -Atlas Stream Processing is composed of multiple components, and users can leverage Terraform to define a subset of these. To obtain more details on each of the components please refer to the [Atlas Stream Processing Documentation](https://www.mongodb.com/docs/atlas/atlas-sp/overview/#atlas-stream-processing-overview). - -### Resources supported by Terraform - -- `mongodbatlas_stream_instance`: Enables creating, modifying, and deleting Stream Instances. as part of this resource, a computed `hostnames` attribute is available for connecting to the created instance. -- `mongodbatlas_stream_connection`: Enables creating, modifying, and deleting Stream Instance Connections, which serve as data sources and sinks for your instance. - -### Managing Stream Processors - -Once a stream instance and its connections have been defined, `Stream Processors` can be created to define how your data will be processed in your instance. There are currently no resources defined in Terraform to provide this configuration. To obtain information on how this can be configured refer to [Manage Stream Processors](https://www.mongodb.com/docs/atlas/atlas-sp/manage-stream-processor/#manage-stream-processors). - -Connect to your stream instance defined in terraform using the following code block: - -``` -output "stream_instance_hostname" { - value = mongodbatlas_stream_instance.test.hostnames -} -``` - -This value can then be used to connect to the stream instance using `mongosh`, as described in the [Get Started Tutorial](https://www.mongodb.com/docs/atlas/atlas-sp/tutorial/). 
diff --git a/examples/mongodbatlas_stream_processor/README.md b/examples/mongodbatlas_stream_processor/README.md
new file mode 100644
index 0000000000..91578fe4cb
--- /dev/null
+++ b/examples/mongodbatlas_stream_processor/README.md
@@ -0,0 +1,14 @@
+# MongoDB Atlas Provider - Atlas Stream Processor defined in a Project
+
+This example shows how to use Atlas Stream Processors in Terraform. It requires an existing project, referenced through the `project_id` variable.
+
+You must set the following variables:
+
+- `public_key`: Atlas public key
+- `private_key`: Atlas private key
+- `project_id`: Unique 24-hexadecimal digit string that identifies the project where the stream instance will be created.
+- `kafka_username`: Username used for connecting to your external Kafka cluster.
+- `kafka_password`: Password used for connecting to your external Kafka cluster.
+- `cluster_name`: Name of an existing cluster in your project that will be used for creating a stream connection.
+
+To learn more, see the [Stream Processor Documentation](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/).
\ No newline at end of file
diff --git a/examples/mongodbatlas_stream_processor/atlas-streams-user-journey.md b/examples/mongodbatlas_stream_processor/atlas-streams-user-journey.md
new file mode 100644
index 0000000000..1ad2592260
--- /dev/null
+++ b/examples/mongodbatlas_stream_processor/atlas-streams-user-journey.md
@@ -0,0 +1,9 @@
+# MongoDB Atlas Provider - Atlas Streams with Terraform
+
+Atlas Stream Processing is composed of multiple components, and users can leverage Terraform to define a subset of these. For more details on each of these components, refer to the [Atlas Stream Processing Documentation](https://www.mongodb.com/docs/atlas/atlas-sp/overview/#atlas-stream-processing-overview).
+
+### Resources supported by Terraform
+
+- `mongodbatlas_stream_instance`: Enables creating, modifying, and deleting Stream Instances. As part of this resource, a computed `hostnames` attribute is available for connecting to the created instance.
+- `mongodbatlas_stream_connection`: Enables creating, modifying, and deleting Stream Instance Connections, which serve as data sources and sinks for your instance.
+- `mongodbatlas_stream_processor`: Enables creating, deleting, starting, and stopping a Stream Processor, which defines how your data will be processed in your instance; a minimal definition is sketched below.
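+
+A minimal processor definition is sketched below. It is illustrative only: it assumes the stream instance and `Sample` connection defined in `main.tf` in this directory, and the `__testLog` sink used throughout the documentation examples.
+
+```terraform
+resource "mongodbatlas_stream_processor" "minimal-example" {
+  project_id     = var.project_id
+  instance_name  = mongodbatlas_stream_instance.example.instance_name
+  processor_name = "minimalProcessorName"
+  # jsonencode keeps the aggregation pipeline readable in HCL while the API receives it as a JSON string
+  pipeline = jsonencode([
+    { "$source" = { "connectionName" = mongodbatlas_stream_connection.example-sample.connection_name } },
+    { "$emit" = { "connectionName" = "__testLog" } }
+  ])
+  # STARTED runs the processor right after creation; CREATED leaves it defined but not running
+  state = "STARTED"
+}
+```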
diff --git a/examples/mongodbatlas_stream_processor/main.tf b/examples/mongodbatlas_stream_processor/main.tf new file mode 100644 index 0000000000..af6839aab4 --- /dev/null +++ b/examples/mongodbatlas_stream_processor/main.tf @@ -0,0 +1,97 @@ +resource "mongodbatlas_stream_instance" "example" { + project_id = var.project_id + instance_name = "InstanceName" + data_process_region = { + region = "VIRGINIA_USA" + cloud_provider = "AWS" + } +} + +resource "mongodbatlas_stream_connection" "example-sample" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "sample_stream_solar" + type = "Sample" +} + +resource "mongodbatlas_stream_connection" "example-cluster" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "ClusterConnection" + type = "Cluster" + cluster_name = var.cluster_name + db_role_to_execute = { + role = "atlasAdmin" + type = "BUILT_IN" + } +} + +resource "mongodbatlas_stream_connection" "example-kafka" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + connection_name = "KafkaPlaintextConnection" + type = "Kafka" + authentication = { + mechanism = "PLAIN" + username = var.kafka_username + password = var.kafka_password + } + bootstrap_servers = "localhost:9092,localhost:9092" + config = { + "auto.offset.reset" : "earliest" + } + security = { + protocol = "PLAINTEXT" + } +} + +resource "mongodbatlas_stream_processor" "stream-processor-sample-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "sampleProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-sample.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "CREATED" +} + +resource "mongodbatlas_stream_processor" "stream-processor-cluster-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "clusterProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : "__testLog" } }]) + state = "STARTED" +} + +resource "mongodbatlas_stream_processor" "stream-processor-kafka-example" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = "kafkaProcessorName" + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } }, { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-kafka.connection_name, "topic" : "example_topic" } }]) + state = "CREATED" + options = { + dlq = { + coll = "exampleColumn" + connection_name = resource.mongodbatlas_stream_connection.example-cluster.connection_name + db = "exampleDb" + } + } +} + +data "mongodbatlas_stream_processors" "example-stream-processors" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name +} + +data "mongodbatlas_stream_processor" "example-stream-processor" { + project_id = var.project_id + instance_name = mongodbatlas_stream_instance.example.instance_name + processor_name = mongodbatlas_stream_processor.stream-processor-sample-example.processor_name +} + +# example making use of data sources +output "stream_processors_state" { + 
value = data.mongodbatlas_stream_processor.example-stream-processor.state +} + +output "stream_processors_results" { + value = data.mongodbatlas_stream_processors.example-stream-processors.results +} diff --git a/examples/mongodbatlas_stream_processor/provider.tf b/examples/mongodbatlas_stream_processor/provider.tf new file mode 100644 index 0000000000..18c430e061 --- /dev/null +++ b/examples/mongodbatlas_stream_processor/provider.tf @@ -0,0 +1,4 @@ +provider "mongodbatlas" { + public_key = var.public_key + private_key = var.private_key +} diff --git a/examples/mongodbatlas_stream_processor/variables.tf b/examples/mongodbatlas_stream_processor/variables.tf new file mode 100644 index 0000000000..349ed8fbfa --- /dev/null +++ b/examples/mongodbatlas_stream_processor/variables.tf @@ -0,0 +1,29 @@ +variable "public_key" { + description = "Public API key to authenticate to Atlas" + type = string +} + +variable "private_key" { + description = "Private API key to authenticate to Atlas" + type = string +} + +variable "project_id" { + description = "Unique 24-hexadecimal digit string that identifies your project" + type = string +} + +variable "kafka_username" { + description = "Username for connecting to your Kafka cluster" + type = string +} + +variable "kafka_password" { + description = "Password for connecting to your Kafka cluster" + type = string +} + +variable "cluster_name" { + description = "Name of an existing cluster in your project that will be used to create a stream connection" + type = string +} diff --git a/examples/mongodbatlas_stream_processor/versions.tf b/examples/mongodbatlas_stream_processor/versions.tf new file mode 100644 index 0000000000..9b4be6c14c --- /dev/null +++ b/examples/mongodbatlas_stream_processor/versions.tf @@ -0,0 +1,9 @@ +terraform { + required_providers { + mongodbatlas = { + source = "mongodb/mongodbatlas" + version = "~> 1.18" + } + } + required_version = ">= 1.0" +} diff --git a/internal/common/fwtypes/json_string.go b/internal/common/fwtypes/json_string.go new file mode 100644 index 0000000000..6423d5fc1c --- /dev/null +++ b/internal/common/fwtypes/json_string.go @@ -0,0 +1,141 @@ +package fwtypes + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" + "github.com/hashicorp/terraform-plugin-go/tftypes" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/schemafunc" +) + +var ( + _ basetypes.StringTypable = (*jsonStringType)(nil) + _ basetypes.StringValuable = (*JSONString)(nil) + _ basetypes.StringValuableWithSemanticEquals = (*JSONString)(nil) +) + +type jsonStringType struct { + basetypes.StringType +} + +var ( + JSONStringType = jsonStringType{} +) + +func (t jsonStringType) Equal(o attr.Type) bool { + other, ok := o.(jsonStringType) + if !ok { + return false + } + return t.StringType.Equal(other.StringType) +} + +func (t jsonStringType) String() string { + return "jsonStringType" +} + +func (t jsonStringType) ValueFromString(_ context.Context, in types.String) (basetypes.StringValuable, diag.Diagnostics) { + var diags diag.Diagnostics + if in.IsNull() { + return JSONStringNull(), diags + } + if in.IsUnknown() { + return JSONStringUnknown(), diags + } + return JSONString{StringValue: in}, diags +} + +func (t jsonStringType) ValueFromTerraform(ctx 
context.Context, in tftypes.Value) (attr.Value, error) { + attrValue, err := t.StringType.ValueFromTerraform(ctx, in) + if err != nil { + return nil, err + } + stringValue, ok := attrValue.(basetypes.StringValue) + if !ok { + return nil, fmt.Errorf("unexpected value type of %T", attrValue) + } + stringValuable, diags := t.ValueFromString(ctx, stringValue) + if diags.HasError() { + return nil, fmt.Errorf("unexpected error converting StringValue to StringValuable: %v", diags) + } + return stringValuable, nil +} + +func (t jsonStringType) ValueType(context.Context) attr.Value { + return JSONString{} +} + +func (t jsonStringType) Validate(ctx context.Context, in tftypes.Value, attrPath path.Path) diag.Diagnostics { + var diags diag.Diagnostics + if !in.IsKnown() || in.IsNull() { + return diags + } + var value string + err := in.As(&value) + if err != nil { + diags.AddAttributeError( + attrPath, + "Invalid Terraform Value", + "An unexpected error occurred while attempting to convert a Terraform value to a string. "+ + "This generally is an issue with the provider schema implementation. "+ + "Please contact the provider developers.\n\n"+ + "Path: "+attrPath.String()+"\n"+ + "Error: "+err.Error(), + ) + return diags + } + if !json.Valid([]byte(value)) { + diags.AddAttributeError( + attrPath, + "Invalid JSON String Value", + "A string value was provided that is not valid JSON string format (RFC 7159).\n\n"+ + "Path: "+attrPath.String()+"\n"+ + "Given Value: "+value+"\n", + ) + return diags + } + return diags +} + +func JSONStringNull() JSONString { + return JSONString{StringValue: basetypes.NewStringNull()} +} + +func JSONStringUnknown() JSONString { + return JSONString{StringValue: basetypes.NewStringUnknown()} +} + +func JSONStringValue(value string) JSONString { + return JSONString{StringValue: basetypes.NewStringValue(value)} +} + +type JSONString struct { + basetypes.StringValue +} + +func (v JSONString) Equal(o attr.Value) bool { + other, ok := o.(JSONString) + if !ok { + return false + } + return v.StringValue.Equal(other.StringValue) +} + +func (v JSONString) Type(context.Context) attr.Type { + return JSONStringType +} + +func (v JSONString) StringSemanticEquals(_ context.Context, newValuable basetypes.StringValuable) (bool, diag.Diagnostics) { + var diags diag.Diagnostics + newValue, ok := newValuable.(JSONString) + if !ok { + return false, diags + } + return schemafunc.EqualJSON(v.ValueString(), newValue.ValueString(), "JsonString"), diags +} diff --git a/internal/common/schemafunc/json.go b/internal/common/schemafunc/json.go new file mode 100644 index 0000000000..de02d80012 --- /dev/null +++ b/internal/common/schemafunc/json.go @@ -0,0 +1,28 @@ +package schemafunc + +import ( + "encoding/json" + "log" + "reflect" +) + +func EqualJSON(old, newStr, errContext string) bool { + var j, j2 any + + if old == "" { + old = "{}" + } + + if newStr == "" { + newStr = "{}" + } + if err := json.Unmarshal([]byte(old), &j); err != nil { + log.Printf("[ERROR] cannot unmarshal old %s json %v", errContext, err) + return false + } + if err := json.Unmarshal([]byte(newStr), &j2); err != nil { + log.Printf("[ERROR] cannot unmarshal new %s json %v", errContext, err) + return false + } + return reflect.DeepEqual(&j, &j2) +} diff --git a/internal/common/schemafunc/json_test.go b/internal/common/schemafunc/json_test.go new file mode 100644 index 0000000000..9dd3051305 --- /dev/null +++ b/internal/common/schemafunc/json_test.go @@ -0,0 +1,30 @@ +package schemafunc_test + +import ( + "testing" + + 
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/schemafunc" +) + +func Test_EqualJSON(t *testing.T) { + testCases := map[string]struct { + old string + new string + expected bool + }{ + "empty strings": {"", "", true}, + "different objects": {`{"a": 1}`, `{"b": 2}`, false}, + "invalid object": {`{{"a": 1}`, `{"b": 2}`, false}, + "double invalid object": {`{{"a": 1}`, `{"b": 2}}`, false}, + "equal objects with different order": {`{"a": 1, "b": 2}`, `{"b": 2, "a": 1}`, true}, + "equal objects whitespace": {`{"a": 1, "b": 2}`, `{"a":1,"b":2}`, true}, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + actual := schemafunc.EqualJSON(tc.old, tc.new, "vector search index") + if actual != tc.expected { + t.Errorf("Expected: %v, got: %v", tc.expected, actual) + } + }) + } +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 6c2341da86..7556324185 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -39,6 +39,7 @@ import ( "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/searchdeployment" "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streamconnection" "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streaminstance" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streamprocessor" "github.com/mongodb/terraform-provider-mongodbatlas/version" ) @@ -436,12 +437,15 @@ func (p *MongodbtlasProvider) DataSources(context.Context) []func() datasource.D streamconnection.PluralDataSource, controlplaneipaddresses.DataSource, projectipaddresses.DataSource, + streamprocessor.DataSource, + streamprocessor.PluralDataSource, encryptionatrest.DataSource, } previewDataSources := []func() datasource.DataSource{ // Data sources not yet in GA encryptionatrestprivateendpoint.DataSource, encryptionatrestprivateendpoint.PluralDataSource, } + if providerEnablePreview { dataSources = append(dataSources, previewDataSources...) 
} @@ -459,6 +463,7 @@ func (p *MongodbtlasProvider) Resources(context.Context) []func() resource.Resou pushbasedlogexport.Resource, streaminstance.Resource, streamconnection.Resource, + streamprocessor.Resource, } previewResources := []func() resource.Resource{ // Resources not yet in GA encryptionatrestprivateendpoint.Resource, diff --git a/internal/service/searchindex/model_search_index.go b/internal/service/searchindex/model_search_index.go index 16227ea7ae..fdad9a06e0 100644 --- a/internal/service/searchindex/model_search_index.go +++ b/internal/service/searchindex/model_search_index.go @@ -4,14 +4,13 @@ import ( "bytes" "context" "encoding/json" - "log" - "reflect" "strconv" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/schemafunc" "go.mongodb.org/atlas-sdk/v20240805003/admin" ) @@ -115,27 +114,7 @@ func UnmarshalStoredSource(str string) (any, diag.Diagnostics) { } func diffSuppressJSON(k, old, newStr string, d *schema.ResourceData) bool { - var j, j2 any - - if old == "" { - old = "{}" - } - - if newStr == "" { - newStr = "{}" - } - - if err := json.Unmarshal([]byte(old), &j); err != nil { - log.Printf("[ERROR] cannot unmarshal old search index analyzer json %v", err) - } - if err := json.Unmarshal([]byte(newStr), &j2); err != nil { - log.Printf("[ERROR] cannot unmarshal new search index analyzer json %v", err) - } - if !reflect.DeepEqual(&j, &j2) { - return false - } - - return true + return schemafunc.EqualJSON(old, newStr, "vector search index") } func resourceSearchIndexRefreshFunc(ctx context.Context, clusterName, projectID, indexID string, connV2 *admin.APIClient) retry.StateRefreshFunc { diff --git a/internal/service/streamprocessor/data_source.go b/internal/service/streamprocessor/data_source.go new file mode 100644 index 0000000000..958c12bcbf --- /dev/null +++ b/internal/service/streamprocessor/data_source.go @@ -0,0 +1,54 @@ +package streamprocessor + +import ( + "context" + + "github.com/hashicorp/terraform-plugin-framework/datasource" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/config" +) + +var _ datasource.DataSource = &StreamProccesorDS{} +var _ datasource.DataSourceWithConfigure = &StreamProccesorDS{} + +func DataSource() datasource.DataSource { + return &StreamProccesorDS{ + DSCommon: config.DSCommon{ + DataSourceName: StreamProcessorName, + }, + } +} + +type StreamProccesorDS struct { + config.DSCommon +} + +func (d *StreamProccesorDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) { + // TODO: Schema and model must be defined in data_source_schema.go. Details on scaffolding this file found in contributing/development-best-practices.md under "Scaffolding Schema and Model Definitions" + resp.Schema = DataSourceSchema(ctx) +} + +func (d *StreamProccesorDS) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) { + var streamProccesorConfig TFStreamProcessorDSModel + resp.Diagnostics.Append(req.Config.Get(ctx, &streamProccesorConfig)...) 
+ if resp.Diagnostics.HasError() { + return + } + + connV2 := d.Client.AtlasV2 + projectID := streamProccesorConfig.ProjectID.ValueString() + instanceName := streamProccesorConfig.InstanceName.ValueString() + processorName := streamProccesorConfig.ProcessorName.ValueString() + apiResp, _, err := connV2.StreamsApi.GetStreamProcessor(ctx, projectID, instanceName, processorName).Execute() + + if err != nil { + resp.Diagnostics.AddError("error fetching resource", err.Error()) + return + } + + newStreamTFStreamprocessorDSModelModel, diags := NewTFStreamprocessorDSModel(ctx, projectID, instanceName, apiResp) + if diags.HasError() { + resp.Diagnostics.Append(diags...) + return + } + resp.Diagnostics.Append(resp.State.Set(ctx, newStreamTFStreamprocessorDSModelModel)...) +} diff --git a/internal/service/streamprocessor/data_source_plural.go b/internal/service/streamprocessor/data_source_plural.go new file mode 100644 index 0000000000..faec4e6a82 --- /dev/null +++ b/internal/service/streamprocessor/data_source_plural.go @@ -0,0 +1,43 @@ +package streamprocessor + +import ( + "context" + "net/http" + + "github.com/hashicorp/terraform-plugin-framework/datasource" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/dsschema" + "go.mongodb.org/atlas-sdk/v20240805003/admin" +) + +func (d *streamProcessorsDS) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) { + var streamConnectionsConfig TFStreamProcessorsDSModel + resp.Diagnostics.Append(req.Config.Get(ctx, &streamConnectionsConfig)...) + if resp.Diagnostics.HasError() { + return + } + + connV2 := d.Client.AtlasV2 + projectID := streamConnectionsConfig.ProjectID.ValueString() + instanceName := streamConnectionsConfig.InstanceName.ValueString() + + params := admin.ListStreamProcessorsApiParams{ + GroupId: projectID, + TenantName: instanceName, + } + sdkProcessors, err := dsschema.AllPages(ctx, func(ctx context.Context, pageNum int) (dsschema.PaginateResponse[admin.StreamsProcessorWithStats], *http.Response, error) { + request := connV2.StreamsApi.ListStreamProcessorsWithParams(ctx, ¶ms) + request = request.PageNum(pageNum) + return request.Execute() + }) + if err != nil { + resp.Diagnostics.AddError("error fetching results", err.Error()) + return + } + + newStreamConnectionsModel, diags := NewTFStreamProcessors(ctx, &streamConnectionsConfig, sdkProcessors) + if diags.HasError() { + resp.Diagnostics.Append(diags...) + return + } + resp.Diagnostics.Append(resp.State.Set(ctx, newStreamConnectionsModel)...) 
+} diff --git a/internal/service/streamprocessor/data_source_plural_schema.go b/internal/service/streamprocessor/data_source_plural_schema.go new file mode 100644 index 0000000000..aec8a0e560 --- /dev/null +++ b/internal/service/streamprocessor/data_source_plural_schema.go @@ -0,0 +1,57 @@ +package streamprocessor + +import ( + "context" + "fmt" + + "github.com/hashicorp/terraform-plugin-framework/datasource" + "github.com/hashicorp/terraform-plugin-framework/datasource/schema" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/config" +) + +var _ datasource.DataSource = &StreamProccesorDS{} +var _ datasource.DataSourceWithConfigure = &StreamProccesorDS{} + +func PluralDataSource() datasource.DataSource { + return &streamProcessorsDS{ + DSCommon: config.DSCommon{ + DataSourceName: fmt.Sprintf("%ss", StreamProcessorName), + }, + } +} + +type streamProcessorsDS struct { + config.DSCommon +} + +func (d *streamProcessorsDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) { + resp.Schema = schema.Schema{ + Attributes: map[string]schema.Attribute{ + "project_id": schema.StringAttribute{ + Required: true, + Description: "Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access.\n\n**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.", + MarkdownDescription: "Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access.\n\n**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. 
The resource and corresponding endpoints use the term groups.", + }, + "instance_name": schema.StringAttribute{ + Required: true, + Description: "Human-readable label that identifies the stream instance.", + MarkdownDescription: "Human-readable label that identifies the stream instance.", + }, + "results": schema.ListNestedAttribute{ + Computed: true, + NestedObject: schema.NestedAttributeObject{ + Attributes: DSAttributes(false), + }, + Description: "Returns all Stream Processors within the specified stream instance.\n\nTo use this resource, the requesting API Key must have the Project Owner\n\nrole or Project Stream Processing Owner role.", + MarkdownDescription: "Returns all Stream Processors within the specified stream instance.\n\nTo use this resource, the requesting API Key must have the Project Owner\n\nrole or Project Stream Processing Owner role.", + }, + }, + } +} + +type TFStreamProcessorsDSModel struct { + ProjectID types.String `tfsdk:"project_id"` + InstanceName types.String `tfsdk:"instance_name"` + Results []TFStreamProcessorDSModel `tfsdk:"results"` +} diff --git a/internal/service/streamprocessor/data_source_schema.go b/internal/service/streamprocessor/data_source_schema.go new file mode 100644 index 0000000000..7e2aa23191 --- /dev/null +++ b/internal/service/streamprocessor/data_source_schema.go @@ -0,0 +1,70 @@ +package streamprocessor + +import ( + "context" + + "github.com/hashicorp/terraform-plugin-framework/types" + + "github.com/hashicorp/terraform-plugin-framework/datasource/schema" +) + +func DataSourceSchema(ctx context.Context) schema.Schema { + return schema.Schema{ + Attributes: DSAttributes(true), + } +} + +func DSAttributes(withArguments bool) map[string]schema.Attribute { + return map[string]schema.Attribute{ + "id": schema.StringAttribute{ + Computed: true, + Description: "Unique 24-hexadecimal character string that identifies the stream processor.", + MarkdownDescription: "Unique 24-hexadecimal character string that identifies the stream processor.", + }, + "instance_name": schema.StringAttribute{ + Required: withArguments, + Computed: !withArguments, + Description: "Human-readable label that identifies the stream instance.", + MarkdownDescription: "Human-readable label that identifies the stream instance.", + }, + "pipeline": schema.StringAttribute{ + Computed: true, + Description: "Stream aggregation pipeline you want to apply to your streaming data.", + MarkdownDescription: "Stream aggregation pipeline you want to apply to your streaming data.", + }, + "processor_name": schema.StringAttribute{ + Required: withArguments, + Computed: !withArguments, + Description: "Human-readable label that identifies the stream processor.", + MarkdownDescription: "Human-readable label that identifies the stream processor.", + }, + "project_id": schema.StringAttribute{ + Required: withArguments, + Computed: !withArguments, + Description: "Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access.\n\n**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.", + MarkdownDescription: "Unique 24-hexadecimal digit string that identifies your project. 
Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access.\n\n**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.", + }, + "state": schema.StringAttribute{ + Computed: true, + Description: "The state of the stream processor.", + MarkdownDescription: "The state of the stream processor.", + }, + "stats": schema.StringAttribute{ + Computed: true, + Description: "The stats associated with the stream processor.", + MarkdownDescription: "The stats associated with the stream processor.", + }, + "options": optionsSchema(true), + } +} + +type TFStreamProcessorDSModel struct { + ID types.String `tfsdk:"id"` + InstanceName types.String `tfsdk:"instance_name"` + Options types.Object `tfsdk:"options"` + Pipeline types.String `tfsdk:"pipeline"` + ProcessorName types.String `tfsdk:"processor_name"` + ProjectID types.String `tfsdk:"project_id"` + State types.String `tfsdk:"state"` + Stats types.String `tfsdk:"stats"` +} diff --git a/internal/service/streamprocessor/main_test.go b/internal/service/streamprocessor/main_test.go new file mode 100644 index 0000000000..4c663869b2 --- /dev/null +++ b/internal/service/streamprocessor/main_test.go @@ -0,0 +1,15 @@ +package streamprocessor_test + +import ( + "os" + "testing" + + "github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/acc" +) + +func TestMain(m *testing.M) { + cleanup := acc.SetupSharedResources() + exitCode := m.Run() + cleanup() + os.Exit(exitCode) +} diff --git a/internal/service/streamprocessor/model.go b/internal/service/streamprocessor/model.go new file mode 100644 index 0000000000..e6e4861b17 --- /dev/null +++ b/internal/service/streamprocessor/model.go @@ -0,0 +1,184 @@ +package streamprocessor + +import ( + "context" + "encoding/json" + + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/fwtypes" + "go.mongodb.org/atlas-sdk/v20240805003/admin" +) + +func NewStreamProcessorReq(ctx context.Context, plan *TFStreamProcessorRSModel) (*admin.StreamsProcessor, diag.Diagnostics) { + pipeline, diags := convertPipelineToSdk(plan.Pipeline.ValueString()) + if diags != nil { + return nil, diags + } + streamProcessor := &admin.StreamsProcessor{ + Name: plan.ProcessorName.ValueStringPointer(), + Pipeline: &pipeline, + } + + if !plan.Options.IsNull() && !plan.Options.IsUnknown() { + optionsModel := &TFOptionsModel{} + if diags := plan.Options.As(ctx, optionsModel, basetypes.ObjectAsOptions{}); diags.HasError() { + return nil, diags + } + dlqModel := &TFDlqModel{} + if diags := optionsModel.Dlq.As(ctx, dlqModel, basetypes.ObjectAsOptions{}); diags.HasError() { + return nil, diags + } + streamProcessor.Options = &admin.StreamsOptions{ + Dlq: &admin.StreamsDLQ{ + Coll: dlqModel.Coll.ValueStringPointer(), + ConnectionName: dlqModel.ConnectionName.ValueStringPointer(), + Db: dlqModel.DB.ValueStringPointer(), + }, + } + } + + return streamProcessor, nil +} + +func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName string, apiResp *admin.StreamsProcessorWithStats) (*TFStreamProcessorRSModel, diag.Diagnostics) { + if apiResp == nil { + return nil, 
diag.Diagnostics{diag.NewErrorDiagnostic("streamProcessor API response is nil", "")} + } + pipelineTF, diags := convertPipelineToTF(apiResp.GetPipeline()) + if diags.HasError() { + return nil, diags + } + statsTF, diags := convertStatsToTF(apiResp.GetStats()) + if diags.HasError() { + return nil, diags + } + optionsTF, diags := ConvertOptionsToTF(ctx, apiResp.Options) + if diags.HasError() { + return nil, diags + } + tfModel := &TFStreamProcessorRSModel{ + InstanceName: types.StringPointerValue(&instanceName), + Options: *optionsTF, + Pipeline: pipelineTF, + ProcessorID: types.StringPointerValue(&apiResp.Id), + ProcessorName: types.StringPointerValue(&apiResp.Name), + ProjectID: types.StringPointerValue(&projectID), + State: types.StringPointerValue(&apiResp.State), + Stats: statsTF, + } + return tfModel, nil +} + +func NewTFStreamprocessorDSModel(ctx context.Context, projectID, instanceName string, apiResp *admin.StreamsProcessorWithStats) (*TFStreamProcessorDSModel, diag.Diagnostics) { + if apiResp == nil { + return nil, diag.Diagnostics{diag.NewErrorDiagnostic("streamProcessor API response is nil", "")} + } + pipelineTF, diags := convertPipelineToTF(apiResp.GetPipeline()) + if diags.HasError() { + return nil, diags + } + statsTF, diags := convertStatsToTF(apiResp.GetStats()) + if diags.HasError() { + return nil, diags + } + optionsTF, diags := ConvertOptionsToTF(ctx, apiResp.Options) + if diags.HasError() { + return nil, diags + } + tfModel := &TFStreamProcessorDSModel{ + ID: types.StringPointerValue(&apiResp.Id), + InstanceName: types.StringPointerValue(&instanceName), + Options: *optionsTF, + Pipeline: types.StringValue(pipelineTF.ValueString()), + ProcessorName: types.StringPointerValue(&apiResp.Name), + ProjectID: types.StringPointerValue(&projectID), + State: types.StringPointerValue(&apiResp.State), + Stats: statsTF, + } + return tfModel, nil +} + +func ConvertOptionsToTF(ctx context.Context, options *admin.StreamsOptions) (*types.Object, diag.Diagnostics) { + if options == nil || !options.HasDlq() { + optionsTF := types.ObjectNull(OptionsObjectType.AttributeTypes()) + return &optionsTF, nil + } + dlqTF, diags := convertDlqToTF(ctx, options.Dlq) + if diags.HasError() { + return nil, diags + } + optionsTF := &TFOptionsModel{ + Dlq: *dlqTF, + } + optionsObject, diags := types.ObjectValueFrom(ctx, OptionsObjectType.AttributeTypes(), optionsTF) + if diags.HasError() { + return nil, diags + } + return &optionsObject, nil +} + +func convertDlqToTF(ctx context.Context, dlq *admin.StreamsDLQ) (*types.Object, diag.Diagnostics) { + if dlq == nil { + dlqTF := types.ObjectNull(DlqObjectType.AttributeTypes()) + return &dlqTF, nil + } + dlqModel := TFDlqModel{ + Coll: types.StringPointerValue(dlq.Coll), + ConnectionName: types.StringPointerValue(dlq.ConnectionName), + DB: types.StringPointerValue(dlq.Db), + } + dlqObject, diags := types.ObjectValueFrom(ctx, DlqObjectType.AttributeTypes(), dlqModel) + if diags.HasError() { + return nil, diags + } + return &dlqObject, nil +} +func convertPipelineToTF(pipeline []any) (fwtypes.JSONString, diag.Diagnostics) { + pipelineJSON, err := json.Marshal(pipeline) + if err != nil { + return fwtypes.JSONStringValue(""), diag.Diagnostics{diag.NewErrorDiagnostic("failed to marshal pipeline", err.Error())} + } + return fwtypes.JSONStringValue(string(pipelineJSON)), nil +} + +func convertStatsToTF(stats any) (types.String, diag.Diagnostics) { + if stats == nil { + return types.StringNull(), nil + } + statsJSON, err := json.Marshal(stats) + if err != nil { + return 
types.StringValue(""), diag.Diagnostics{diag.NewErrorDiagnostic("failed to marshal stats", err.Error())} + } + return types.StringValue(string(statsJSON)), nil +} + +func NewTFStreamProcessors(ctx context.Context, + streamProcessorsConfig *TFStreamProcessorsDSModel, + sdkResults []admin.StreamsProcessorWithStats) (*TFStreamProcessorsDSModel, diag.Diagnostics) { + results := make([]TFStreamProcessorDSModel, len(sdkResults)) + projectID := streamProcessorsConfig.ProjectID.ValueString() + instanceName := streamProcessorsConfig.InstanceName.ValueString() + for i := range sdkResults { + processorModel, diags := NewTFStreamprocessorDSModel(ctx, projectID, instanceName, &sdkResults[i]) + if diags.HasError() { + return nil, diags + } + results[i] = *processorModel + } + return &TFStreamProcessorsDSModel{ + ProjectID: streamProcessorsConfig.ProjectID, + InstanceName: streamProcessorsConfig.InstanceName, + Results: results, + }, nil +} + +func convertPipelineToSdk(pipeline string) ([]any, diag.Diagnostics) { + var pipelineSliceOfMaps []any + err := json.Unmarshal([]byte(pipeline), &pipelineSliceOfMaps) + if err != nil { + return nil, diag.Diagnostics{diag.NewErrorDiagnostic("failed to unmarshal pipeline", err.Error())} + } + return pipelineSliceOfMaps, nil +} diff --git a/internal/service/streamprocessor/model_test.go b/internal/service/streamprocessor/model_test.go new file mode 100644 index 0000000000..68f5733dac --- /dev/null +++ b/internal/service/streamprocessor/model_test.go @@ -0,0 +1,296 @@ +package streamprocessor_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/fwtypes" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/schemafunc" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streamprocessor" + "github.com/stretchr/testify/assert" + "go.mongodb.org/atlas-sdk/v20240805003/admin" +) + +var ( + projectID = "661fe3ad234b02027dabcabc" + instanceName = "test-instance-name" + pipelineStageSourceSample = map[string]any{ + "$source": map[string]any{ + "connectionName": "sample_stream_solar", + }, + } + pipelineStageEmitLog = map[string]any{ + "$emit": map[string]any{ + "connectionName": "__testLog", + }, + } + processorName = "processor1" + processorID = "66b39806187592e8d721215d" + stateCreated = streamprocessor.CreatedState + stateStarted = streamprocessor.StartedState + streamOptionsExample = admin.StreamsOptions{ + Dlq: &admin.StreamsDLQ{ + Coll: conversion.StringPtr("testColl"), + ConnectionName: conversion.StringPtr("testConnection"), + Db: conversion.StringPtr("testDB"), + }, + } +) + +var statsExample = ` +{ + "dlqMessageCount": 0, + "dlqMessageSize": 0.0, + "inputMessageCount": 12, + "inputMessageSize": 4681.0, + "memoryTrackerBytes": 0.0, + "name": "processor1", + "ok": 1.0, + "changeStreamState": { "_data": "8266C37388000000012B0429296E1404" }, + "operatorStats": [ + { + "dlqMessageCount": 0, + "dlqMessageSize": 0.0, + "executionTimeSecs": 0, + "inputMessageCount": 12, + "inputMessageSize": 4681.0, + "maxMemoryUsage": 0.0, + "name": "SampleDataSourceOperator", + "outputMessageCount": 12, + "outputMessageSize": 0.0, + "stateSize": 0.0, + "timeSpentMillis": 0 + }, + { + "dlqMessageCount": 0, + "dlqMessageSize": 0.0, + "executionTimeSecs": 0, + "inputMessageCount": 12, + "inputMessageSize": 4681.0, + "maxMemoryUsage": 0.0, + 
"name": "LogSinkOperator", + "outputMessageCount": 12, + "outputMessageSize": 4681.0, + "stateSize": 0.0, + "timeSpentMillis": 0 + } + ], + "outputMessageCount": 12, + "outputMessageSize": 4681.0, + "processorId": "66b3941109bbccf048ccff06", + "scaleFactor": 1, + "stateSize": 0.0, + "status": "running" +}` + +func streamProcessorWithStats(t *testing.T, options *admin.StreamsOptions) *admin.StreamsProcessorWithStats { + t.Helper() + processor := admin.NewStreamsProcessorWithStats( + processorID, processorName, []any{pipelineStageSourceSample, pipelineStageEmitLog}, stateStarted, + ) + var stats any + err := json.Unmarshal([]byte(statsExample), &stats) + if err != nil { + t.Fatal(err) + } + processor.SetStats(stats) + if options != nil { + processor.SetOptions(*options) + } + return processor +} + +func streamProcessorDSTFModel(t *testing.T, state, stats string, options types.Object) *streamprocessor.TFStreamProcessorDSModel { + t.Helper() + return &streamprocessor.TFStreamProcessorDSModel{ + ID: types.StringValue(processorID), + InstanceName: types.StringValue(instanceName), + Options: options, + Pipeline: types.StringValue("[{\"$source\":{\"connectionName\":\"sample_stream_solar\"}},{\"$emit\":{\"connectionName\":\"__testLog\"}}]"), + ProcessorName: types.StringValue(processorName), + ProjectID: types.StringValue(projectID), + State: conversion.StringNullIfEmpty(state), + Stats: conversion.StringNullIfEmpty(stats), + } +} + +func optionsToTFModel(t *testing.T, options *admin.StreamsOptions) types.Object { + t.Helper() + ctx := context.Background() + result, diags := streamprocessor.ConvertOptionsToTF(ctx, options) + if diags.HasError() { + t.Fatal(diags) + } + assert.NotNil(t, result) + return *result +} + +func TestDSSDKToTFModel(t *testing.T) { + testCases := []struct { + sdkModel *admin.StreamsProcessorWithStats + expectedTFModel *streamprocessor.TFStreamProcessorDSModel + name string + }{ + { + name: "afterCreate", + sdkModel: admin.NewStreamsProcessorWithStats( + processorID, processorName, []any{pipelineStageSourceSample, pipelineStageEmitLog}, stateCreated, + ), + expectedTFModel: streamProcessorDSTFModel(t, stateCreated, "", optionsToTFModel(t, nil)), + }, + { + name: "afterStarted", + sdkModel: streamProcessorWithStats(t, nil), + expectedTFModel: streamProcessorDSTFModel(t, stateStarted, statsExample, optionsToTFModel(t, nil)), + }, + { + name: "withOptions", + sdkModel: streamProcessorWithStats(t, &streamOptionsExample), + expectedTFModel: streamProcessorDSTFModel(t, stateStarted, statsExample, optionsToTFModel(t, &streamOptionsExample)), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sdkModel := tc.sdkModel + resultModel, diags := streamprocessor.NewTFStreamprocessorDSModel(context.Background(), projectID, instanceName, sdkModel) + if diags.HasError() { + t.Fatalf("unexpected errors found: %s", diags.Errors()[0].Summary()) + } + assert.Equal(t, tc.expectedTFModel.Options, resultModel.Options) + if sdkModel.Stats != nil { + assert.True(t, schemafunc.EqualJSON(resultModel.Pipeline.String(), tc.expectedTFModel.Pipeline.String(), "test stream processor schema")) + var statsResult any + err := json.Unmarshal([]byte(resultModel.Stats.ValueString()), &statsResult) + if err != nil { + t.Fatal(err) + } + assert.Len(t, sdkModel.Stats, 15) + assert.Len(t, statsResult, 15) + } else { + assert.Equal(t, tc.expectedTFModel, resultModel) + } + }) + } +} + +func TestSDKToTFModel(t *testing.T) { + testCases := []struct { + sdkModel 
*admin.StreamsProcessorWithStats + expectedTFModel *streamprocessor.TFStreamProcessorRSModel + name string + }{ + { + name: "afterCreate", + sdkModel: admin.NewStreamsProcessorWithStats( + processorID, processorName, []any{pipelineStageSourceSample, pipelineStageEmitLog}, "CREATED", + ), + expectedTFModel: &streamprocessor.TFStreamProcessorRSModel{ + InstanceName: types.StringValue(instanceName), + Options: types.ObjectNull(streamprocessor.OptionsObjectType.AttrTypes), + ProcessorID: types.StringValue(processorID), + Pipeline: fwtypes.JSONStringValue("[{\"$source\":{\"connectionName\":\"sample_stream_solar\"}},{\"$emit\":{\"connectionName\":\"__testLog\"}}]"), + ProcessorName: types.StringValue(processorName), + ProjectID: types.StringValue(projectID), + State: types.StringValue("CREATED"), + Stats: types.StringNull(), + }, + }, + { + name: "afterStarted", + sdkModel: streamProcessorWithStats(t, nil), + expectedTFModel: &streamprocessor.TFStreamProcessorRSModel{ + InstanceName: types.StringValue(instanceName), + Options: types.ObjectNull(streamprocessor.OptionsObjectType.AttrTypes), + ProcessorID: types.StringValue(processorID), + Pipeline: fwtypes.JSONStringValue("[{\"$source\":{\"connectionName\":\"sample_stream_solar\"}},{\"$emit\":{\"connectionName\":\"__testLog\"}}]"), + ProcessorName: types.StringValue(processorName), + ProjectID: types.StringValue(projectID), + State: types.StringValue("STARTED"), + Stats: types.StringValue(statsExample), + }, + }, + { + name: "withOptions", + sdkModel: streamProcessorWithStats(t, &streamOptionsExample), + expectedTFModel: &streamprocessor.TFStreamProcessorRSModel{ + InstanceName: types.StringValue(instanceName), + Options: optionsToTFModel(t, &streamOptionsExample), + ProcessorID: types.StringValue(processorID), + Pipeline: fwtypes.JSONStringValue("[{\"$source\":{\"connectionName\":\"sample_stream_solar\"}},{\"$emit\":{\"connectionName\":\"__testLog\"}}]"), + ProcessorName: types.StringValue(processorName), + ProjectID: types.StringValue(projectID), + State: types.StringValue("STARTED"), + Stats: types.StringNull(), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sdkModel := tc.sdkModel + resultModel, diags := streamprocessor.NewStreamProcessorWithStats(context.Background(), projectID, instanceName, sdkModel) + if diags.HasError() { + t.Fatalf("unexpected errors found: %s", diags.Errors()[0].Summary()) + } + assert.Equal(t, tc.expectedTFModel.Options, resultModel.Options) + if sdkModel.Stats != nil { + assert.True(t, schemafunc.EqualJSON(resultModel.Pipeline.String(), tc.expectedTFModel.Pipeline.String(), "test stream processor schema")) + var statsResult any + err := json.Unmarshal([]byte(resultModel.Stats.ValueString()), &statsResult) + if err != nil { + t.Fatal(err) + } + assert.Len(t, sdkModel.Stats, 15) + assert.Len(t, statsResult, 15) + } else { + assert.Equal(t, tc.expectedTFModel, resultModel) + } + }) + } +} +func TestPluralDSSDKToTFModel(t *testing.T) { + testCases := map[string]struct { + sdkModel *admin.PaginatedApiStreamsStreamProcessorWithStats + expectedTFModel *streamprocessor.TFStreamProcessorsDSModel + }{ + "noResults": {sdkModel: &admin.PaginatedApiStreamsStreamProcessorWithStats{ + Results: &[]admin.StreamsProcessorWithStats{}, + TotalCount: admin.PtrInt(0), + }, expectedTFModel: &streamprocessor.TFStreamProcessorsDSModel{ + ProjectID: types.StringValue(projectID), + InstanceName: types.StringValue(instanceName), + Results: []streamprocessor.TFStreamProcessorDSModel{}, + }}, + 
"oneResult": {sdkModel: &admin.PaginatedApiStreamsStreamProcessorWithStats{ + Results: &[]admin.StreamsProcessorWithStats{*admin.NewStreamsProcessorWithStats( + processorID, processorName, []any{pipelineStageSourceSample, pipelineStageEmitLog}, stateCreated, + )}, + TotalCount: admin.PtrInt(1), + }, expectedTFModel: &streamprocessor.TFStreamProcessorsDSModel{ + ProjectID: types.StringValue(projectID), + InstanceName: types.StringValue(instanceName), + Results: []streamprocessor.TFStreamProcessorDSModel{ + *streamProcessorDSTFModel(t, stateCreated, "", optionsToTFModel(t, nil)), + }, + }}, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + sdkModel := tc.sdkModel + existingConfig := &streamprocessor.TFStreamProcessorsDSModel{ + ProjectID: types.StringValue(projectID), + InstanceName: types.StringValue(instanceName), + } + resultModel, diags := streamprocessor.NewTFStreamProcessors(context.Background(), existingConfig, sdkModel.GetResults()) + if diags.HasError() { + t.Fatalf("unexpected errors found: %s", diags.Errors()[0].Summary()) + } + assert.Equal(t, tc.expectedTFModel, resultModel) + }) + } +} diff --git a/internal/service/streamprocessor/resource.go b/internal/service/streamprocessor/resource.go new file mode 100644 index 0000000000..a83d090591 --- /dev/null +++ b/internal/service/streamprocessor/resource.go @@ -0,0 +1,264 @@ +package streamprocessor + +import ( + "context" + "errors" + "fmt" + "net/http" + "regexp" + + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/config" + "go.mongodb.org/atlas-sdk/v20240805003/admin" +) + +const StreamProcessorName = "stream_processor" + +var _ resource.ResourceWithConfigure = &streamProcessorRS{} +var _ resource.ResourceWithImportState = &streamProcessorRS{} + +func Resource() resource.Resource { + return &streamProcessorRS{ + RSCommon: config.RSCommon{ + ResourceName: StreamProcessorName, + }, + } +} + +type streamProcessorRS struct { + config.RSCommon +} + +func (r *streamProcessorRS) Schema(ctx context.Context, req resource.SchemaRequest, resp *resource.SchemaResponse) { + resp.Schema = ResourceSchema(ctx) +} + +func (r *streamProcessorRS) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { + var plan TFStreamProcessorRSModel + resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) + if resp.Diagnostics.HasError() { + return + } + + streamProcessorReq, diags := NewStreamProcessorReq(ctx, &plan) + if diags.HasError() { + resp.Diagnostics.Append(diags...) 
+ return + } + + var needsStarting bool + if !plan.State.IsNull() && !plan.State.IsUnknown() { + switch plan.State.ValueString() { + case StartedState: + needsStarting = true + case CreatedState: + needsStarting = false + default: + resp.Diagnostics.AddError("When creating a stream processor, the only valid states are CREATED and STARTED", "") + return + } + } + + connV2 := r.Client.AtlasV2 + projectID := plan.ProjectID.ValueString() + instanceName := plan.InstanceName.ValueString() + processorName := plan.ProcessorName.ValueString() + _, _, err := connV2.StreamsApi.CreateStreamProcessor(ctx, projectID, instanceName, streamProcessorReq).Execute() + + if err != nil { + resp.Diagnostics.AddError("error creating resource", err.Error()) + return + } + + streamProcessorParams := &admin.GetStreamProcessorApiParams{ + GroupId: projectID, + TenantName: instanceName, + ProcessorName: processorName, + } + + streamProcessorResp, err := WaitStateTransition(ctx, streamProcessorParams, connV2.StreamsApi, []string{InitiatingState, CreatingState}, []string{CreatedState}) + if err != nil { + resp.Diagnostics.AddError("Error creating stream processor", err.Error()) + } + + if needsStarting { + _, _, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx, + &admin.StartStreamProcessorApiParams{ + GroupId: projectID, + TenantName: instanceName, + ProcessorName: processorName, + }, + ).Execute() + if err != nil { + resp.Diagnostics.AddError("Error starting stream processor", err.Error()) + } + streamProcessorResp, err = WaitStateTransition(ctx, streamProcessorParams, connV2.StreamsApi, []string{CreatedState}, []string{StartedState}) + if err != nil { + resp.Diagnostics.AddError("Error changing state of stream processor", err.Error()) + } + } + + newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessorResp) + if diags.HasError() { + resp.Diagnostics.Append(diags...) + return + } + resp.Diagnostics.Append(resp.State.Set(ctx, newStreamProcessorModel)...) +} + +func (r *streamProcessorRS) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { + var state TFStreamProcessorRSModel + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } + + connV2 := r.Client.AtlasV2 + + projectID := state.ProjectID.ValueString() + instanceName := state.InstanceName.ValueString() + streamProcessor, apiResp, err := connV2.StreamsApi.GetStreamProcessor(ctx, projectID, instanceName, state.ProcessorName.ValueString()).Execute() + if err != nil { + if apiResp != nil && apiResp.StatusCode == http.StatusNotFound { + resp.State.RemoveResource(ctx) + return + } + resp.Diagnostics.AddError("error fetching resource", err.Error()) + return + } + + newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessor) + if diags.HasError() { + resp.Diagnostics.Append(diags...) + return + } + resp.Diagnostics.Append(resp.State.Set(ctx, newStreamProcessorModel)...) +} + +func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) { + var plan TFStreamProcessorRSModel + var state TFStreamProcessorRSModel + resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) 
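+ // Only the `state` attribute can be changed in place: updatedStateOnly below rejects any other difference between plan and state, so changes to the pipeline, options, or identifying fields require recreating the stream processor.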
+ + if resp.Diagnostics.HasError() { + return + } + + connV2 := r.Client.AtlasV2 + pendingStates := []string{CreatedState} + desiredState := []string{} + projectID := plan.ProjectID.ValueString() + instanceName := plan.InstanceName.ValueString() + processorName := plan.ProcessorName.ValueString() + currentState := state.State.ValueString() + if !updatedStateOnly(&plan, &state) { + resp.Diagnostics.AddError("updating a Stream Processor is not supported", "") + return + } + switch plan.State.ValueString() { + case StartedState: + desiredState = append(desiredState, StartedState) + pendingStates = append(pendingStates, StoppedState) + _, _, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx, + &admin.StartStreamProcessorApiParams{ + GroupId: projectID, + TenantName: instanceName, + ProcessorName: processorName, + }, + ).Execute() + if err != nil { + resp.Diagnostics.AddError("Error starting stream processor", err.Error()) + } + case StoppedState: + if currentState != StartedState { + resp.Diagnostics.AddError(fmt.Sprintf("Stream Processor must be in %s state to transition to %s state", StartedState, StoppedState), "") + return + } + desiredState = append(desiredState, StoppedState) + pendingStates = append(pendingStates, StartedState) + _, _, err := connV2.StreamsApi.StopStreamProcessorWithParams(ctx, + &admin.StopStreamProcessorApiParams{ + GroupId: projectID, + TenantName: instanceName, + ProcessorName: processorName, + }, + ).Execute() + if err != nil { + resp.Diagnostics.AddError("Error stopping stream processor", err.Error()) + } + default: + resp.Diagnostics.AddError("transitions to states other than STARTED or STOPPED are not supported", "") + return + } + + requestParams := &admin.GetStreamProcessorApiParams{ + GroupId: projectID, + TenantName: instanceName, + ProcessorName: processorName, + } + + streamProcessorResp, err := WaitStateTransition(ctx, requestParams, connV2.StreamsApi, pendingStates, desiredState) + if err != nil { + resp.Diagnostics.AddError("Error changing state of stream processor", err.Error()) + } + + newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessorResp) + if diags.HasError() { + resp.Diagnostics.Append(diags...) + return + } + resp.Diagnostics.Append(resp.State.Set(ctx, newStreamProcessorModel)...) +} + +func (r *streamProcessorRS) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { + var streamProcessorState *TFStreamProcessorRSModel + resp.Diagnostics.Append(req.State.Get(ctx, &streamProcessorState)...) + if resp.Diagnostics.HasError() { + return + } + + connV2 := r.Client.AtlasV2 + if _, err := connV2.StreamsApi.DeleteStreamProcessor(ctx, streamProcessorState.ProjectID.ValueString(), streamProcessorState.InstanceName.ValueString(), streamProcessorState.ProcessorName.ValueString()).Execute(); err != nil { + resp.Diagnostics.AddError("error deleting resource", err.Error()) + return + } +} + +func (r *streamProcessorRS) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) { + projectID, instanceName, processorName, err := splitImportID(req.ID) + if err != nil { + resp.Diagnostics.AddError("error splitting import ID", err.Error()) + return + } + + resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("project_id"), projectID)...) + resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("instance_name"), instanceName)...) 
+ resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("processor_name"), processorName)...) +} + +func splitImportID(id string) (projectID, instanceName, processorName *string, err error) { + var re = regexp.MustCompile(`^(.*)-([0-9a-fA-F]{24})-(.*)$`) + parts := re.FindStringSubmatch(id) + + if len(parts) != 4 { + err = errors.New("import format error: to import a stream processor, use the format {instance_name}-{project_id}-(processor_name)") + return + } + + instanceName = &parts[1] + projectID = &parts[2] + processorName = &parts[3] + + return +} + +func updatedStateOnly(plan, state *TFStreamProcessorRSModel) bool { + return plan.ProjectID.Equal(state.ProjectID) && + plan.InstanceName.Equal(state.InstanceName) && + plan.ProcessorName.Equal(state.ProcessorName) && + plan.Pipeline.Equal(state.Pipeline) && + (plan.Options.Equal(state.Options) || plan.Options.IsUnknown()) && + !plan.State.Equal(state.State) +} diff --git a/internal/service/streamprocessor/resource_migration_test.go b/internal/service/streamprocessor/resource_migration_test.go new file mode 100644 index 0000000000..e01a4b27cc --- /dev/null +++ b/internal/service/streamprocessor/resource_migration_test.go @@ -0,0 +1,12 @@ +package streamprocessor_test + +import ( + "testing" + + "github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/mig" +) + +func TestMigStreamProcessor_basic(t *testing.T) { + mig.SkipIfVersionBelow(t, "1.19.0") // when resource 1st released + mig.CreateAndRunTest(t, basicTestCase(t)) +} diff --git a/internal/service/streamprocessor/resource_schema.go b/internal/service/streamprocessor/resource_schema.go new file mode 100644 index 0000000000..2e8ce79d12 --- /dev/null +++ b/internal/service/streamprocessor/resource_schema.go @@ -0,0 +1,133 @@ +package streamprocessor + +import ( + "context" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/fwtypes" +) + +func optionsSchema(isDatasource bool) schema.SingleNestedAttribute { + return schema.SingleNestedAttribute{ + Attributes: map[string]schema.Attribute{ + "dlq": schema.SingleNestedAttribute{ + Attributes: map[string]schema.Attribute{ + "coll": schema.StringAttribute{ + Required: !isDatasource, + Computed: isDatasource, + Description: "Name of the collection to use for the DLQ.", + MarkdownDescription: "Name of the collection to use for the DLQ.", + }, + "connection_name": schema.StringAttribute{ + Required: !isDatasource, + Computed: isDatasource, + Description: "Name of the connection to write DLQ messages to. Must be an Atlas connection.", + MarkdownDescription: "Name of the connection to write DLQ messages to. Must be an Atlas connection.", + }, + "db": schema.StringAttribute{ + Required: !isDatasource, + Computed: isDatasource, + Description: "Name of the database to use for the DLQ.", + MarkdownDescription: "Name of the database to use for the DLQ.", + }, + }, + Required: !isDatasource, + Computed: isDatasource, + Description: "Dead letter queue for the stream processor. 
Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/reference/glossary/#std-term-dead-letter-queue) for more information.", + MarkdownDescription: "Dead letter queue for the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/reference/glossary/#std-term-dead-letter-queue) for more information.", + }, + }, + Optional: !isDatasource, + Computed: isDatasource, + Description: "Optional configuration for the stream processor.", + MarkdownDescription: "Optional configuration for the stream processor.", + } +} + +func ResourceSchema(ctx context.Context) schema.Schema { + return schema.Schema{ + Attributes: map[string]schema.Attribute{ + "id": schema.StringAttribute{ + Computed: true, + Description: "Unique 24-hexadecimal character string that identifies the stream processor.", + MarkdownDescription: "Unique 24-hexadecimal character string that identifies the stream processor.", + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + }, + "instance_name": schema.StringAttribute{ + Required: true, + Description: "Human-readable label that identifies the stream instance.", + MarkdownDescription: "Human-readable label that identifies the stream instance.", + }, + "pipeline": schema.StringAttribute{ + CustomType: fwtypes.JSONStringType, + Required: true, + Description: "Stream aggregation pipeline you want to apply to your streaming data. [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/#std-label-stream-aggregation)" + + " contain more information. Using [jsonencode](https://developer.hashicorp.com/terraform/language/functions/jsonencode) is recommended when setting this attribute. For more details see [Aggregation Pipelines Documentation](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/)", + MarkdownDescription: "Stream aggregation pipeline you want to apply to your streaming data. [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/#std-label-stream-aggregation)" + + " contain more information. Using [jsonencode](https://developer.hashicorp.com/terraform/language/functions/jsonencode) is recommended when setting this attribute. For more details see [Aggregation Pipelines Documentation](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/)", + }, + "processor_name": schema.StringAttribute{ + Required: true, + Description: "Human-readable label that identifies the stream processor.", + MarkdownDescription: "Human-readable label that identifies the stream processor.", + }, + "project_id": schema.StringAttribute{ + Required: true, + Description: "Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access.\n\n**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.", + MarkdownDescription: "Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access.\n\n**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. 
The resource and corresponding endpoints use the term groups.", + }, + "state": schema.StringAttribute{ + Optional: true, + Computed: true, + Description: "The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`." + + " When a Stream Processor is created without specifying the state, it will default to `CREATED` state.\n\n**NOTE** When a stream processor is created, the only valid states are CREATED or STARTED. A stream processor can be automatically started when creating it if the state is set to STARTED.", + MarkdownDescription: "The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`." + + " When a Stream Processor is created without specifying the state, it will default to `CREATED` state.\n\n**NOTE** When a stream processor is created, the only valid states are CREATED or STARTED. A stream processor can be automatically started when creating it if the state is set to STARTED.", + }, + "options": optionsSchema(false), + "stats": schema.StringAttribute{ + Computed: true, + Description: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.", + MarkdownDescription: "The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.", + }, + }, + } +} + +type TFStreamProcessorRSModel struct { + InstanceName types.String `tfsdk:"instance_name"` + Options types.Object `tfsdk:"options"` + Pipeline fwtypes.JSONString `tfsdk:"pipeline"` + ProcessorID types.String `tfsdk:"id"` + ProcessorName types.String `tfsdk:"processor_name"` + ProjectID types.String `tfsdk:"project_id"` + State types.String `tfsdk:"state"` + Stats types.String `tfsdk:"stats"` +} + +type TFOptionsModel struct { + Dlq types.Object `tfsdk:"dlq"` +} + +type TFDlqModel struct { + Coll types.String `tfsdk:"coll"` + ConnectionName types.String `tfsdk:"connection_name"` + DB types.String `tfsdk:"db"` +} + +var OptionsObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{ + "dlq": DlqObjectType, +}} + +var DlqObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{ + "coll": types.StringType, + "connection_name": types.StringType, + "db": types.StringType, +}, +} diff --git a/internal/service/streamprocessor/resource_test.go b/internal/service/streamprocessor/resource_test.go new file mode 100644 index 0000000000..ea7ce98b0a --- /dev/null +++ b/internal/service/streamprocessor/resource_test.go @@ -0,0 +1,472 @@ +package streamprocessor_test + +import ( + "context" + "fmt" + "regexp" + "strings" + "testing" + + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" + "github.com/stretchr/testify/assert" + + "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streamprocessor" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/acc" +) + +type connectionConfig struct { + connectionType string + clusterName string + pipelineStepIsSource bool + useAsDLQ bool + extraWhitespace bool + invalidJSON 
bool +} + +var ( + resourceName = "mongodbatlas_stream_processor.processor" + dataSourceName = "data.mongodbatlas_stream_processor.test" + pluralDataSourceName = "data.mongodbatlas_stream_processors.test" + connTypeSample = "Sample" + connTypeCluster = "Cluster" + connTypeKafka = "Kafka" + connTypeTestLog = "TestLog" + sampleSrcConfig = connectionConfig{connectionType: connTypeSample, pipelineStepIsSource: true} + testLogDestConfig = connectionConfig{connectionType: connTypeTestLog, pipelineStepIsSource: false} +) + +func TestAccStreamProcessor_basic(t *testing.T) { + resource.ParallelTest(t, *basicTestCase(t)) +} + +func basicTestCase(t *testing.T) *resource.TestCase { + t.Helper() + var ( + projectID = acc.ProjectIDExecution(t) + processorName = "new-processor" + instanceName = acc.RandomName() + ) + + return &resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: config(t, projectID, instanceName, processorName, "", sampleSrcConfig, testLogDestConfig), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.CreatedState, false, false), + }, + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, sampleSrcConfig, testLogDestConfig), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.StartedState, true, false), + }, + { + ResourceName: resourceName, + ImportStateIdFunc: importStateIDFunc(resourceName), + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"stats"}, + }, + }} +} + +func TestAccStreamProcessor_JSONWhiteSpaceFormat(t *testing.T) { + var ( + projectID = acc.ProjectIDExecution(t) + processorName = "new-processor-json-unchanged" + instanceName = acc.RandomName() + sampleSrcConfigExtraSpaces = connectionConfig{connectionType: connTypeSample, pipelineStepIsSource: true, extraWhitespace: true} + ) + resource.ParallelTest(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + PreCheck: func() { acc.PreCheckBasic(t) }, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.CreatedState, sampleSrcConfigExtraSpaces, testLogDestConfig), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.CreatedState, false, false), + }, + }}) +} + +func TestAccStreamProcessor_withOptions(t *testing.T) { + var ( + projectID, clusterName = acc.ClusterNameExecution(t) + processorName = "new-processor" + instanceName = acc.RandomName() + src = connectionConfig{connectionType: connTypeCluster, clusterName: clusterName, pipelineStepIsSource: true, useAsDLQ: true} + dest = connectionConfig{connectionType: connTypeKafka, pipelineStepIsSource: false} + ) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.CreatedState, src, dest), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.CreatedState, false, true), + }, + { + ResourceName: resourceName, + ImportStateIdFunc: importStateIDFunc(resourceName), + ImportState: true, + ImportStateVerify: true, + 
ImportStateVerifyIgnore: []string{"stats"}, + }, + }}) +} + +func TestAccStreamProcessor_createWithAutoStartAndStop(t *testing.T) { + var ( + projectID = acc.ProjectIDExecution(t) + processorName = "new-processor" + instanceName = acc.RandomName() + ) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, sampleSrcConfig, testLogDestConfig), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.StartedState, true, false), + }, + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, sampleSrcConfig, testLogDestConfig), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.StoppedState, true, false), + }, + }}) +} + +func TestAccStreamProcessor_clusterType(t *testing.T) { + var ( + projectID, clusterName = acc.ClusterNameExecution(t) + processorName = "new-processor" + instanceName = acc.RandomName() + srcConfig = connectionConfig{connectionType: connTypeCluster, clusterName: clusterName, pipelineStepIsSource: true} + ) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, srcConfig, testLogDestConfig), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.StartedState, true, false), + }, + }}) +} + +func TestAccStreamProcessor_createErrors(t *testing.T) { + var ( + projectID = acc.ProjectIDExecution(t) + processorName = "new-processor" + instanceName = acc.RandomName() + invalidJSONConfig = connectionConfig{connectionType: connTypeSample, pipelineStepIsSource: true, invalidJSON: true} + ) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, invalidJSONConfig, testLogDestConfig), + ExpectError: regexp.MustCompile("Invalid JSON String Value"), + }, + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, sampleSrcConfig, testLogDestConfig), + ExpectError: regexp.MustCompile("When creating a stream processor, the only valid states are CREATED and STARTED"), + }, + }}) +} + +func TestAccStreamProcessor_updateErrors(t *testing.T) { + var ( + processorName = "new-processor" + instanceName = acc.RandomName() + projectID, clusterName = acc.ClusterNameExecution(t) + src = connectionConfig{connectionType: connTypeCluster, clusterName: clusterName, pipelineStepIsSource: true, useAsDLQ: false} + srcWithOptions = connectionConfig{connectionType: connTypeCluster, clusterName: clusterName, pipelineStepIsSource: true, useAsDLQ: true} + ) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: checkDestroyStreamProcessor, + Steps: []resource.TestStep{ + { + Config: config(t, projectID, instanceName, processorName, 
streamprocessor.CreatedState, src, testLogDestConfig), + Check: composeStreamProcessorChecks(projectID, instanceName, processorName, streamprocessor.CreatedState, false, false), + }, + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StoppedState, src, testLogDestConfig), + ExpectError: regexp.MustCompile(`Stream Processor must be in \w+ state to transition to \w+ state`), + }, + { + Config: config(t, projectID, instanceName, processorName, streamprocessor.StartedState, srcWithOptions, testLogDestConfig), + ExpectError: regexp.MustCompile("updating a Stream Processor is not supported"), + }, + }}) +} + +func checkExists(resourceName string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[resourceName] + if !ok { + return fmt.Errorf("not found: %s", resourceName) + } + projectID := rs.Primary.Attributes["project_id"] + instanceName := rs.Primary.Attributes["instance_name"] + processorName := rs.Primary.Attributes["processor_name"] + _, _, err := acc.ConnV2().StreamsApi.GetStreamProcessor(context.Background(), projectID, instanceName, processorName).Execute() + + if err != nil { + return fmt.Errorf("Stream processor (%s) does not exist", processorName) + } + + return nil + } +} + +func checkDestroyStreamProcessor(s *terraform.State) error { + for _, rs := range s.RootModule().Resources { + if rs.Type != "mongodbatlas_stream_processor" { + continue + } + projectID := rs.Primary.Attributes["project_id"] + instanceName := rs.Primary.Attributes["instance_name"] + processorName := rs.Primary.Attributes["processor_name"] + _, _, err := acc.ConnV2().StreamsApi.GetStreamProcessor(context.Background(), projectID, instanceName, processorName).Execute() + if err == nil { + return fmt.Errorf("Stream processor (%s) still exists", processorName) + } + } + + return nil +} + +func importStateIDFunc(resourceName string) resource.ImportStateIdFunc { + return func(s *terraform.State) (string, error) { + rs, ok := s.RootModule().Resources[resourceName] + if !ok { + return "", fmt.Errorf("not found: %s", resourceName) + } + + return fmt.Sprintf("%s-%s-%s", rs.Primary.Attributes["instance_name"], rs.Primary.Attributes["project_id"], rs.Primary.Attributes["processor_name"]), nil + } +} + +func composeStreamProcessorChecks(projectID, instanceName, processorName, state string, includeStats, includeOptions bool) resource.TestCheckFunc { + checks := []resource.TestCheckFunc{checkExists(resourceName)} + attributes := map[string]string{ + "project_id": projectID, + "instance_name": instanceName, + "processor_name": processorName, + "state": state, + } + checks = acc.AddAttrChecks(resourceName, checks, attributes) + checks = acc.AddAttrChecks(dataSourceName, checks, attributes) + checks = acc.AddAttrChecks(pluralDataSourceName, checks, map[string]string{ + "project_id": projectID, + "instance_name": instanceName, + "results.#": "1", + "results.0.processor_name": processorName, + "results.0.state": state, + "results.0.instance_name": instanceName, + }) + if includeStats { + checks = acc.AddAttrSetChecks(resourceName, checks, "stats", "pipeline") + checks = acc.AddAttrSetChecks(dataSourceName, checks, "stats", "pipeline") + checks = acc.AddAttrSetChecks(pluralDataSourceName, checks, "results.0.stats", "results.0.pipeline") + } + if includeOptions { + checks = acc.AddAttrSetChecks(resourceName, checks, "options.dlq.db", "options.dlq.coll", "options.dlq.connection_name") + checks = acc.AddAttrSetChecks(dataSourceName, checks, 
"options.dlq.db", "options.dlq.coll", "options.dlq.connection_name") + checks = acc.AddAttrSetChecks(pluralDataSourceName, checks, "results.0.options.dlq.db", "results.0.options.dlq.coll", "results.0.options.dlq.connection_name") + } + return resource.ComposeAggregateTestCheckFunc(checks...) +} + +func config(t *testing.T, projectID, instanceName, processorName, state string, src, dest connectionConfig) string { + t.Helper() + stateConfig := "" + if state != "" { + stateConfig = fmt.Sprintf(`state = %[1]q`, state) + } + + connectionConfigSrc, connectionIDSrc, pipelineStepSrc := configConnection(t, projectID, src) + connectionConfigDest, connectionIDDest, pipelineStepDest := configConnection(t, projectID, dest) + dependsOn := []string{} + if connectionIDSrc != "" { + dependsOn = append(dependsOn, connectionIDSrc) + } + if connectionIDDest != "" { + dependsOn = append(dependsOn, connectionIDDest) + } + dependsOnStr := strings.Join(dependsOn, ", ") + pipeline := fmt.Sprintf("[{\"$source\":%1s},{\"$emit\":%2s}]", pipelineStepSrc, pipelineStepDest) + optionsStr := "" + if src.useAsDLQ { + assert.Equal(t, connTypeCluster, src.connectionType) + optionsStr = fmt.Sprintf(` + options = { + dlq = { + coll = "dlq_coll" + connection_name = %[1]s.connection_name + db = "dlq_db" + } + }`, connectionIDSrc) + } + + dataSource := fmt.Sprintf(` + data "mongodbatlas_stream_processor" "test" { + project_id = %[1]q + instance_name = %[2]q + processor_name = %[3]q + depends_on = [%4s] + }`, projectID, instanceName, processorName, resourceName) + dataSourcePlural := fmt.Sprintf(` + data "mongodbatlas_stream_processors" "test" { + project_id = %[1]q + instance_name = %[2]q + depends_on = [%3s] + }`, projectID, instanceName, resourceName) + + return fmt.Sprintf(` + resource "mongodbatlas_stream_instance" "instance" { + project_id = %[1]q + instance_name = %[2]q + data_process_region = { + region = "VIRGINIA_USA" + cloud_provider = "AWS" + } + } + + %[3]s + %[4]s + + resource "mongodbatlas_stream_processor" "processor" { + project_id = %[1]q + instance_name = mongodbatlas_stream_instance.instance.instance_name + processor_name = %[5]q + pipeline = %[6]q + %[7]s + %[8]s + depends_on = [%[9]s] + } + %[10]s + %[11]s + + `, projectID, instanceName, connectionConfigSrc, connectionConfigDest, processorName, pipeline, stateConfig, optionsStr, dependsOnStr, dataSource, dataSourcePlural) +} + +func configConnection(t *testing.T, projectID string, config connectionConfig) (connectionConfig, resourceID, pipelineStep string) { + t.Helper() + assert.False(t, config.extraWhitespace && config.connectionType != connTypeSample, "extraWhitespace is only supported for Sample connection") + assert.False(t, config.invalidJSON && config.connectionType != connTypeSample, "invalidJson is only supported for Sample connection") + connectionType := config.connectionType + pipelineStepIsSource := config.pipelineStepIsSource + switch connectionType { + case "Cluster": + var connectionName, resourceName string + clusterName := config.clusterName + assert.NotEqual(t, "", clusterName) + if pipelineStepIsSource { + connectionName = "ClusterConnectionSrc" + resourceName = "cluster_src" + } else { + connectionName = "ClusterConnectionDest" + resourceName = "cluster_dest" + } + connectionConfig = fmt.Sprintf(` + resource "mongodbatlas_stream_connection" %[4]q { + project_id = %[1]q + cluster_name = %[2]q + instance_name = mongodbatlas_stream_instance.instance.instance_name + connection_name = %[3]q + type = "Cluster" + db_role_to_execute = { + role = 
"atlasAdmin" + type = "BUILT_IN" + } + depends_on = [mongodbatlas_stream_instance.instance] + } + `, projectID, clusterName, connectionName, resourceName) + resourceID = fmt.Sprintf("mongodbatlas_stream_connection.%s", resourceName) + pipelineStep = fmt.Sprintf("{\"connectionName\":%q}", connectionName) + return connectionConfig, resourceID, pipelineStep + case "Kafka": + var connectionName, resourceName, pipelineStep string + if pipelineStepIsSource { + connectionName = "KafkaConnectionSrc" + resourceName = "kafka_src" + pipelineStep = fmt.Sprintf("{\"connectionName\":%q}", connectionName) + } else { + connectionName = "KafkaConnectionDest" + resourceName = "kafka_dest" + pipelineStep = fmt.Sprintf("{\"connectionName\":%q,\"topic\":\"random_topic\"}", connectionName) + } + connectionConfig = fmt.Sprintf(` + resource "mongodbatlas_stream_connection" %[3]q{ + project_id = %[1]q + instance_name = mongodbatlas_stream_instance.instance.instance_name + connection_name = %[2]q + type = "Kafka" + authentication = { + mechanism = "PLAIN" + username = "user" + password = "rawpassword" + } + bootstrap_servers = "localhost:9092,localhost:9092" + config = { + "auto.offset.reset" : "earliest" + } + security = { + protocol = "PLAINTEXT" + } + depends_on = [mongodbatlas_stream_instance.instance] + } + `, projectID, connectionName, resourceName) + resourceID = fmt.Sprintf("mongodbatlas_stream_connection.%s", resourceName) + return connectionConfig, resourceID, pipelineStep + case "Sample": + if !pipelineStepIsSource { + t.Fatal("Sample connection must be used as a source") + } + connectionConfig = fmt.Sprintf(` + resource "mongodbatlas_stream_connection" "sample" { + project_id = %[1]q + instance_name = mongodbatlas_stream_instance.instance.instance_name + connection_name = "sample_stream_solar" + type = "Sample" + depends_on = [mongodbatlas_stream_instance.instance] + } + `, projectID) + resourceID = "mongodbatlas_stream_connection.sample" + if config.extraWhitespace { + pipelineStep = "{\"connectionName\": \"sample_stream_solar\"}" + } else { + pipelineStep = "{\"connectionName\":\"sample_stream_solar\"}" + } + if config.invalidJSON { + pipelineStep = "{\"connectionName\": \"sample_stream_solar\"" // missing closing bracket + } + return connectionConfig, resourceID, pipelineStep + + case "TestLog": + if pipelineStepIsSource { + t.Fatal("TestLog connection must be used as a destination") + } + connectionConfig = "" + resourceID = "" + pipelineStep = "{\"connectionName\":\"__testLog\"}" + return connectionConfig, resourceID, pipelineStep + } + t.Fatalf("Unknown connection type: %s", connectionType) + return connectionConfig, resourceID, pipelineStep +} diff --git a/internal/service/streamprocessor/state_transition.go b/internal/service/streamprocessor/state_transition.go new file mode 100644 index 0000000000..66bfc62289 --- /dev/null +++ b/internal/service/streamprocessor/state_transition.go @@ -0,0 +1,61 @@ +package streamprocessor + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" + "go.mongodb.org/atlas-sdk/v20240805003/admin" +) + +const ( + InitiatingState = "INIT" + CreatingState = "CREATING" + CreatedState = "CREATED" + StartedState = "STARTED" + StoppedState = "STOPPED" + DroppedState = "DROPPED" + FailedState = "FAILED" +) + +func WaitStateTransition(ctx context.Context, requestParams *admin.GetStreamProcessorApiParams, client admin.StreamsApi, pendingStates, desiredStates []string) (*admin.StreamsProcessorWithStats, 
error) { + stateConf := &retry.StateChangeConf{ + Pending: pendingStates, + Target: desiredStates, + Refresh: refreshFunc(ctx, requestParams, client), + Timeout: 5 * time.Minute, // big pipelines can take a while to stop due to checkpointing. We prefer the API to raise the error (~ 3min) than having to expose custom timeouts. + MinTimeout: 3 * time.Second, + Delay: 0, + } + + streamProcessorResp, err := stateConf.WaitForStateContext(ctx) + if err != nil { + return nil, err + } + + if streamProcessor, ok := streamProcessorResp.(*admin.StreamsProcessorWithStats); ok && streamProcessor != nil { + return streamProcessor, nil + } + + return nil, errors.New("did not obtain valid result when waiting for stream processor state transition") +} + +func refreshFunc(ctx context.Context, requestParams *admin.GetStreamProcessorApiParams, client admin.StreamsApi) retry.StateRefreshFunc { + return func() (any, string, error) { + streamProcessor, resp, err := client.GetStreamProcessorWithParams(ctx, requestParams).Execute() + if err != nil { + if resp.StatusCode == http.StatusNotFound { + return "", DroppedState, err + } + return nil, FailedState, err + } + state := streamProcessor.GetState() + if state == FailedState { + return nil, state, fmt.Errorf("error creating MongoDB Stream Processor(%s) status was: %s", requestParams.ProcessorName, state) + } + return streamProcessor, state, nil + } +} diff --git a/internal/service/streamprocessor/state_transition_test.go b/internal/service/streamprocessor/state_transition_test.go new file mode 100644 index 0000000000..783e41006a --- /dev/null +++ b/internal/service/streamprocessor/state_transition_test.go @@ -0,0 +1,156 @@ +package streamprocessor_test + +import ( + "context" + "errors" + "net/http" + "testing" + + "go.mongodb.org/atlas-sdk/v20240805003/admin" + "go.mongodb.org/atlas-sdk/v20240805003/mockadmin" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streamprocessor" +) + +var ( + InitiatingState = "INIT" + CreatingState = "CREATING" + CreatedState = "CREATED" + StartedState = "STARTED" + StoppedState = "STOPPED" + DroppedState = "DROPPED" + FailedState = "FAILED" + sc500 = conversion.IntPtr(500) + sc200 = conversion.IntPtr(200) + sc404 = conversion.IntPtr(404) + streamProcessorName = "processorName" + requestParams = &admin.GetStreamProcessorApiParams{ + GroupId: "groupId", + TenantName: "tenantName", + ProcessorName: streamProcessorName, + } +) + +type testCase struct { + expectedState *string + name string + mockResponses []response + desiredStates []string + pendingStates []string + expectedError bool +} + +func TestStreamProcessorStateTransition(t *testing.T) { + testCases := []testCase{ + { + name: "Successful transition to CREATED", + mockResponses: []response{ + {state: &InitiatingState, statusCode: sc200}, + {state: &CreatingState, statusCode: sc200}, + {state: &CreatedState, statusCode: sc200}, + }, + expectedState: &CreatedState, + expectedError: false, + desiredStates: []string{CreatedState}, + pendingStates: []string{InitiatingState, CreatingState}, + }, + { + name: "Successful transition to STARTED", + mockResponses: []response{ + {state: &CreatedState, statusCode: sc200}, + {state: &StartedState, statusCode: sc200}, + }, + expectedState: &StartedState, + expectedError: false, + desiredStates: []string{StartedState}, + pendingStates: []string{CreatedState, 
StoppedState}, + }, + { + name: "Successful transition to STOPPED", + mockResponses: []response{ + {state: &StartedState, statusCode: sc200}, + {state: &StoppedState, statusCode: sc200}, + }, + expectedState: &StoppedState, + expectedError: false, + desiredStates: []string{StoppedState}, + pendingStates: []string{StartedState}, + }, + { + name: "Error when transitioning to FAILED state", + mockResponses: []response{ + {state: &InitiatingState, statusCode: sc200}, + {state: &FailedState, statusCode: sc200}, + }, + expectedState: nil, + expectedError: true, + desiredStates: []string{CreatedState}, + pendingStates: []string{InitiatingState, CreatingState}, + }, + { + name: "Error when API responds with error", + mockResponses: []response{ + {statusCode: sc500, err: errors.New("Internal server error")}, + }, + expectedState: nil, + expectedError: true, + desiredStates: []string{CreatedState, FailedState}, + pendingStates: []string{InitiatingState, CreatingState}, + }, + { + name: "Dropped state when 404 is returned", + mockResponses: []response{ + {statusCode: sc404, err: errors.New("Not found")}, + }, + expectedState: &DroppedState, + expectedError: true, + desiredStates: []string{CreatedState, FailedState}, + pendingStates: []string{InitiatingState, CreatingState}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m := mockadmin.NewStreamsApi(t) + m.EXPECT().GetStreamProcessorWithParams(mock.Anything, mock.Anything).Return(admin.GetStreamProcessorApiRequest{ApiService: m}) + + for _, resp := range tc.mockResponses { + modelResp, httpResp, err := resp.get() + m.EXPECT().GetStreamProcessorExecute(mock.Anything).Return(modelResp, httpResp, err).Once() + } + resp, err := streamprocessor.WaitStateTransition(context.Background(), requestParams, m, tc.pendingStates, tc.desiredStates) + assert.Equal(t, tc.expectedError, err != nil) + if resp != nil { + assert.Equal(t, *tc.expectedState, resp.State) + } + }) + } +} + +type response struct { + state *string + statusCode *int + err error +} + +func (r *response) get() (*admin.StreamsProcessorWithStats, *http.Response, error) { + var httpResp *http.Response + if r.statusCode != nil { + httpResp = &http.Response{StatusCode: *r.statusCode} + } + return responseWithState(r.state), httpResp, r.err +} + +func responseWithState(state *string) *admin.StreamsProcessorWithStats { + if state == nil { + return nil + } + return &admin.StreamsProcessorWithStats{ + Name: streamProcessorName, + State: *state, + } +} diff --git a/internal/service/streamprocessor/tfplugingen/generator_config.yml b/internal/service/streamprocessor/tfplugingen/generator_config.yml new file mode 100644 index 0000000000..7e36507e66 --- /dev/null +++ b/internal/service/streamprocessor/tfplugingen/generator_config.yml @@ -0,0 +1,24 @@ +provider: + name: mongodbatlas + +resources: + stream_processor: + read: + path: /api/atlas/v2/groups/{groupId}/streams/{tenantName}/processor/{processorName} + method: GET + create: + path: /api/atlas/v2/groups/{groupId}/streams/{tenantName}/processor + method: POST + delete: + path: /api/atlas/v2/groups/{groupId}/streams/{tenantName}/processor/{processorName} + method: DELETE + +data_sources: + stream_processor: + read: + path: /api/atlas/v2/groups/{groupId}/streams/{tenantName}/processor/{processorName} + method: GET + stream_processors: + read: + path: /api/atlas/v2/groups/{groupId}/streams/{tenantName}/processors + method: GET diff --git a/scripts/schema-scaffold.sh b/scripts/schema-scaffold.sh index 1438fe73c3..070e96d99c 
100755 --- a/scripts/schema-scaffold.sh +++ b/scripts/schema-scaffold.sh @@ -50,5 +50,5 @@ rename_file() { } rename_file "${resource_name_snake_case}_data_source_gen.go" "data_source_schema.go" -rename_file "${resource_name_snake_case}s_data_source_gen.go" "pural_data_source_schema.go" +rename_file "${resource_name_snake_case}s_data_source_gen.go" "plural_data_source_schema.go" rename_file "${resource_name_snake_case}_resource_gen.go" "resource_schema.go" diff --git a/templates/data-sources/stream_processor.md.tmpl b/templates/data-sources/stream_processor.md.tmpl new file mode 100644 index 0000000000..07dc70b478 --- /dev/null +++ b/templates/data-sources/stream_processor.md.tmpl @@ -0,0 +1,10 @@ +# {{.Type}}: {{.Name}} + +`{{.Name}}` describes a stream processor. + +## Example Usages +{{ tffile (printf "examples/%s/main.tf" .Name )}} + +{{ .SchemaMarkdown | trimspace }} + +For more information see: [MongoDB Atlas API - Stream Processor](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Streams/operation/createStreamProcessor) Documentation. diff --git a/templates/data-sources/stream_processors.md.tmpl b/templates/data-sources/stream_processors.md.tmpl new file mode 100644 index 0000000000..df4a654e56 --- /dev/null +++ b/templates/data-sources/stream_processors.md.tmpl @@ -0,0 +1,10 @@ +# {{.Type}}: {{.Name}} + +`{{.Name}}` returns all stream processors in a stream instance. + +## Example Usages +{{ tffile (printf "examples/mongodbatlas_stream_processor/main.tf" )}} + +{{ .SchemaMarkdown | trimspace }} + +For more information see: [MongoDB Atlas API - Stream Processor](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Streams/operation/createStreamProcessor) Documentation. diff --git a/templates/resources.md.tmpl b/templates/resources.md.tmpl index ed9ba98760..b2f176cdf3 100644 --- a/templates/resources.md.tmpl +++ b/templates/resources.md.tmpl @@ -56,6 +56,7 @@ {{ else if eq .Name "mongodbatlas_ldap_verify" }} {{ else if eq .Name "mongodbatlas_third_party_integration" }} {{ else if eq .Name "mongodbatlas_x509_authentication_database_user" }} + {{ else if eq .Name "mongodbatlas_stream_processor" }} {{ else if eq .Name "mongodbatlas_privatelink_endpoint_service_data_federation_online_archive" }} {{ else }} {{ tffile (printf "examples/%s/main.tf" .Name )}} diff --git a/templates/resources/stream_processor.md.tmpl b/templates/resources/stream_processor.md.tmpl new file mode 100644 index 0000000000..22e03c261b --- /dev/null +++ b/templates/resources/stream_processor.md.tmpl @@ -0,0 +1,30 @@ +# {{.Type}}: {{.Name}} + +`{{.Name}}` provides a Stream Processor resource. The resource lets you create, delete, import, start and stop a stream processor in a stream instance. + +**NOTE**: Updating an Atlas Stream Processor is currently not supported. As a result, the following steps are needed to be able to change an Atlas Stream Processor with an Atlas Change Stream Source: +1. Retrieve the value of Change Stream Source Token `changeStreamState` from the computed `stats` attribute in `mongodbatlas_stream_processor` resource or datasource or from the Terraform state file. This takes the form of a [resume token](https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens-from-change-events). The Stream Processor has to be running in the state `STARTED` for the `stats` attribute to be available. However, before you retrieve the value, you should set the `state` to `STOPPED` to get the latest `changeStreamState`. 
+ - Example: + ``` + {\"changeStreamState\":{\"_data\":\"8266C71670000000012B0429296E1404\"}} + ``` +2. Update the `pipeline` argument setting `config.StartAfter` with the value retrieved in the previous step. More details can be found in the [MongoDB Collection Change Stream](https://www.mongodb.com/docs/atlas/atlas-stream-processing/sp-agg-source/#mongodb-collection-change-stream) documentation. + - Example: + ``` + pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name, "config" = { "startAfter" = { "_data" : "8266C71562000000012B0429296E1404" } } } }, { "$emit" = { "connectionName" : "__testLog" } }]) + ``` +3. Delete the existing Atlas Stream Processor and then create a new Atlas Stream Processor with the updated pipeline parameter and values. + +## Example Usages + +{{ tffile (printf "examples/%s/main.tf" .Name )}} + +{{ .SchemaMarkdown | trimspace }} + +## Import +The Stream Processor resource can be imported using the Project ID, Stream Instance name, and Stream Processor name, in the format `INSTANCE_NAME-PROJECT_ID-PROCESSOR_NAME`, e.g. +``` +$ terraform import mongodbatlas_stream_processor.test yourInstanceName-6117ac2fe2a3d04ed27a987v-yourProcessorName +``` + +For more information see: [MongoDB Atlas API - Stream Processor](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Streams/operation/createStreamProcessor) Documentation.
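For illustration, a minimal sketch of the in-place start/stop workflow supported by this resource; it assumes an existing stream instance and sample connection, and the resource names, variables, and pipeline values are placeholders rather than part of this change:

```terraform
# Assumes the processor was previously created and started (state = "STARTED").
resource "mongodbatlas_stream_processor" "example" {
  project_id     = var.project_id
  instance_name  = mongodbatlas_stream_instance.example.instance_name
  processor_name = "exampleProcessorName"
  pipeline       = jsonencode([{ "$source" = { "connectionName" = "sample_stream_solar" } }, { "$emit" = { "connectionName" = "__testLog" } }])

  # Toggling between "STARTED" and "STOPPED" starts or stops the processor in place;
  # changing any other attribute is rejected and requires recreating the processor.
  state = "STOPPED"
}
```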