Skip to content

Commit

Permalink
feat: Add support for error notifications for Snowpipe (#595)
Browse files Browse the repository at this point in the history
  • Loading branch information
gouline authored Jul 13, 2021
1 parent 8ff2a83 commit 90af4cf
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 53 deletions.
10 changes: 7 additions & 3 deletions docs/resources/notification_integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ description: |-

### Required

- **azure_storage_queue_primary_uri** (String) The queue ID for the Azure Queue Storage queue created for Event Grid notifications
- **azure_tenant_id** (String) The ID of the Azure Active Directory tenant used for identity management
- **name** (String)

### Optional

- **aws_sqs_arn** (String) AWS SQS queue ARN for notification integration to connect to
- **aws_sqs_external_id** (String) The external ID that Snowflake will use when assuming the AWS role
- **aws_sqs_role_arn** (String) AWS IAM role ARN for notification integration to assume
- **azure_storage_queue_primary_uri** (String) The queue ID for the Azure Queue Storage queue created for Event Grid notifications
- **azure_tenant_id** (String) The ID of the Azure Active Directory tenant used for identity management
- **comment** (String)
- **direction** (String) Direction of the cloud messaging with respect to Snowflake (required only for error notifications)
- **enabled** (Boolean)
- **id** (String) The ID of this resource.
- **notification_provider** (String) The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE)
- **notification_provider** (String) The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE, AWS_SQS)
- **type** (String) A type of integration

### Read-Only
Expand Down
1 change: 1 addition & 0 deletions docs/resources/pipe.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ resource snowflake_pipe pipe {
- **auto_ingest** (Boolean) Specifies a auto_ingest param for the pipe.
- **aws_sns_topic_arn** (String) Specifies the Amazon Resource Name (ARN) for the SNS topic for your S3 bucket.
- **comment** (String) Specifies a comment for the pipe.
- **error_integration** (String) Specifies the name of the notification integration used for error notifications.
- **id** (String) The ID of this resource.
- **integration** (String) Specifies an integration for the pipe.

Expand Down
74 changes: 69 additions & 5 deletions pkg/resources/notification_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,44 @@ var notificationIntegrationSchema = map[string]*schema.Schema{
ValidateFunc: validation.StringInSlice([]string{"QUEUE"}, true),
Description: "A type of integration",
},
"direction": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{"INBOUND", "OUTBOUND"}, true),
Description: "Direction of the cloud messaging with respect to Snowflake (required only for error notifications)",
},
// This part of the schema is the cloudProviderParams in the Snowflake documentation and differs between vendors
"notification_provider": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "AZURE_STORAGE_QUEUE",
ValidateFunc: validation.StringInSlice([]string{"AZURE_STORAGE_QUEUE"}, true),
Description: "The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE)",
ValidateFunc: validation.StringInSlice([]string{"AZURE_STORAGE_QUEUE", "AWS_SQS"}, true),
Description: "The third-party cloud message queuing service (e.g. AZURE_STORAGE_QUEUE, AWS_SQS)",
},
"azure_storage_queue_primary_uri": &schema.Schema{
Type: schema.TypeString,
Required: true,
Optional: true,
Description: "The queue ID for the Azure Queue Storage queue created for Event Grid notifications",
},
"azure_tenant_id": &schema.Schema{
Type: schema.TypeString,
Required: true,
Optional: true,
Description: "The ID of the Azure Active Directory tenant used for identity management",
},
"aws_sqs_external_id": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "The external ID that Snowflake will use when assuming the AWS role",
},
"aws_sqs_arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "AWS SQS queue ARN for notification integration to connect to",
},
"aws_sqs_role_arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "AWS IAM role ARN for notification integration to assume",
},
"comment": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -90,6 +111,12 @@ func CreateNotificationIntegration(data *schema.ResourceData, meta interface{})
if v, ok := data.GetOk("comment"); ok {
stmt.SetString(`COMMENT`, v.(string))
}
if v, ok := data.GetOk("direction"); ok {
stmt.SetString(`DIRECTION`, v.(string))
}
if v, ok := data.GetOk("azure_tenant_id"); ok {
stmt.SetString(`AZURE_TENANT_ID`, v.(string))
}
if v, ok := data.GetOk("notification_provider"); ok {
stmt.SetString(`NOTIFICATION_PROVIDER`, v.(string))
}
Expand All @@ -99,6 +126,12 @@ func CreateNotificationIntegration(data *schema.ResourceData, meta interface{})
if v, ok := data.GetOk("azure_tenant_id"); ok {
stmt.SetString(`AZURE_TENANT_ID`, v.(string))
}
if v, ok := data.GetOk("aws_sqs_arn"); ok {
stmt.SetString(`AWS_SQS_ARN`, v.(string))
}
if v, ok := data.GetOk("aws_sqs_role_arn"); ok {
stmt.SetString(`AWS_SQS_ROLE_ARN`, v.(string))
}

