Skip to content

Commit

Permalink
Merge pull request #22963 from DrFaust92/glue-trigger-event
Browse files Browse the repository at this point in the history
r/glue_trigger - add `event_batching_condition`
  • Loading branch information
ewbankkit authored Feb 11, 2022
2 parents c75c99a + 609e431 commit f12686a
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .changelog/22963.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_glue_trigger: Add `event_batching_condition` argument.
```
101 changes: 83 additions & 18 deletions internal/service/glue/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/flex"
tfiam "github.com/hashicorp/terraform-provider-aws/internal/service/iam"
tftags "github.com/hashicorp/terraform-provider-aws/internal/tags"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
Expand Down Expand Up @@ -95,6 +96,26 @@ func ResourceTrigger() *schema.Resource {
Optional: true,
Default: true,
},
"event_batching_condition": {
Type: schema.TypeList,
Optional: true,
MinItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"batch_size": {
Type: schema.TypeInt,
Required: true,
ValidateFunc: validation.IntBetween(1, 100),
},
"batch_window": {
Type: schema.TypeInt,
Optional: true,
Default: 900,
ValidateFunc: validation.IntBetween(1, 900),
},
},
},
},
"name": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -186,16 +207,21 @@ func resourceTriggerCreate(d *schema.ResourceData, meta interface{}) error {
triggerType := d.Get("type").(string)

input := &glue.CreateTriggerInput{
Actions: expandGlueActions(d.Get("actions").([]interface{})),
Name: aws.String(name),
Tags: Tags(tags.IgnoreAWS()),
Type: aws.String(triggerType),
Actions: expandGlueActions(d.Get("actions").([]interface{})),
Name: aws.String(name),
Tags: Tags(tags.IgnoreAWS()),
Type: aws.String(triggerType),
StartOnCreation: aws.Bool(d.Get("start_on_creation").(bool)),
}

if v, ok := d.GetOk("description"); ok {
input.Description = aws.String(v.(string))
}

if v, ok := d.GetOk("event_batching_condition"); ok {
input.EventBatchingCondition = expandGlueEventBatchingCondition(v.([]interface{}))
}

if v, ok := d.GetOk("predicate"); ok {
input.Predicate = expandGluePredicate(v.([]interface{}))
}
Expand All @@ -209,7 +235,13 @@ func resourceTriggerCreate(d *schema.ResourceData, meta interface{}) error {
}

if d.Get("enabled").(bool) && triggerType != glue.TriggerTypeOnDemand {
input.StartOnCreation = aws.Bool(true)
start := true

if triggerType == glue.TriggerTypeEvent {
start = false
}

input.StartOnCreation = aws.Bool(start)
}

if v, ok := d.GetOk("workflow_name"); ok {
Expand All @@ -219,7 +251,6 @@ func resourceTriggerCreate(d *schema.ResourceData, meta interface{}) error {
if v, ok := d.GetOk("start_on_creation"); ok {
input.StartOnCreation = aws.Bool(v.(bool))
}

log.Printf("[DEBUG] Creating Glue Trigger: %s", input)
err := resource.Retry(tfiam.PropagationTimeout, func() *resource.RetryError {
_, err := conn.CreateTrigger(input)
Expand Down Expand Up @@ -305,7 +336,7 @@ func resourceTriggerRead(d *schema.ResourceData, meta interface{}) error {
state := aws.StringValue(trigger.State)
d.Set("state", state)

if aws.StringValue(trigger.Type) == glue.TriggerTypeOnDemand {
if aws.StringValue(trigger.Type) == glue.TriggerTypeOnDemand || aws.StringValue(trigger.Type) == glue.TriggerTypeEvent {
enabled = (state == glue.TriggerStateCreated || state == glue.TriggerStateCreating) && d.Get("enabled").(bool)
} else {
enabled = (state == glue.TriggerStateActivated || state == glue.TriggerStateActivating)
Expand All @@ -316,6 +347,10 @@ func resourceTriggerRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("error setting predicate: %w", err)
}

if err := d.Set("event_batching_condition", flattenGlueEventBatchingCondition(trigger.EventBatchingCondition)); err != nil {
return fmt.Errorf("error setting event_batching_condition: %w", err)
}

d.Set("name", trigger.Name)
d.Set("schedule", trigger.Schedule)

Expand Down Expand Up @@ -345,7 +380,7 @@ func resourceTriggerRead(d *schema.ResourceData, meta interface{}) error {
func resourceTriggerUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*conns.AWSClient).GlueConn

if d.HasChanges("actions", "description", "predicate", "schedule") {
if d.HasChanges("actions", "description", "predicate", "schedule", "event_batching_condition") {
triggerUpdate := &glue.TriggerUpdate{
Actions: expandGlueActions(d.Get("actions").([]interface{})),
}
Expand All @@ -361,6 +396,11 @@ func resourceTriggerUpdate(d *schema.ResourceData, meta interface{}) error {
if v, ok := d.GetOk("schedule"); ok {
triggerUpdate.Schedule = aws.String(v.(string))
}

if v, ok := d.GetOk("event_batching_condition"); ok {
triggerUpdate.EventBatchingCondition = expandGlueEventBatchingCondition(v.([]interface{}))
}

input := &glue.UpdateTriggerInput{
Name: aws.String(d.Id()),
TriggerUpdate: triggerUpdate,
Expand Down Expand Up @@ -466,14 +506,12 @@ func expandGlueActions(l []interface{}) []*glue.Action {
action.JobName = aws.String(v)
}

argumentsMap := make(map[string]string)
for k, v := range m["arguments"].(map[string]interface{}) {
argumentsMap[k] = v.(string)
if v, ok := m["arguments"].(map[string]interface{}); ok && len(v) > 0 {
action.Arguments = flex.ExpandStringMap(v)
}
action.Arguments = aws.StringMap(argumentsMap)

if v, ok := m["timeout"]; ok && v.(int) > 0 {
action.Timeout = aws.Int64(int64(v.(int)))
if v, ok := m["timeout"].(int); ok && v > 0 {
action.Timeout = aws.Int64(int64(v))
}

if v, ok := m["security_configuration"].(string); ok && v != "" {
Expand All @@ -495,8 +533,8 @@ func expandGlueTriggerNotificationProperty(l []interface{}) *glue.NotificationPr

property := &glue.NotificationProperty{}

if v, ok := m["notify_delay_after"]; ok && v.(int) > 0 {
property.NotifyDelayAfter = aws.Int64(int64(v.(int)))
if v, ok := m["notify_delay_after"].(int); ok && v > 0 {
property.NotifyDelayAfter = aws.Int64(int64(v))
}

return property
Expand Down Expand Up @@ -541,8 +579,8 @@ func expandGluePredicate(l []interface{}) *glue.Predicate {
Conditions: expandGlueConditions(m["conditions"].([]interface{})),
}

if v, ok := m["logical"]; ok && v.(string) != "" {
predicate.Logical = aws.String(v.(string))
if v, ok := m["logical"].(string); ok && v != "" {
predicate.Logical = aws.String(v)
}

return predicate
Expand Down Expand Up @@ -633,3 +671,30 @@ func flattenGlueTriggerNotificationProperty(property *glue.NotificationProperty)

return []map[string]interface{}{m}
}

func expandGlueEventBatchingCondition(l []interface{}) *glue.EventBatchingCondition {
m := l[0].(map[string]interface{})

ebc := &glue.EventBatchingCondition{
BatchSize: aws.Int64(int64(m["batch_size"].(int))),
}

if v, ok := m["batch_window"].(int); ok && v > 0 {
ebc.BatchWindow = aws.Int64(int64(v))
}

return ebc
}

func flattenGlueEventBatchingCondition(ebc *glue.EventBatchingCondition) []map[string]interface{} {
if ebc == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"batch_size": aws.Int64Value(ebc.BatchSize),
"batch_window": aws.Int64Value(ebc.BatchWindow),
}

return []map[string]interface{}{m}
}
90 changes: 90 additions & 0 deletions internal/service/glue/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestAccGlueTrigger_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "tags.%", "0"),
resource.TestCheckResourceAttr(resourceName, "type", "ON_DEMAND"),
resource.TestCheckResourceAttr(resourceName, "workflow_name", ""),
resource.TestCheckResourceAttr(resourceName, "event_batching_condition.#", "0"),
),
},
{
Expand Down Expand Up @@ -496,6 +497,48 @@ func TestAccGlueTrigger_onDemandDisable(t *testing.T) {
})
}

func TestAccGlueTrigger_eventBatchingCondition(t *testing.T) {
var trigger glue.Trigger

rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_glue_trigger.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckTriggerDestroy,
Steps: []resource.TestStep{
{
Config: testAccTriggerConfigEvent(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckTriggerExists(resourceName, &trigger),
resource.TestCheckResourceAttr(resourceName, "event_batching_condition.#", "1"),
resource.TestCheckResourceAttr(resourceName, "event_batching_condition.0.batch_size", "1"),
resource.TestCheckResourceAttr(resourceName, "event_batching_condition.0.batch_window", "900"),
resource.TestCheckResourceAttr(resourceName, "type", "EVENT"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"enabled", "start_on_creation"},
},
{
Config: testAccTriggerConfigEventUpdated(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckTriggerExists(resourceName, &trigger),
resource.TestCheckResourceAttr(resourceName, "event_batching_condition.#", "1"),
resource.TestCheckResourceAttr(resourceName, "event_batching_condition.0.batch_size", "1"),
resource.TestCheckResourceAttr(resourceName, "event_batching_condition.0.batch_window", "50"),
resource.TestCheckResourceAttr(resourceName, "type", "EVENT"),
),
},
},
})
}

func TestAccGlueTrigger_disappears(t *testing.T) {
var trigger glue.Trigger

Expand Down Expand Up @@ -828,3 +871,50 @@ resource "aws_glue_trigger" "test" {
}
`, rName))
}

