Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Resource: aws_athena_database #1922

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aws/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aws/aws-sdk-go/service/acm"
"github.com/aws/aws-sdk-go/service/apigateway"
"github.com/aws/aws-sdk-go/service/applicationautoscaling"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/batch"
"github.com/aws/aws-sdk-go/service/cloudformation"
Expand Down Expand Up @@ -181,6 +182,7 @@ type AWSClient struct {
wafregionalconn *wafregional.WAFRegional
iotconn *iot.IoT
batchconn *batch.Batch
athenaconn *athena.Athena
dxconn *directconnect.DirectConnect
}

Expand Down Expand Up @@ -392,6 +394,7 @@ func (c *Config) Client() (interface{}, error) {
client.wafconn = waf.New(sess)
client.wafregionalconn = wafregional.New(sess)
client.batchconn = batch.New(sess)
client.athenaconn = athena.New(sess)
client.dxconn = directconnect.New(sess)

// Workaround for https://github.com/aws/aws-sdk-go/issues/1376
Expand Down
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func Provider() terraform.ResourceProvider {
"aws_app_cookie_stickiness_policy": resourceAwsAppCookieStickinessPolicy(),
"aws_appautoscaling_target": resourceAwsAppautoscalingTarget(),
"aws_appautoscaling_policy": resourceAwsAppautoscalingPolicy(),
"aws_athena_database": resourceAwsAthenaDatabase(),
"aws_autoscaling_attachment": resourceAwsAutoscalingAttachment(),
"aws_autoscaling_group": resourceAwsAutoscalingGroup(),
"aws_autoscaling_notification": resourceAwsAutoscalingNotification(),
Expand Down
183 changes: 183 additions & 0 deletions aws/resource_aws_athena_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package aws

import (
"fmt"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)

func resourceAwsAthenaDatabase() *schema.Resource {
return &schema.Resource{
Create: resourceAwsAthenaDatabaseCreate,
Read: resourceAwsAthenaDatabaseRead,
Delete: resourceAwsAthenaDatabaseDelete,

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"bucket": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
}
}

func resourceAwsAthenaDatabaseCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).athenaconn

input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprintf("create database %s;", d.Get("name").(string))),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + d.Get("bucket").(string)),
},
}

resp, err := conn.StartQueryExecution(input)
if err != nil {
return err
}

if err := executeAndExpectNoRowsWhenCreate(*resp.QueryExecutionId, d, conn); err != nil {
return err
}
d.SetId(d.Get("name").(string))
return resourceAwsAthenaDatabaseRead(d, meta)
}

func resourceAwsAthenaDatabaseRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).athenaconn

bucket := d.Get("bucket").(string)
input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprint("show databases;")),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + bucket),
},
}

resp, err := conn.StartQueryExecution(input)
if err != nil {
return err
}

if err := executeAndExpectMatchingRow(*resp.QueryExecutionId, d.Get("name").(string), conn); err != nil {
return err
}
return nil
}

func resourceAwsAthenaDatabaseDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).athenaconn

bucket := d.Get("bucket").(string)
input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprintf("drop database %s;", d.Get("name").(string))),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + bucket),
},
}

resp, err := conn.StartQueryExecution(input)
if err != nil {
return err
}

if err := executeAndExpectNoRowsWhenDrop(*resp.QueryExecutionId, d, conn); err != nil {
return err
}
return nil
}

func executeAndExpectNoRowsWhenCreate(qeid string, d *schema.ResourceData, conn *athena.Athena) error {
rs, err := queryExecutionResult(qeid, conn)
if err != nil {
return err
}
if len(rs.Rows) != 0 {
return fmt.Errorf("[ERROR] Athena create database, unexpected query result: %s", flattenAthenaResultSet(rs))
}
return nil
}

func executeAndExpectMatchingRow(qeid string, dbName string, conn *athena.Athena) error {
rs, err := queryExecutionResult(qeid, conn)
if err != nil {
return err
}
for _, row := range rs.Rows {
for _, datum := range row.Data {
if *datum.VarCharValue == dbName {
return nil
}
}
}
return fmt.Errorf("[ERROR] Athena not found database: %s, query result: %s", dbName, flattenAthenaResultSet(rs))
}

func executeAndExpectNoRowsWhenDrop(qeid string, d *schema.ResourceData, conn *athena.Athena) error {
rs, err := queryExecutionResult(qeid, conn)
if err != nil {
return err
}
if len(rs.Rows) != 0 {
return fmt.Errorf("[ERROR] Athena drop database, unexpected query result: %s", flattenAthenaResultSet(rs))
}
return nil
}

