Skip to content

Commit

Permalink
New Resource: aws_athena_database (#1922)
Browse files Browse the repository at this point in the history
* Make files

* wip

* implement CRD methods

* make docs

* Reflect reviews

* Reflect 2nd reviews
  • Loading branch information
atsushi-ishibashi authored and radeksimko committed Nov 9, 2017
1 parent e398f97 commit 76052ff
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 0 deletions.
3 changes: 3 additions & 0 deletions aws/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aws/aws-sdk-go/service/acm"
"github.com/aws/aws-sdk-go/service/apigateway"
"github.com/aws/aws-sdk-go/service/applicationautoscaling"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/batch"
"github.com/aws/aws-sdk-go/service/cloudformation"
Expand Down Expand Up @@ -181,6 +182,7 @@ type AWSClient struct {
wafregionalconn *wafregional.WAFRegional
iotconn *iot.IoT
batchconn *batch.Batch
athenaconn *athena.Athena
dxconn *directconnect.DirectConnect
}

Expand Down Expand Up @@ -392,6 +394,7 @@ func (c *Config) Client() (interface{}, error) {
client.wafconn = waf.New(sess)
client.wafregionalconn = wafregional.New(sess)
client.batchconn = batch.New(sess)
client.athenaconn = athena.New(sess)
client.dxconn = directconnect.New(sess)

// Workaround for https://github.com/aws/aws-sdk-go/issues/1376
Expand Down
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func Provider() terraform.ResourceProvider {
"aws_app_cookie_stickiness_policy": resourceAwsAppCookieStickinessPolicy(),
"aws_appautoscaling_target": resourceAwsAppautoscalingTarget(),
"aws_appautoscaling_policy": resourceAwsAppautoscalingPolicy(),
"aws_athena_database": resourceAwsAthenaDatabase(),
"aws_autoscaling_attachment": resourceAwsAutoscalingAttachment(),
"aws_autoscaling_group": resourceAwsAutoscalingGroup(),
"aws_autoscaling_notification": resourceAwsAutoscalingNotification(),
Expand Down
183 changes: 183 additions & 0 deletions aws/resource_aws_athena_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package aws

import (
"fmt"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)

func resourceAwsAthenaDatabase() *schema.Resource {
return &schema.Resource{
Create: resourceAwsAthenaDatabaseCreate,
Read: resourceAwsAthenaDatabaseRead,
Delete: resourceAwsAthenaDatabaseDelete,

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"bucket": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
}
}

func resourceAwsAthenaDatabaseCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).athenaconn

input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprintf("create database %s;", d.Get("name").(string))),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + d.Get("bucket").(string)),
},
}

resp, err := conn.StartQueryExecution(input)
if err != nil {
return err
}

if err := executeAndExpectNoRowsWhenCreate(*resp.QueryExecutionId, d, conn); err != nil {
return err
}
d.SetId(d.Get("name").(string))
return resourceAwsAthenaDatabaseRead(d, meta)
}

func resourceAwsAthenaDatabaseRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).athenaconn

bucket := d.Get("bucket").(string)
input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprint("show databases;")),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + bucket),
},
}

resp, err := conn.StartQueryExecution(input)
if err != nil {
return err
}

if err := executeAndExpectMatchingRow(*resp.QueryExecutionId, d.Get("name").(string), conn); err != nil {
return err
}
return nil
}

func resourceAwsAthenaDatabaseDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).athenaconn

bucket := d.Get("bucket").(string)
input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprintf("drop database %s;", d.Get("name").(string))),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + bucket),
},
}

resp, err := conn.StartQueryExecution(input)
if err != nil {
return err
}

if err := executeAndExpectNoRowsWhenDrop(*resp.QueryExecutionId, d, conn); err != nil {
return err
}
return nil
}

func executeAndExpectNoRowsWhenCreate(qeid string, d *schema.ResourceData, conn *athena.Athena) error {
rs, err := queryExecutionResult(qeid, conn)
if err != nil {
return err
}
if len(rs.Rows) != 0 {
return fmt.Errorf("[ERROR] Athena create database, unexpected query result: %s", flattenAthenaResultSet(rs))
}
return nil
}

func executeAndExpectMatchingRow(qeid string, dbName string, conn *athena.Athena) error {
rs, err := queryExecutionResult(qeid, conn)
if err != nil {
return err
}
for _, row := range rs.Rows {
for _, datum := range row.Data {
if *datum.VarCharValue == dbName {
return nil
}
}
}
return fmt.Errorf("[ERROR] Athena not found database: %s, query result: %s", dbName, flattenAthenaResultSet(rs))
}

func executeAndExpectNoRowsWhenDrop(qeid string, d *schema.ResourceData, conn *athena.Athena) error {
rs, err := queryExecutionResult(qeid, conn)
if err != nil {
return err
}
if len(rs.Rows) != 0 {
return fmt.Errorf("[ERROR] Athena drop database, unexpected query result: %s", flattenAthenaResultSet(rs))
}
return nil
}

