-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrigger.py
122 lines (107 loc) · 3.31 KB
/
trigger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding: utf-8 -*-
import boto3
import os
import json
import sys
from functools import reduce
from decimal import Decimal
def each_slice(size, iterable):
batch = []
for item in iterable:
batch.append(item)
if len(batch) == size:
yield batch
batch = []
if len(batch) > 0:
yield batch
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return int(obj)
return json.JSONEncoder.default(self, obj)
def handler(event, context):
boto3.resource("dynamodb")
deserializer = boto3.dynamodb.types.TypeDeserializer()
def change(sum, item):
new_image = {
k: deserializer.deserialize(v)
for k, v in item["dynamodb"]["NewImage"].items()
}
pk = new_image["pk"]
sk = new_image["sk"]
if not pk.endswith("#stream"):
return sum
log = new_image["log"]
type = new_image["type"]
payload = new_image["payload"]
if not log in sum:
sum[log] = []
sum[log].append({"key": {"pk": pk, "sk": sk}, "type": type, "payload": payload})
return sum
changes = reduce(
change,
filter(
lambda x: x["eventName"] == "MODIFY" or x["eventName"] == "INSERT",
event["Records"],
),
{},
)
config = {"api_version": "2015-10-07"}
if os.getenv("IS_OFFLINE", ""):
config.update(
{
"endpoint_url": "http://127.0.0.1:4010",
"aws_access_key_id": "x",
"aws_secret_access_key": "x",
"region_name": "us-east-1",
}
)
client = boto3.client("events", **config)
def detail(log, item):
detail_with_payload = json.dumps(
{
"log": log,
"key": item["key"],
"type": item["type"],
"payload": item["payload"],
},
cls=DecimalEncoder,
)
detail_less_payload = json.dumps(
{
"log": log,
"key": item["key"],
"type": item["type"],
"partial": True,
"payload": {"id": item["payload"]["id"]},
},
cls=DecimalEncoder,
)
if len(detail_with_payload) <= 10240:
return detail_with_payload
else:
return detail_less_payload
for log, items in changes.items():
for batch in each_slice(10, items):
entries = list(
map(
lambda item: {
"EventBusName": "dynamodb-log",
"Source": "dynamodb-log",
"DetailType": "stream changes",
"Detail": detail(log, item),
},
batch,
)
)
print("put_events", entries)
ret = client.put_events(Entries=entries)
if "FailedEntryCount" in ret:
failedCount = ret["FailedEntryCount"]
else:
failedCount = 0
if failedCount > 0:
raise Exception(
"error sending to eventbridge failed {0}".format(failedCount)
)
return {}