func queryExecutionResult(qeid string, conn *athena.Athena) (*athena.ResultSet, error) {
executionStateConf := &resource.StateChangeConf{
Pending: []string{athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning},
Target: []string{athena.QueryExecutionStateSucceeded},
Refresh: queryExecutionStateRefreshFunc(qeid, conn),
Timeout: 10 * time.Minute,
Delay: 3 * time.Second,
MinTimeout: 3 * time.Second,
}
_, err := executionStateConf.WaitForState()
if err != nil {
return nil, err
}

qrinput := &athena.GetQueryResultsInput{
QueryExecutionId: aws.String(qeid),
}
resp, err := conn.GetQueryResults(qrinput)
if err != nil {
return nil, err
}
return resp.ResultSet, nil
}

func queryExecutionStateRefreshFunc(qeid string, conn *athena.Athena) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
input := &athena.GetQueryExecutionInput{
QueryExecutionId: aws.String(qeid),
}
out, err := conn.GetQueryExecution(input)
if err != nil {
return nil, "failed", err
}
return out, *out.QueryExecution.Status.State, nil
}
}

func flattenAthenaResultSet(rs *athena.ResultSet) string {
ss := make([]string, 0)
for _, row := range rs.Rows {
for _, datum := range row.Data {
ss = append(ss, *datum.VarCharValue)
}
}
return strings.Join(ss, "\n")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpicky question - is there a reason we can't just use fmt.Printf("%s") to print out the result set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to return the exact same query result. But the result is separated by \n and is stored in each datum.VarCharValue.

}
143 changes: 143 additions & 0 deletions aws/resource_aws_athena_database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package aws

import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)

func TestAccAWSAthenaDatabase_basic(t *testing.T) {
rInt := acctest.RandInt()
dbName := acctest.RandString(8)
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSAthenaDatabaseDestroy,
Steps: []resource.TestStep{
{
Config: testAccAthenaDatabaseConfig(rInt, dbName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSAthenaDatabaseExists("aws_athena_database.hoge"),
),
},
},
})
}

// StartQueryExecution requires OutputLocation but terraform destroy deleted S3 bucket as well.
// So temporary S3 bucket as OutputLocation is created to confirm whether the database is acctually deleted.
func testAccCheckAWSAthenaDatabaseDestroy(s *terraform.State) error {
athenaconn := testAccProvider.Meta().(*AWSClient).athenaconn
s3conn := testAccProvider.Meta().(*AWSClient).s3conn
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_athena_database" {
continue
}

rInt := acctest.RandInt()
bucketName := fmt.Sprintf("tf-athena-db-%s-%d", rs.Primary.Attributes["name"], rInt)
_, err := s3conn.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
return err
}

input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprint("show databases;")),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + bucketName),
},
}

resp, err := athenaconn.StartQueryExecution(input)
if err != nil {
return err
}

ers, err := queryExecutionResult(*resp.QueryExecutionId, athenaconn)
if err != nil {
return err
}
found := false
dbName := rs.Primary.Attributes["name"]
for _, row := range ers.Rows {
for _, datum := range row.Data {
if *datum.VarCharValue == dbName {
found = true
}
}
}
if found {
return fmt.Errorf("[DELETE ERROR] Athena failed to drop database: %s", dbName)
}

loresp, err := s3conn.ListObjectsV2(
&s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
},
)
if err != nil {
return fmt.Errorf("[DELETE ERROR] S3 Bucket list Objects err: %s", err)
}

objectsToDelete := make([]*s3.ObjectIdentifier, 0)

if len(loresp.Contents) != 0 {
for _, v := range loresp.Contents {
objectsToDelete = append(objectsToDelete, &s3.ObjectIdentifier{
Key: v.Key,
})
}
}

_, err = s3conn.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(bucketName),
Delete: &s3.Delete{
Objects: objectsToDelete,
},
})
if err != nil {
return fmt.Errorf("[DELETE ERROR] S3 Bucket delete Objects err: %s", err)
}

_, err = s3conn.DeleteBucket(&s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
return fmt.Errorf("[DELETE ERROR] S3 Bucket delete Bucket err: %s", err)
}

}
return nil
}

func testAccCheckAWSAthenaDatabaseExists(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
_, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf("Not found: %s, %v", name, s.RootModule().Resources)
}
return nil
}
}

func testAccAthenaDatabaseConfig(randInt int, dbName string) string {
return fmt.Sprintf(`
resource "aws_s3_bucket" "hoge" {
bucket = "tf-athena-db-%s-%d"
force_destroy = true
}

resource "aws_athena_database" "hoge" {
name = "%s"
bucket = "${aws_s3_bucket.hoge.bucket}"
}
`, dbName, randInt, dbName)
}
9 changes: 9 additions & 0 deletions website/aws.erb
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@
</ul>
</li>

<li<%= sidebar_current("docs-aws-resource-athena") %>>
<a href="#">Athena Resources</a>
<ul class="nav nav-visible">
<li<%= sidebar_current("docs-aws-resource-athena-database") %>>
<a href="/docs/providers/aws/r/athena_database.html">aws_athena_database</a>
</li>
</ul>
</li>

<li<%= sidebar_current("docs-aws-resource-batch") %>>
<a href="#">Batch Resources</a>
<ul class="nav nav-visible">
Expand Down
Loading