func queryExecutionResult(qeid string, conn *athena.Athena) (*athena.ResultSet, error) {
executionStateConf := &resource.StateChangeConf{
Pending: []string{athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning},
Target: []string{athena.QueryExecutionStateSucceeded},
Refresh: queryExecutionStateRefreshFunc(qeid, conn),
Timeout: 10 * time.Minute,
Delay: 3 * time.Second,
MinTimeout: 3 * time.Second,
}
_, err := executionStateConf.WaitForState()
if err != nil {
return nil, err
}

qrinput := &athena.GetQueryResultsInput{
QueryExecutionId: aws.String(qeid),
}
resp, err := conn.GetQueryResults(qrinput)
if err != nil {
return nil, err
}
return resp.ResultSet, nil
}

func queryExecutionStateRefreshFunc(qeid string, conn *athena.Athena) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
input := &athena.GetQueryExecutionInput{
QueryExecutionId: aws.String(qeid),
}
out, err := conn.GetQueryExecution(input)
if err != nil {
return nil, "failed", err
}
return out, *out.QueryExecution.Status.State, nil
}
}

func flattenAthenaResultSet(rs *athena.ResultSet) string {
ss := make([]string, 0)
for _, row := range rs.Rows {
for _, datum := range row.Data {
ss = append(ss, *datum.VarCharValue)
}
}
return strings.Join(ss, "\n")
}
143 changes: 143 additions & 0 deletions aws/resource_aws_athena_database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package aws

import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)

func TestAccAWSAthenaDatabase_basic(t *testing.T) {
rInt := acctest.RandInt()
dbName := acctest.RandString(8)
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSAthenaDatabaseDestroy,
Steps: []resource.TestStep{
{
Config: testAccAthenaDatabaseConfig(rInt, dbName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSAthenaDatabaseExists("aws_athena_database.hoge"),
),
},
},
})
}

// StartQueryExecution requires OutputLocation but terraform destroy deleted S3 bucket as well.
// So temporary S3 bucket as OutputLocation is created to confirm whether the database is acctually deleted.
func testAccCheckAWSAthenaDatabaseDestroy(s *terraform.State) error {
athenaconn := testAccProvider.Meta().(*AWSClient).athenaconn
s3conn := testAccProvider.Meta().(*AWSClient).s3conn
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_athena_database" {
continue
}

rInt := acctest.RandInt()
bucketName := fmt.Sprintf("tf-athena-db-%s-%d", rs.Primary.Attributes["name"], rInt)
_, err := s3conn.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
return err
}

input := &athena.StartQueryExecutionInput{
QueryString: aws.String(fmt.Sprint("show databases;")),
ResultConfiguration: &athena.ResultConfiguration{
OutputLocation: aws.String("s3://" + bucketName),
},
}

resp, err := athenaconn.StartQueryExecution(input)
if err != nil {
return err
}

ers, err := queryExecutionResult(*resp.QueryExecutionId, athenaconn)
if err != nil {
return err
}
found := false
dbName := rs.Primary.Attributes["name"]
for _, row := range ers.Rows {
for _, datum := range row.Data {
if *datum.VarCharValue == dbName {
found = true
}
}
}
if found {
return fmt.Errorf("[DELETE ERROR] Athena failed to drop database: %s", dbName)
}

loresp, err := s3conn.ListObjectsV2(
&s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
},
)
if err != nil {
return fmt.Errorf("[DELETE ERROR] S3 Bucket list Objects err: %s", err)
}

objectsToDelete := make([]*s3.ObjectIdentifier, 0)

if len(loresp.Contents) != 0 {
for _, v := range loresp.Contents {
objectsToDelete = append(objectsToDelete, &s3.ObjectIdentifier{
Key: v.Key,
})
}
}

_, err = s3conn.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(bucketName),
Delete: &s3.Delete{
Objects: objectsToDelete,
},
})
if err != nil {
return fmt.Errorf("[DELETE ERROR] S3 Bucket delete Objects err: %s", err)
}

_, err = s3conn.DeleteBucket(&s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
return fmt.Errorf("[DELETE ERROR] S3 Bucket delete Bucket err: %s", err)
}

}
return nil
}

func testAccCheckAWSAthenaDatabaseExists(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
_, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf("Not found: %s, %v", name, s.RootModule().Resources)
}
return nil
}
}

func testAccAthenaDatabaseConfig(randInt int, dbName string) string {
return fmt.Sprintf(`
resource "aws_s3_bucket" "hoge" {
bucket = "tf-athena-db-%s-%d"
force_destroy = true
}
resource "aws_athena_database" "hoge" {
name = "%s"
bucket = "${aws_s3_bucket.hoge.bucket}"
}
`, dbName, randInt, dbName)
}
9 changes: 9 additions & 0 deletions website/aws.erb
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@
</ul>
</li>

<li<%= sidebar_current("docs-aws-resource-athena") %>>
<a href="#">Athena Resources</a>
<ul class="nav nav-visible">
<li<%= sidebar_current("docs-aws-resource-athena-database") %>>
<a href="/docs/providers/aws/r/athena_database.html">aws_athena_database</a>
</li>
</ul>
</li>

<li<%= sidebar_current("docs-aws-resource-batch") %>>
<a href="#">Batch Resources</a>
<ul class="nav nav-visible">
Expand Down
Loading

0 comments on commit 76052ff

Please sign in to comment.