Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloudwatch_events_target - add sagemaker pipeline support #32882

Merged
merged 4 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/32882.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_cloudwatch_events_target: Add `sagemaker_pipeline_target` argument
```
101 changes: 101 additions & 0 deletions internal/service/events/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,33 @@ func ResourceTarget() *schema.Resource {
},
},
},
"sagemaker_pipeline_target": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"pipeline_parameter_list": {
Type: schema.TypeSet,
Optional: true,
MaxItems: 200,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
},
"value": {
Type: schema.TypeString,
Required: true,
},
},
},
},
},
},
},
"sqs_target": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -540,6 +567,12 @@ func resourceTargetRead(ctx context.Context, d *schema.ResourceData, meta interf
}
}

if t.SageMakerPipelineParameters != nil {
if err := d.Set("sagemaker_pipeline_target", flattenTargetSageMakerPipelineParameters(t.SageMakerPipelineParameters)); err != nil {
return diag.Errorf("setting sagemaker_pipeline_parameters: %s", err)
}
}

if t.SqsParameters != nil {
if err := d.Set("sqs_target", flattenTargetSQSParameters(t.SqsParameters)); err != nil {
return diag.Errorf("setting sqs_target: %s", err)
Expand Down Expand Up @@ -701,6 +734,10 @@ func buildPutTargetInputStruct(ctx context.Context, d *schema.ResourceData) *eve
e.SqsParameters = expandTargetSQSParameters(v.([]interface{}))
}

if v, ok := d.GetOk("sagemaker_pipeline_target"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
e.SageMakerPipelineParameters = expandTargetSageMakerPipelineParameters(v.([]interface{}))
}

if v, ok := d.GetOk("input_transformer"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
e.InputTransformer = expandTransformerParameters(v.([]interface{}))
}
Expand Down Expand Up @@ -918,6 +955,48 @@ func expandTargetSQSParameters(config []interface{}) *eventbridge.SqsParameters
return sqsParameters
}

func expandTargetSageMakerPipelineParameterList(tfList []interface{}) []*eventbridge.SageMakerPipelineParameter {
if len(tfList) == 0 {
return nil
}

var result []*eventbridge.SageMakerPipelineParameter

for _, tfMapRaw := range tfList {
if tfMapRaw == nil {
continue
}

tfMap := tfMapRaw.(map[string]interface{})

apiObject := &eventbridge.SageMakerPipelineParameter{}

if v, ok := tfMap["name"].(string); ok && v != "" {
apiObject.Name = aws.String(v)
}

if v, ok := tfMap["value"].(string); ok && v != "" {
apiObject.Value = aws.String(v)
}

result = append(result, apiObject)
}

return result
}

func expandTargetSageMakerPipelineParameters(config []interface{}) *eventbridge.SageMakerPipelineParameters {
sageMakerPipelineParameters := &eventbridge.SageMakerPipelineParameters{}
for _, c := range config {
param := c.(map[string]interface{})
if v, ok := param["pipeline_parameter_list"].(*schema.Set); ok && v.Len() > 0 {
sageMakerPipelineParameters.PipelineParameterList = expandTargetSageMakerPipelineParameterList(v.List())
}
}

return sageMakerPipelineParameters
}

func expandTargetHTTPParameters(tfMap map[string]interface{}) *eventbridge.HttpParameters {
if tfMap == nil {
return nil
Expand Down Expand Up @@ -1069,6 +1148,28 @@ func flattenTargetKinesisParameters(kinesisParameters *eventbridge.KinesisParame
return result
}

func flattenTargetSageMakerPipelineParameters(sageMakerParameters *eventbridge.SageMakerPipelineParameters) []map[string]interface{} {
config := make(map[string]interface{})
config["pipeline_parameter_list"] = flattenTargetSageMakerPipelineParameter(sageMakerParameters.PipelineParameterList)
result := []map[string]interface{}{config}
return result
}

func flattenTargetSageMakerPipelineParameter(pcs []*eventbridge.SageMakerPipelineParameter) []map[string]interface{} {
if len(pcs) == 0 {
return nil
}
results := make([]map[string]interface{}, 0)
for _, pc := range pcs {
c := make(map[string]interface{})
c["name"] = aws.StringValue(pc.Name)
c["value"] = aws.StringValue(pc.Value)

results = append(results, c)
}
return results
}

func flattenTargetSQSParameters(sqsParameters *eventbridge.SqsParameters) []map[string]interface{} {
config := make(map[string]interface{})
config["message_group_id"] = aws.StringValue(sqsParameters.MessageGroupId)
Expand Down
159 changes: 149 additions & 10 deletions internal/service/events/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@ func TestAccEventsTarget_basic(t *testing.T) {
Steps: []resource.TestStep{
{
Config: testAccTargetConfig_basic(rName),
Check: resource.ComposeTestCheckFunc(
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckTargetExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "rule", rName),
resource.TestCheckResourceAttr(resourceName, "event_bus_name", "default"),
resource.TestCheckResourceAttr(resourceName, "target_id", rName),
resource.TestCheckResourceAttrPair(resourceName, "arn", snsTopicResourceName, "arn"),

resource.TestCheckResourceAttr(resourceName, "input", ""),
resource.TestCheckResourceAttr(resourceName, "input_path", ""),
resource.TestCheckResourceAttr(resourceName, "role_arn", ""),
resource.TestCheckResourceAttr(resourceName, "run_command_targets.#", "0"),
resource.TestCheckResourceAttr(resourceName, "batch_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "dead_letter_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "ecs_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "event_bus_name", "default"),
resource.TestCheckResourceAttr(resourceName, "http_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "input", ""),
resource.TestCheckResourceAttr(resourceName, "input_path", ""),
resource.TestCheckResourceAttr(resourceName, "input_transformer.#", "0"),
resource.TestCheckResourceAttr(resourceName, "kinesis_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "redshift_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retry_policy.#", "0"),
resource.TestCheckResourceAttr(resourceName, "role_arn", ""),
resource.TestCheckResourceAttr(resourceName, "rule", rName),
resource.TestCheckResourceAttr(resourceName, "run_command_targets.#", "0"),
resource.TestCheckResourceAttr(resourceName, "sagemaker_pipeline_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "sqs_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "input_transformer.#", "0"),
resource.TestCheckResourceAttr(resourceName, "target_id", rName),
),
},
{
Expand Down Expand Up @@ -818,6 +821,41 @@ func TestAccEventsTarget_sqs(t *testing.T) {
})
}

func TestAccEventsTarget_sageMakerPipeline(t *testing.T) {
ctx := acctest.Context(t)
var v eventbridge.Target
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_cloudwatch_event_target.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, eventbridge.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckTargetDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccTargetConfig_sageMakerPipeline(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckTargetExists(ctx, resourceName, &v),
resource.TestCheckResourceAttrPair(resourceName, "arn", "aws_sagemaker_pipeline.test", "arn"),
resource.TestCheckResourceAttr(resourceName, "sagemaker_pipeline_target.#", "1"),
resource.TestCheckResourceAttr(resourceName, "sagemaker_pipeline_target.0.pipeline_parameter_list.#", "1"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "sagemaker_pipeline_target.0.pipeline_parameter_list.*", map[string]string{
"name": "key",
"value": "value",
}),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateIdFunc: testAccTargetImportStateIdFunc(resourceName),
ImportStateVerify: true,
},
},
})
}

func TestAccEventsTarget_Input_transformer(t *testing.T) {
ctx := acctest.Context(t)
resourceName := "aws_cloudwatch_event_target.test"
Expand Down Expand Up @@ -2183,6 +2221,107 @@ resource "aws_sqs_queue" "test" {
`, rName)
}

