-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_specific_table_ddl.py
84 lines (71 loc) · 2.93 KB
/
generate_specific_table_ddl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import time
import boto3
import pandas as pd
class QueryAthena:
    """Run one Athena query and fetch its text output from S3.

    The query is submitted through the boto3 Athena client, polled until it
    leaves the ``QUEUED``/``RUNNING`` states, and the resulting
    ``<QueryExecutionId>.txt`` object is then read back from
    ``s3://<bucket>/temp/ddl/``.
    """

    def __init__(self, query, database, query_output_bucket):
        """Store the query and derive the S3 output location.

        Args:
            query: Query string to submit to Athena.
            database: Database name used as the query execution context.
            query_output_bucket: Name of the S3 bucket (without the
                ``s3://`` prefix) that receives the query output.
        """
        self.database = database
        self.folder = 'temp/ddl/'
        self.bucket = query_output_bucket
        self.s3_output = 's3://' + self.bucket + '/' + self.folder
        self.query = query

    def load_conf(self, q):
        """Submit ``q`` to Athena and remember its execution id.

        Sets ``self.client`` (the Athena client) and ``self.filename`` (the
        query execution id, later used as the output object's base name).

        Args:
            q: Query string to execute.

        Returns:
            The ``start_query_execution`` response dictionary.

        Raises:
            Exception: Whatever creating the client or starting the query
                raised. The error is printed and then re-raised so the
                caller sees the real cause instead of an
                ``UnboundLocalError`` from returning an unset ``response``.
        """
        try:
            self.client = boto3.client('athena')
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={'Database': self.database},
                ResultConfiguration={'OutputLocation': self.s3_output},
            )
        except Exception as e:
            print(e)
            raise
        self.filename = response['QueryExecutionId']
        print('Execution ID: ' + response['QueryExecutionId'])
        return response

    def run_query(self, poll_interval=10):
        """Execute ``self.query``, wait for it to finish and return its output.

        Args:
            poll_interval: Seconds to wait between status checks while the
                query is still ``QUEUED`` or ``RUNNING``.

        Returns:
            The decoded query output as returned by ``obtain_data``, or
            ``None`` when polling, reading or clean-up failed (the error is
            printed). A ``FAILED`` or ``CANCELLED`` query also yields
            ``None``.

        Raises:
            Exception: Only errors from submitting the query (see
                ``load_conf``) propagate to the caller.
        """
        res = self.load_conf(self.query)
        try:
            execution_id = res["QueryExecutionId"]
            while True:
                execution = self.client.get_query_execution(
                    QueryExecutionId=execution_id
                )
                query_status = execution['QueryExecution']['Status']['State']
                print(query_status)
                if query_status in ('FAILED', 'CANCELLED'):
                    raise Exception(
                        'Athena query with the string "{}" failed or was cancelled'.format(self.query)
                    )
                if query_status not in ('QUEUED', 'RUNNING'):
                    # Terminal state reached: stop without a pointless sleep.
                    break
                time.sleep(poll_interval)
            print('Query "{}" finished.'.format(self.query))

            create_ddl = self.obtain_data()
            self.clean_up()
            return create_ddl
        except Exception as e:
            # Best-effort contract kept from the original: report and
            # return None rather than propagate.
            print(e)
            return None

    def obtain_data(self):
        """Read the query's ``.txt`` output object from S3.

        Returns:
            The object body decoded as UTF-8, or ``None`` if the object
            could not be read (the error is printed).
        """
        try:
            self.resource = boto3.resource('s3')
            response = (
                self.resource
                .Bucket(self.bucket)
                .Object(key=self.folder + self.filename + '.txt')
                .get()
            )
            return response['Body'].read().decode('utf-8')
        except Exception as e:
            print(e)
            return None

    def clean_up(self):
        """Delete query output objects from the output bucket.

        NOTE(review): this removes every object under the ``Query-Results/``
        and ``temp/ddl`` prefixes, not only the files produced by this
        query -- confirm nothing else writes to these prefixes.
        """
        self.s3 = boto3.resource('s3')
        bucket = self.s3.Bucket(self.bucket)
        for prefix in ('Query-Results/', 'temp/ddl'):
            for obj in bucket.objects.filter(Prefix=prefix):
                self.s3.Object(bucket.name, obj.key).delete()
if __name__ == "__main__":
    # Interactive entry point: ask for the table to describe and the bucket
    # Athena should write its output to, then print the table's DDL.
    db_name = input("Enter Database Name ")
    table_name = input("Enter Table Name ")
    bucket_name = input("Enter Query Output Bucket Name ")

    show_ddl_sql = "SHOW CREATE TABLE {}.{};".format(db_name, table_name)
    runner = QueryAthena(show_ddl_sql, db_name, bucket_name)
    print(runner.run_query())