This repository has been archived by the owner on Feb 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathload_items.py
executable file
·119 lines (104 loc) · 3.98 KB
/
load_items.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import boto3
from botocore.exceptions import ClientError
from faker import Factory
from time import sleep
import time
import uuid
from decimal import Decimal
import argparse
from datetime import datetime
fake = Factory.create()
def item_gen(id, region_name):
p = fake.profile()
i = dict()
i['PK'] = str(id)
i['first_name'] = fake.first_name()
i['last_name'] = fake.last_name()
i['email'] = p['mail']
i['sex'] = p['sex']
i['street_address'] = fake.street_address()
i['state'] = fake.state()
i['city'] = fake.city()
i['zipcode'] = fake.zipcode()
i['country'] = fake.country()
i['govid'] = fake.ssn()
i['last_update_timestamp'] = Decimal(time.time())
i['last_updater_region'] = region_name
return i
def update_stats_metrics(stats_table, cloudwatch, loaded_count):
if loaded_count:
response = stats_table.update_item(
Key={'PK': 'loaded_count'},
UpdateExpression="set cnt = cnt + :val",
ExpressionAttributeValues={
':val': loaded_count
},
ReturnValues="UPDATED_NEW"
)
total = int(response['Attributes']['cnt'])
cloudwatch.put_metric_data(
MetricData=[
{
'MetricName': 'Total_loaded',
'Dimensions': [
{
'Name': 'loader',
'Value': 'ddb-loader'
},
],
'Unit': 'Count',
'StorageResolution': 1,
'Value': total
},
],
Namespace='DDB-Loader'
)
if __name__ == "__main__":
retries = 0 # used for backoff function
RETRY_EXCEPTIONS = ('ProvisionedThroughputExceededException',
'ThrottlingException')
parser = argparse.ArgumentParser()
parser.add_argument('-n', type=int, default=10000, help='Number of items to generate and write to DynamoDB table')
parser.add_argument('-r', required=True, default='ap-southeast-1', help='Region of source DynamoDB table')
parser.add_argument('-b', action='store_true', help='Write to DynamoDB table using batched write for higher load')
parser.add_argument('-t', required=True, help='DynamoDB table to load on')
args = parser.parse_args()
count = args.n
region_name = args.r
batched = args.b
table_name = args.t
session = boto3.Session(region_name=region_name)
ddb_source = session.resource('dynamodb')
cloudwatch = session.client('cloudwatch')
loader_stats_table = ddb_source.Table('loader_stats')
start_time = datetime.now()
source_table = ddb_source.Table(table_name)
if batched:
with source_table.batch_writer() as batch:
for i in range(count):
user = item_gen(uuid.uuid4(), region_name)
batch.put_item(Item=user)
if i % 100 == 0:
print("Done for {} items.".format(i))
update_stats_metrics(loader_stats_table, cloudwatch, 100)
else:
for i in range(count):
user = item_gen(uuid.uuid4(), region_name)
while True:
try:
source_table.put_item(Item=user)
retries = 0
break
except ClientError as err:
if err.response['Error']['Code'] not in RETRY_EXCEPTIONS:
raise
print("Retrying for item {}. Retry count: {}".format(i, retries))
sleep(2 ** retries)
retries += 1
if i % 100 == 0:
print("Done for {} items.".format(i))
update_stats_metrics(loader_stats_table,cloudwatch,100)
print("Test done for {}".format(count))
end_time = datetime.now()
duration = end_time - start_time
print("Test from {} to {}. Run time: {}".format(str(start_time), str(end_time), str(duration)))