err := snowflake.Exec(db, stmt.Statement())
if err != nil {
Expand Down Expand Up @@ -167,6 +200,10 @@ func ReadNotificationIntegration(data *schema.ResourceData, meta interface{}) er
switch k {
case "ENABLED":
// We set this using the SHOW INTEGRATION call so let's ignore it here
case "DIRECTION":
if err = data.Set("direction", v.(string)); err != nil {
return err
}
case "NOTIFICATION_PROVIDER":
if err = data.Set("notification_provider", v.(string)); err != nil {
return err
Expand All @@ -179,6 +216,18 @@ func ReadNotificationIntegration(data *schema.ResourceData, meta interface{}) er
if err = data.Set("azure_tenant_id", v.(string)); err != nil {
return err
}
case "AWS_SQS_ARN":
if err = data.Set("aws_sqs_arn", v.(string)); err != nil {
return err
}
case "AWS_SQS_ROLE_ARN":
if err = data.Set("aws_sqs_role_arn", v.(string)); err != nil {
return err
}
case "AWS_SQS_EXTERNAL_ID":
if err = data.Set("aws_sqs_external_id", v.(string)); err != nil {
return err
}
default:
log.Printf("[WARN] unexpected property %v returned from Snowflake", k)
}
Expand Down Expand Up @@ -213,6 +262,11 @@ func UpdateNotificationIntegration(data *schema.ResourceData, meta interface{})
stmt.SetBool(`ENABLED`, data.Get("enabled").(bool))
}

if data.HasChange("direction") {
runSetStatement = true
stmt.SetString("DIRECTION", data.Get("direction").(string))
}

if data.HasChange("notification_provider") {
runSetStatement = true
stmt.SetString("NOTIFICATION_PROVIDER", data.Get("notification_provider").(string))
Expand All @@ -228,6 +282,16 @@ func UpdateNotificationIntegration(data *schema.ResourceData, meta interface{})
stmt.SetString("AZURE_TENANT_ID", data.Get("azure_tenant_id").(string))
}

if data.HasChange("aws_sqs_arn") {
runSetStatement = true
stmt.SetString("AWS_SQS_ARN", data.Get("aws_sqs_arn").(string))
}

if data.HasChange("aws_sqs_role_arn") {
runSetStatement = true
stmt.SetString("AWS_SQS_ROLE_ARN", data.Get("aws_sqs_role_arn").(string))
}

if runSetStatement {
if err := snowflake.Exec(db, stmt.Statement()); err != nil {
return fmt.Errorf("error updating notification integration: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions pkg/resources/notification_integration_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ func TestAcc_NotificationIntegration(t *testing.T) {
Providers: providers(),
Steps: []resource.TestStep{
{
Config: notificationIntegrationConfig(accName, storageUri, tenant),
Config: azureNotificationIntegrationConfig(accName, storageUri, tenant),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "name", accName),
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "notification_provider", "AZURE_STORAGE_QUEUE"),
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "azure_storage_queue_primary_uri", storageUri),
resource.TestCheckResourceAttr("snowflake_notification_integration.test", "azure_tenant_id", tenant),
),
Expand All @@ -33,13 +34,14 @@ func TestAcc_NotificationIntegration(t *testing.T) {
})
}