func testAccTargetConfig_sageMakerPipeline(rName string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}

resource "aws_cloudwatch_event_rule" "test" {
name = %[1]q
description = "schedule_batch_test"
schedule_expression = "rate(5 minutes)"
}

resource "aws_cloudwatch_event_target" "test" {
arn = aws_sagemaker_pipeline.test.arn
rule = aws_cloudwatch_event_rule.test.id
role_arn = aws_iam_role.test.arn

sagemaker_pipeline_target {
pipeline_parameter_list {
name = "key"
value = "value"
}
}

target_id = %[1]q
}

resource "aws_iam_role" "test" {
name = %[1]q

assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "events.${data.aws_partition.current.dns_suffix}"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}

resource "aws_iam_role_policy" "test" {
name = %[1]q
role = aws_iam_role.test.id

policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sagemaker:*",
"Effect": "Allow",
"Resource": [
"*"
]
}
]
}
EOF
}

resource "aws_iam_role" "sagemaker" {
name = "%[1]s-2"
path = "/"
assume_role_policy = data.aws_iam_policy_document.test.json
}

data "aws_iam_policy_document" "test" {
statement {
actions = ["sts:AssumeRole"]

principals {
type = "Service"
identifiers = ["sagemaker.amazonaws.com"]
}
}
}

resource "aws_sagemaker_pipeline" "test" {
pipeline_name = %[1]q
pipeline_display_name = %[1]q
role_arn = aws_iam_role.sagemaker.arn

pipeline_definition = jsonencode({
Version = "2020-12-01"
Steps = [{
Name = "Test"
Type = "Fail"
Arguments = {
ErrorMessage = "test"
}
}]
})
}
`, rName)
}

func testAccTargetConfig_inputTransformer(rName string, inputPathKeys []string) string {
var inputPaths, inputTemplates strings.Builder

Expand Down
10 changes: 10 additions & 0 deletions website/docs/r/cloudwatch_event_target.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ The following arguments are optional:
* `run_command_targets` - (Optional) Parameters used when you are using the rule to invoke Amazon EC2 Run Command. Documented below. A maximum of 5 are allowed.
* `redshift_target` - (Optional) Parameters used when you are using the rule to invoke an Amazon Redshift Statement. Documented below. A maximum of 1 are allowed.
* `retry_policy` - (Optional) Parameters used when you are providing retry policies. Documented below. A maximum of 1 are allowed.
* `sagemaker_pipeline_target` - (Optional) Parameters used when you are using the rule to invoke an Amazon SageMaker Pipeline. Documented below. A maximum of 1 are allowed.
* `sqs_target` - (Optional) Parameters used when you are using the rule to invoke an Amazon SQS Queue. Documented below. A maximum of 1 are allowed.
* `target_id` - (Optional) The unique target assignment ID. If missing, will generate a random, unique id.

Expand Down Expand Up @@ -572,6 +573,15 @@ For more information, see [Task Networking](https://docs.aws.amazon.com/AmazonEC

* `message_group_id` - (Optional) The FIFO message group ID to use as the target.

### sagemaker_pipeline_target

* `pipeline_parameter_list` - (Optional) List of Parameter names and values for SageMaker Model Building Pipeline execution.

#### pipeline_parameter_list

* `name` - (Required) Name of parameter to start execution of a SageMaker Model Building Pipeline.
* `value` - (Required) Value of parameter to start execution of a SageMaker Model Building Pipeline.

## Attribute Reference

This resource exports no additional attributes.
Expand Down
Loading