-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathDMSCDC_ProcessTable.py
122 lines (112 loc) · 4.22 KB
/
DMSCDC_ProcessTable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import sys
import json
import datetime
import boto3
import time
from awsglue.utils import getResolvedOptions
import logging
# Script logger: writes timestamped records to stdout.
# The stdout handler accepts INFO and above. The logger itself must also be set
# to INFO. Otherwise it inherits the root logger's default WARNING level
# (basicConfig() without a level does not lower it), and logger.info(...)
# calls are silently discarded.
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# NOTE(review): basicConfig() attaches a stderr handler to the root logger when
# the root has none. Records from `logger` also propagate there, so a message
# may be emitted twice (stdout and stderr). Confirm whether that is intended.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# AWS service clients used by the rest of the script, created in the order
# DynamoDB, Glue, S3.
ddbconn, glue, s3conn = (boto3.client(service) for service in ('dynamodb', 'glue', 's3'))
# Required parameters: the job receives a single '--item' argument holding a
# JSON-encoded DynamoDB item in attribute-value form (each field is {'S': ...}).
# Presumably this is a row of the DMSCDC_Controller table; verify against the caller.
args = getResolvedOptions(sys.argv, ['item'])
item = json.loads(args['item'])


def _item_string(name):
    """Return the string ('S') value stored under *name* in the job's item."""
    return item[name]['S']


partitionKey = _item_string('PartitionKey')
lastFullLoadDate = _item_string('LastFullLoadDate')
lastIncrementalFile = _item_string('LastIncrementalFile')
activeFlag = _item_string('ActiveFlag')
primaryKey = _item_string('PrimaryKey')
folder = _item_string('folder')
prefix = _item_string('prefix')
# S3 key prefix under which this table's DMS output files are listed.
full_folder = prefix + folder
path = _item_string('path')
bucket = _item_string('bucket')
out_path = _item_string('out_path')
def runGlueJob(job, args, *, max_polls=90, poll_seconds=10):
    """Start the Glue job *job* with *args* and poll until it finishes.

    Args:
        job: Name of the Glue job to start.
        args: Job arguments (e.g. ``{'--bucket': ...}``) passed to
            ``glue.start_job_run``.
        max_polls: Maximum number of status checks before giving up.
            Defaults to 90.
        poll_seconds: Seconds to sleep between status checks. Defaults to 10.

    Returns:
        0 if the run reaches ``SUCCEEDED``. 1 if it reaches any other terminal
        state, or if it is still in progress after ``max_polls`` checks.
    """
    # Run states that mean "not finished yet". WAITING is a non-terminal Glue
    # state, so it must not be reported as a failure.
    in_progress_states = ('STARTING', 'RUNNING', 'STOPPING', 'WAITING')
    logger.info('starting runGlueJob: %s args: %s', job, json.dumps(args))
    job_run_id = glue.start_job_run(JobName=job, Arguments=args)['JobRunId']
    for _ in range(max_polls):
        status = glue.get_job_run(JobName=job, RunId=job_run_id)['JobRun']['JobRunState']
        if status == 'SUCCEEDED':
            return 0
        if status not in in_progress_states:
            # Any other state (e.g. FAILED, STOPPED, TIMEOUT, ERROR) is a failure.
            logger.error('Error during %s execution: %s', job, status)
            return 1
        time.sleep(poll_seconds)
    # Let logging format the integer. Concatenating it onto a str would raise
    # TypeError instead of reporting the timeout.
    logger.error('Execution timeout: %s seconds', max_polls * poll_seconds)
    return 1
logger.warning('starting processTable: ' + path)

# Determine whether the full-load (LOAD*) files need processing: they do when
# the S3 object is newer than the LastFullLoadDate recorded in DynamoDB.
# Determine if need to run initial --> Run Initial --> Update DDB.
loadInitial = False
initialfiles = s3conn.list_objects(Bucket=bucket, Prefix=full_folder + 'LOAD').get('Contents')
if initialfiles is not None:
    # NOTE(review): only the first LOAD object listed is compared; confirm this
    # is representative when a table's full load spans several files.
    # LastModified is timezone-aware. Drop tzinfo so it compares with the naive
    # value parsed from DynamoDB, which this script writes from the same
    # naive timestamp below.
    s3FileTS = initialfiles[0]['LastModified'].replace(tzinfo=None)
    ddbFileTS = datetime.datetime.strptime(lastFullLoadDate, '%Y-%m-%d %H:%M:%S')
    if s3FileTS > ddbFileTS:
        message = 'Starting to process Initial file: ' + full_folder
        loadInitial = True
        lastFullLoadDate = datetime.datetime.strftime(s3FileTS, '%Y-%m-%d %H:%M:%S')
    else:
        message = 'Initial files already processed: ' + full_folder
else:
    message = 'No initial files to process: ' + full_folder
logger.warning(message)

# Call the Initial Glue Job for this source. Only when it succeeds, record the
# new full-load timestamp so the same files are not loaded again.
if loadInitial:
    args = {
        '--bucket': bucket,
        '--prefix': full_folder,
        '--folder': folder,
        '--out_path': out_path,
        '--partitionKey': partitionKey,
    }
    if runGlueJob('DMSCDC_LoadInitial', args) != 1:
        ddbconn.update_item(
            TableName='DMSCDC_Controller',
            Key={"path": {"S": path}},
            AttributeUpdates={"LastFullLoadDate": {"Value": {"S": lastFullLoadDate}}})
loadIncremental = False

# Find the newest incremental (CDC) file for this table. Keys are listed under
# full_folder + '2' (presumably because CDC file names start with a timestamp
# such as 2020...; TODO confirm).
#
# lastIncrementalFile is stored as "<bucket>/<key>" (see newIncrementalFile
# below). S3's StartAfter, however, is compared against bare object keys. Strip
# the bucket prefix, otherwise the listing starts at an unrelated point in key
# order and returns either every file or none at all.
bucketPrefix = bucket + '/'
if lastIncrementalFile.startswith(bucketPrefix):
    startAfterKey = lastIncrementalFile[len(bucketPrefix):]
else:
    startAfterKey = lastIncrementalFile
incrementalFiles = s3conn.list_objects_v2(
    Bucket=bucket, Prefix=full_folder + '2', StartAfter=startAfterKey).get('Contents')
if incrementalFiles is not None:
    # Keys are listed in ascending order, and a single call returns at most
    # 1000 keys. The last key listed is taken as the newest file, which assumes
    # key names sort chronologically.
    newIncrementalFile = bucket + '/' + incrementalFiles[-1]['Key']
    if newIncrementalFile != lastIncrementalFile:
        loadIncremental = True
        message = 'Starting to process incremental files: ' + full_folder
    else:
        message = 'Incremental files already processed: ' + full_folder
else:
    message = 'No incremental files to process: ' + full_folder
logger.warning(message)

# Call the Incremental Glue Job for this source. Only when it succeeds, advance
# the LastIncrementalFile watermark in DynamoDB.
if loadIncremental:
    args = {
        '--bucket': bucket,
        '--prefix': full_folder,
        '--folder': folder,
        '--out_path': out_path,
        '--partitionKey': partitionKey,
        '--lastIncrementalFile': lastIncrementalFile,
        '--newIncrementalFile': newIncrementalFile,
        '--primaryKey': primaryKey,
    }
    if runGlueJob('DMSCDC_LoadIncremental', args) != 1:
        ddbconn.update_item(
            TableName='DMSCDC_Controller',
            Key={"path": {"S": path}},
            AttributeUpdates={"LastIncrementalFile": {"Value": {"S": newIncrementalFile}}})