func notificationIntegrationConfig(name string, azureStorageQueuePrimaryUri string, azureTenantId string) string {
func azureNotificationIntegrationConfig(name string, azureStorageQueuePrimaryUri string, azureTenantId string) string {
s := `
resource "snowflake_notification_integration" "test" {
name = "%s"
notification_provider = "%s"
azure_storage_queue_primary_uri = "%s"
azure_tenant_id = "%s"
}
`
return fmt.Sprintf(s, name, azureStorageQueuePrimaryUri, azureTenantId)
return fmt.Sprintf(s, name, "AZURE_STORAGE_QUEUE", azureStorageQueuePrimaryUri, azureTenantId)
}
108 changes: 77 additions & 31 deletions pkg/resources/notification_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,73 @@ func TestNotificationIntegration(t *testing.T) {
}

func TestNotificationIntegrationCreate(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "test_notification_integration",
"comment": "great comment",
"azure_storage_queue_primary_uri": "azure://great-bucket/great-path/",
"azure_tenant_id": "some-guid",
testCases := []struct {
notificationProvider string
raw map[string]interface{}
expectSQL string
}{
{
notificationProvider: "AZURE_STORAGE_QUEUE",
raw: map[string]interface{}{
"name": "test_notification_integration",
"comment": "great comment",
"notification_provider": "AZURE_STORAGE_QUEUE",
"azure_storage_queue_primary_uri": "azure://great-bucket/great-path/",
"azure_tenant_id": "some-guid",
},
expectSQL: `^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AZURE_STORAGE_QUEUE_PRIMARY_URI='azure://great-bucket/great-path/' AZURE_TENANT_ID='some-guid' COMMENT='great comment' NOTIFICATION_PROVIDER='AZURE_STORAGE_QUEUE' TYPE='QUEUE' ENABLED=true$`,
},
{
notificationProvider: "AWS_SQS",
raw: map[string]interface{}{
"name": "test_notification_integration",
"comment": "great comment",
"direction": "OUTBOUND",
"notification_provider": "AWS_SQS",
"aws_sqs_arn": "some-sqs-arn",
"aws_sqs_role_arn": "some-iam-role-arn",
},
expectSQL: `^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AWS_SQS_ARN='some-sqs-arn' AWS_SQS_ROLE_ARN='some-iam-role-arn' COMMENT='great comment' DIRECTION='OUTBOUND' NOTIFICATION_PROVIDER='AWS_SQS' TYPE='QUEUE' ENABLED=true$`,
},
}
for _, testCase := range testCases {
r := require.New(t)
d := schema.TestResourceDataRaw(t, resources.NotificationIntegration().Schema, testCase.raw)
r.NotNil(d)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(testCase.expectSQL).WillReturnResult(sqlmock.NewResult(1, 1))
expectReadNotificationIntegration(mock, testCase.notificationProvider)

err := resources.CreateNotificationIntegration(d, db)
r.NoError(err)
})
}
d := schema.TestResourceDataRaw(t, resources.NotificationIntegration().Schema, in)
r.NotNil(d)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`^CREATE NOTIFICATION INTEGRATION "test_notification_integration" AZURE_STORAGE_QUEUE_PRIMARY_URI='azure://great-bucket/great-path/' AZURE_TENANT_ID='some-guid' COMMENT='great comment' NOTIFICATION_PROVIDER='AZURE_STORAGE_QUEUE' TYPE='QUEUE' ENABLED=true$`,
).WillReturnResult(sqlmock.NewResult(1, 1))
expectReadNotificationIntegration(mock)

err := resources.CreateNotificationIntegration(d, db)
r.NoError(err)
})
}

func TestNotificationIntegrationRead(t *testing.T) {
r := require.New(t)
testCases := []struct {
notificationProvider string
}{
{
notificationProvider: "AZURE_STORAGE_QUEUE",
},
{
notificationProvider: "AWS_SQS",
},
}
for _, testCase := range testCases {
r := require.New(t)

d := notificationIntegration(t, "test_notification_integration", map[string]interface{}{"name": "test_notification_integration"})
d := notificationIntegration(t, "test_notification_integration", map[string]interface{}{"name": "test_notification_integration"})

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
expectReadNotificationIntegration(mock)
WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
expectReadNotificationIntegration(mock, testCase.notificationProvider)

err := resources.ReadNotificationIntegration(d, db)
r.NoError(err)
})
err := resources.ReadNotificationIntegration(d, db)
r.NoError(err)
})
}
}

func TestNotificationIntegrationDelete(t *testing.T) {
Expand All @@ -66,18 +100,30 @@ func TestNotificationIntegrationDelete(t *testing.T) {
})
}