func testAccTriggerConfigEvent(rName string) string {
return acctest.ConfigCompose(testAccJobConfig_Required(rName), fmt.Sprintf(`
resource "aws_glue_workflow" test {
name = %[1]q
}
resource "aws_glue_trigger" "test" {
name = %[1]q
type = "EVENT"
workflow_name = aws_glue_workflow.test.name
start_on_creation = false
actions {
job_name = aws_glue_job.test.name
}
event_batching_condition {
batch_size = 1
}
}
`, rName))
}

func testAccTriggerConfigEventUpdated(rName string) string {
return acctest.ConfigCompose(testAccJobConfig_Required(rName), fmt.Sprintf(`
resource "aws_glue_workflow" test {
name = %[1]q
}
resource "aws_glue_trigger" "test" {
name = %[1]q
type = "EVENT"
workflow_name = aws_glue_workflow.test.name
start_on_creation = false
actions {
job_name = aws_glue_job.test.name
}
event_batching_condition {
batch_size = 1
batch_window = 50
}
}
`, rName))
}
7 changes: 7 additions & 0 deletions website/docs/r/glue_trigger.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ The following arguments are supported:
* `start_on_creation` – (Optional) Set to true to start `SCHEDULED` and `CONDITIONAL` triggers when created. True is not supported for `ON_DEMAND` triggers.
* `type` – (Required) The type of trigger. Valid values are `CONDITIONAL`, `ON_DEMAND`, and `SCHEDULED`.
* `workflow_name` - (Optional) A workflow to which the trigger should be associated to. Every workflow graph (DAG) needs a starting trigger (`ON_DEMAND` or `SCHEDULED` type) and can contain multiple additional `CONDITIONAL` triggers.
* `event_batching_condition` - (Optional) Batch condition that must be met (specified number of events received or batch time window expired) before EventBridge event trigger fires. See [Event Batching Condition](#event-batching-condition).

### Actions

Expand Down Expand Up @@ -144,6 +145,12 @@ The following arguments are supported:
* `crawl_state` - (Optional) The condition crawl state. Currently, the values supported are `RUNNING`, `SUCCEEDED`, `CANCELLED`, and `FAILED`. If this is specified, `crawler_name` must also be specified. Conflicts with `state`.
* `logical_operator` - (Optional) A logical operator. Defaults to `EQUALS`.

### Event Batching Condition

* `batch_size` - (Required)Number of events that must be received from Amazon EventBridge before EventBridge event trigger fires.
* `batch_window` - (Optional) Window of time in seconds after which EventBridge event trigger fires. Window starts when first event is received. Default value is `900`.


## Attributes Reference

In addition to all arguments above, the following attributes are exported:
Expand Down

0 comments on commit f12686a

Please sign in to comment.