Merge pull request #20 from xoeye/feature/stream-consumer-enhancements
Peter Gaultney authored May 11, 2021
2 parents 7e392e9 + 2b35156 commit b7b755f
Showing 25 changed files with 959 additions and 182 deletions.
19 changes: 16 additions & 3 deletions CHANGES.md
@@ -1,14 +1,27 @@
## 1.13.0

`LazyMulticast` - a ContextManager-based interface for
process-local multicasting of a producer.

`funnel_latest_from_log_group` - CloudWatch Logs funnel that composes
easily with the above.

`funnel_sharded_stream` - Genericized sharded stream funnel that
guarantees receiving subsequent writes. Previously existing DynamoDB
streams processor `process_latest_from_stream` uses this now.

### 1.12.2

Cover more exception types in retries for DynamoDB transaction utilities.

### 1.12.1

-Update `all_items_for_next_attempt` to be able to handle multiple tables with different key schemas
+Update `all_items_for_next_attempt` to be able to handle multiple
+tables with different key schemas

## 1.12.0

-An enhanced API supporting the use of
+`TypedTable` - An enhanced API supporting the use of
`dynamodb.write_versioned.versioned_transact_write_items`, which is a
great way to write business logic against DynamoDB.

@@ -82,7 +95,7 @@ avoid race conditions.

- `map_tree` now supports postorder transformations via keyword argument.

-### 1.4.0
+## 1.4.0

- Improved DynamoDB Item-related Exceptions for `GetItem`,
`put_but_raise_if_exists`, and `versioned_diffed_update_item`.
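
The 1.13.0 entry above introduces `LazyMulticast` and `funnel_latest_from_log_group`. A minimal sketch of how the two compose, mirroring dev-utils/cloudwatch-logs-to-local-file.py below; the log group name is hypothetical, and this assumes the funnel yields standard filter_log_events event dicts:

    from functools import partial

    import boto3

    from xoto3.cloudwatch.logs import funnel_latest_from_log_group
    from xoto3.multicast import LazyMulticast

    # one funnel per log group, started lazily and shared by every consumer in this process
    CLOUDWATCH_LOGS = LazyMulticast(partial(funnel_latest_from_log_group, boto3.client("logs")))

    with CLOUDWATCH_LOGS("/my/hypothetical/log-group") as log_events:
        for event in log_events:
            print(event["message"])
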
47 changes: 47 additions & 0 deletions dev-utils/cloudwatch-logs-to-local-file.py
@@ -0,0 +1,47 @@
#!/usr/bin/env python
"""This is mostly just a silly proof-of-concept"""
import json
import threading
from functools import partial

import boto3

from xoto3.cloudwatch.logs import funnel_latest_from_log_group
from xoto3.multicast import LazyMulticast

CLOUDWATCH_LOGS = LazyMulticast(partial(funnel_latest_from_log_group, boto3.client("logs")))


def write_log_events_to_file(log_group_name: str, filename: str):
    with open(filename, "w") as outf:
        with CLOUDWATCH_LOGS(log_group_name) as log_events:
            for event in log_events:
                outf.write(json.dumps(event) + "\n")


def main():
    while True:
        log_group_name = input("Log Group Name: ")
        if not log_group_name:
            continue
        try:
            while True:
                output_filename = input("output filename: ")
                if not output_filename:
                    continue
                t = threading.Thread(
                    target=write_log_events_to_file,
                    args=(log_group_name, output_filename),
                    daemon=True,
                )
                t.start()
                break
        except KeyboardInterrupt:
            print("\n")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass
40 changes: 17 additions & 23 deletions dev-utils/watch-cloudwatch-log-stream.py
100644 → 100755
@@ -1,33 +1,27 @@
-from datetime import datetime, timedelta
-from functools import partial
+#!/usr/bin/env python
+import argparse
+from datetime import datetime, timezone

 import boto3

-from xoto3.paginate import yield_pages_from_operation
+from xoto3.cloudwatch.logs import yield_filtered_log_events

-cw_client = boto3.client("logs")
-
-start_time = (datetime.utcnow() - timedelta(hours=20)).timestamp() * 1000
-end_time = datetime.utcnow().timestamp() * 1000
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("log_group_name")
+    parser.add_argument("--filter-pattern", "-f", default="")
+    args = parser.parse_args()

-query = dict(
-    logGroupName="xoi-ecs-logs-devl",
-    logStreamNamePrefix="dataplateocr/dataplateocrContainer",
-    startTime=int(start_time),
-    endTime=int(end_time),
-)
+    cw_client = boto3.client("logs")

-nt = ("nextToken",)
-CLOUDWATCH_FILTER_LOG_EVENTS = (
-    nt,
-    nt,
-    ("limit",),
-    ("events",),
-)
+    start_time = datetime.now(timezone.utc)

-yield_cloudwatch_pages = partial(yield_pages_from_operation, *CLOUDWATCH_FILTER_LOG_EVENTS,)
+    for log_event in yield_filtered_log_events(
+        cw_client, args.log_group_name, start_time, args.filter_pattern
+    ):
+        print(log_event["message"])


-for page in yield_cloudwatch_pages(cw_client.filter_log_events, query,):
-    for event in page["events"]:
-        print(event["message"])
+if __name__ == "__main__":
+    main()
27 changes: 12 additions & 15 deletions dev-utils/watch_ddb_stream.py
@@ -8,16 +8,17 @@

 import boto3

-from xoto3.dynamodb.streams.consume import process_latest_from_stream
-from xoto3.dynamodb.streams.records import old_and_new_items_from_stream_event_record
+from xoto3.dynamodb.streams.consume import ItemImages, make_dynamodb_stream_images_multicast
 from xoto3.dynamodb.utils.index import hash_key_name, range_key_name

+DYNAMODB_STREAMS = make_dynamodb_stream_images_multicast()
+

-def make_accept_stream_item_for_table(item_slicer: Callable[[dict], str]):
-    def accept_stream_item(record: dict):
-        old, new = old_and_new_items_from_stream_event_record(record)
+def make_accept_stream_images(item_slicer: Callable[[dict], str]):
+    def accept_stream_item(images: ItemImages):
+        old, new = images
         if not old:
-            print(f"New item: {item_slicer(new)}")
+            print(f"New item: {item_slicer(new)}")  # type: ignore
         elif not new:
             print(f"Deleted item {item_slicer(old)}")
         else:
@@ -53,8 +54,6 @@ def main():

     DDB_RES = boto3.resource("dynamodb")

-    DDB_STREAMS_CLIENT = boto3.client("dynamodbstreams")
-
     table = DDB_RES.Table(args.table_name)

     if args.attribute_names:
@@ -74,14 +73,12 @@ def item_slicer(item: dict):
         item_slicer = make_key_slicer(table)

     try:
-        t, _kill = process_latest_from_stream(
-            DDB_STREAMS_CLIENT,
-            table.latest_stream_arn,
-            make_accept_stream_item_for_table(item_slicer),
-        )
-        t.join()
+        accept_stream_images = make_accept_stream_images(item_slicer)
+        with DYNAMODB_STREAMS(args.table_name) as table_stream:
+            for images in table_stream:
+                accept_stream_images(images)
     except KeyboardInterrupt:
-        pass
+        pass  # no noisy log - Ctrl-C for clean exit


 if __name__ == "__main__":
39 changes: 39 additions & 0 deletions tests/xoto3/backoff_test.py
@@ -0,0 +1,39 @@
import pytest
from botocore.exceptions import ClientError

from xoto3.backoff import backoff


def make_named_ce(Code: str):
    return ClientError({"Error": {"Code": Code}}, "test operation")


def test_backoff_some_client_errors():
    count = 0

    @backoff
    def fails_twice():
        nonlocal count
        if count > 1:
            return "done"
        count += 1
        raise make_named_ce("ThrottlingException")

    assert "done" == fails_twice()
    assert count == 2


def test_dont_backoff_others():
    @backoff
    def not_found():
        raise make_named_ce("NotFound")

    with pytest.raises(ClientError):
        not_found()

    @backoff
    def whoops():
        raise Exception("whoops")

    with pytest.raises(Exception):
        whoops()
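
The tests above pin down `backoff`'s contract: throttling-flavored ClientErrors are retried, anything else propagates. A hedged usage sketch (the table name and wrapper function are hypothetical, not part of this PR):

    import boto3

    from xoto3.backoff import backoff

    _TABLE = boto3.resource("dynamodb").Table("HypotheticalTable")

    @backoff  # transparently retries ThrottlingException-style ClientErrors
    def put_item_with_retries(item: dict):
        return _TABLE.put_item(Item=item)

    put_item_with_retries({"id": "example"})
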
53 changes: 53 additions & 0 deletions tests/xoto3/dynamodb/streams/records_test.py
@@ -0,0 +1,53 @@
import pytest

from xoto3.dynamodb.streams.records import (
    ItemCreated,
    ItemDeleted,
    ItemModified,
    current_nonempty_value,
    matches_key,
    old_and_new_dict_tuples_from_stream,
)
from xoto3.dynamodb.utils.serde import serialize_item


def _serialize_record(record: dict):
    return {k: serialize_item(image) for k, image in record.items()}


def _serialize_records(*records):
    return [dict(dynamodb=_serialize_record(rec["dynamodb"])) for rec in records]


def _fake_stream_event():
    return dict(
        Records=_serialize_records(
            dict(dynamodb=dict(NewImage=dict(id=1, val=2))),
            dict(dynamodb=dict(NewImage=dict(id=1, val=3), OldImage=dict(id=1, val=2))),
            dict(dynamodb=dict(NewImage=dict(id=2, bar=8), OldImage=dict(id=2, bar=-9))),
            dict(dynamodb=dict(NewImage=dict(id=1, val=4), OldImage=dict(id=1, val=3))),
            dict(dynamodb=dict(NewImage=dict(id=2, foo="steve"), OldImage=dict(id=2, bar=8))),
            dict(dynamodb=dict(OldImage=dict(id=1, val=4))),
        )
    )


def test_current_nonempty_value():
    list_of_images = old_and_new_dict_tuples_from_stream(_fake_stream_event())

    assert [dict(id=1, val=2), dict(id=1, val=3), dict(id=1, val=4)] == list(
        current_nonempty_value(dict(id=1))(list_of_images)
    )


def test_matches_key_fails_with_empty_key():
    with pytest.raises(ValueError):
        matches_key(dict())


def test_matches_key_works_on_new_as_well_as_old():
    assert not matches_key(dict(id=3))(ItemCreated(None, dict(id=4)))
    assert not matches_key(dict(id=3))(ItemDeleted(dict(id=4), None))
    assert not matches_key(dict(hash=1, range=3))(
        ItemModified(dict(hash=1, range=4, foo=0), dict(hash=1, range=4, foo=1))
    )
40 changes: 40 additions & 0 deletions tests/xoto3/paginate_test.py
@@ -0,0 +1,40 @@
from xoto3.cloudwatch.logs.events import CLOUDWATCH_LOGS_FILTER_LOG_EVENTS
from xoto3.paginate import yield_pages_from_operation


class FakeApi:
    def __init__(self, results):
        self.calls = 0
        self.results = results

    def __call__(self, **kwargs):
        self.calls += 1
        return self.results[self.calls - 1]


def test_pagination_with_nextToken_and_limit():

    fake_cw = FakeApi(
        [
            dict(nextToken="1", events=[1, 2, 3]),
            dict(nextToken="2", events=[4, 5, 6]),
            dict(nextToken="3", events=[7, 8, 9]),
        ]
    )

    nt = None

    def le_cb(next_token):
        nonlocal nt
        nt = next_token

    collected_events = list()
    for page in yield_pages_from_operation(
        *CLOUDWATCH_LOGS_FILTER_LOG_EVENTS, fake_cw, dict(limit=6), last_evaluated_callback=le_cb
    ):
        for event in page["events"]:
            collected_events.append(event)

    assert collected_events == list(range(1, 7))

    assert nt == "2"
20 changes: 20 additions & 0 deletions tests/xoto3/utils/cm_test.py
@@ -0,0 +1,20 @@
from contextlib import contextmanager

from xoto3.utils.cm import xf_cm


@contextmanager
def yield_3():
    print("generating a 3")
    yield 3
    print("cleaning that 3 right on up")


def test_transform_context_manager():
    def add_one(x: int):
        return x + 1

    yield_4 = xf_cm(add_one)(yield_3())

    with yield_4 as actually_four:
        assert actually_four == 4
50 changes: 50 additions & 0 deletions tests/xoto3/utils/multicast_test.py
@@ -0,0 +1,50 @@
import threading
import time
import typing as ty
from collections import defaultdict

from xoto3.utils.multicast import LazyMulticast


def test_lazy_multicast():
    class Recvr(ty.NamedTuple):
        nums: ty.List[int]

    CONSUMER_COUNT = 10
    NUM_NUMS = 30
    sem = threading.Semaphore(0)

    def start_numbers_stream(num_nums: int, recv):
        def stream_numbers():
            for i in range(CONSUMER_COUNT):
                sem.acquire()
                # wait for 10 consumers to start
            for i in range(num_nums):
                recv(i)

        t = threading.Thread(target=stream_numbers, daemon=True)
        t.start()
        return t.join

    mc = LazyMulticast(start_numbers_stream)  # type: ignore

    consumer_results = defaultdict(list)

    def consume_numbers():
        sem.release()
        thread_id = threading.get_ident()
        with mc(NUM_NUMS) as nums_stream:
            for i, num in enumerate(nums_stream):
                consumer_results[thread_id].append(num)
                if i == NUM_NUMS - 1:
                    break

    for i in range(CONSUMER_COUNT):
        threading.Thread(target=consume_numbers, daemon=True).start()

    time.sleep(1)

    assert len(consumer_results) == CONSUMER_COUNT

    for results in consumer_results.values():
        assert list(range(NUM_NUMS)) == results