func expectReadNotificationIntegration(mock sqlmock.Sqlmock) {
func expectReadNotificationIntegration(mock sqlmock.Sqlmock, notificationProvider string) {
showRows := sqlmock.NewRows([]string{
"name", "type", "category", "enabled", "created_on"},
).AddRow("test_notification_integration", "QUEUE", "NOTIFICATION", true, "now")
mock.ExpectQuery(`^SHOW NOTIFICATION INTEGRATIONS LIKE 'test_notification_integration'$`).WillReturnRows(showRows)

descRows := sqlmock.NewRows([]string{
"property", "property_type", "property_value", "property_default",
}).AddRow("ENABLED", "Boolean", true, false).
AddRow("NOTIFICATION_PROVIDER", "String", "AZURE_STORAGE_QUEUE", nil).
AddRow("AZURE_STORAGE_QUEUE_PRIMARY_URI", "String", "azure://great-bucket/great-path/", nil).
AddRow("AZURE_TENANT_ID", "String", "some-guid", nil)
}).AddRow("ENABLED", "Boolean", true, false)

switch notificationProvider {
case "AZURE_STORAGE_QUEUE":
descRows = descRows.
AddRow("NOTIFICATION_PROVIDER", "String", notificationProvider, nil).
AddRow("AZURE_STORAGE_QUEUE_PRIMARY_URI", "String", "azure://great-bucket/great-path/", nil).
AddRow("AZURE_TENANT_ID", "String", "some-guid", nil)
case "AWS_SQS":
descRows = descRows.
AddRow("NOTIFICATION_PROVIDER", "String", notificationProvider, nil).
AddRow("DIRECTION", "String", "OUTBOUND", nil).
AddRow("AWS_SQS_ARN", "String", "some-sqs-arn", nil).
AddRow("AWS_SQS_ROLE_ARN", "String", "some-iam-role-arn", nil).
AddRow("AWS_SQS_EXTERNAL_ID", "String", "AGreatExternalID", nil)
}

mock.ExpectQuery(`DESCRIBE NOTIFICATION INTEGRATION "test_notification_integration"$`).WillReturnRows(descRows)
}
23 changes: 23 additions & 0 deletions pkg/resources/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ var pipeSchema = map[string]*schema.Schema{
Computed: true,
Description: "Name of the role that owns the pipe.",
},
"error_integration": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Description: "Specifies the name of the notification integration used for error notifications.",
},
}

func Pipe() *schema.Resource {
Expand Down Expand Up @@ -176,6 +181,10 @@ func CreatePipe(d *schema.ResourceData, meta interface{}) error {
builder.WithIntegration(v.(string))
}

if v, ok := d.GetOk("error_integration"); ok {
builder.WithErrorIntegration((v.(string)))
}

q := builder.Create()

err := snowflake.Exec(db, q)
Expand Down Expand Up @@ -267,6 +276,11 @@ func ReadPipe(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set("error_integration", pipe.ErrorIntegration.String)
if err != nil {
return err
}

return nil
}

Expand All @@ -293,6 +307,15 @@ func UpdatePipe(d *schema.ResourceData, meta interface{}) error {
}
}

if d.HasChange("error_integration") {
errorIntegration := d.Get("error_integration")
q := builder.ChangeErrorIntegration(errorIntegration.(string))
err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating pipe error_integration on %v", d.Id())
}
}

return ReadPipe(d, meta)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/resources/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestPipeRead(t *testing.T) {

func expectReadPipe(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{
"created_on", "name", "database_name", "schema_name", "definition", "owner", "notification_channel", "comment"},
).AddRow("2019-12-23 17:20:50.088 +0000", "test_pipe", "test_db", "test_schema", "test definition", "N", "test", "great comment")
"created_on", "name", "database_name", "schema_name", "definition", "owner", "notification_channel", "comment", "error_integration"},
).AddRow("2019-12-23 17:20:50.088 +0000", "test_pipe", "test_db", "test_schema", "test definition", "N", "test", "great comment", "test_integration")
mock.ExpectQuery(`^SHOW PIPES LIKE 'test_pipe' IN SCHEMA "test_db"."test_schema"$`).WillReturnRows(rows)
}
Loading

0 comments on commit 90af4cf

Please sign in to comment.