Commit 581ef5c

Use multiple generator threads when filtering to more than 100 log streams; fixes jorgebastida#82
mmaslowskicc committed Aug 1, 2016
1 parent 021691b commit 581ef5c
Showing 3 changed files with 87 additions and 22 deletions.
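The gist of the change: previously, a stream pattern matching more than FILTER_LOG_EVENTS_STREAMS_LIMIT (100) streams raised TooManyStreamsFilteredError, because a single filter_log_events call accepts at most 100 stream names. This commit removes that error and instead slices the matched streams into batches of up to 100 and runs one generator thread per batch, all feeding a single consumer. A minimal standalone sketch of the batching step (the helper name is illustrative, not from awslogs):

def batch_streams(streams, limit=100):
    # Slice the stream list into chunks of at most `limit` names,
    # mirroring the streams_per_generator list comprehension in the diff below.
    return [streams[i:i + limit] for i in range(0, len(streams), limit)]

print([len(b) for b in batch_streams(['S%d' % n for n in range(250)])])
# -> [100, 100, 50]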
36 changes: 25 additions & 11 deletions awslogs/core.py
@@ -24,6 +24,12 @@ def milis2iso(milis):
     return (res + ".000")[:23] + 'Z'
 
 
+try:
+    xrange
+except NameError:
+    xrange = range  # Python 3
+
+
 class AWSLogs(object):
 
     ACTIVE = 1
@@ -72,30 +78,32 @@ def list_logs(self):
         streams = []
         if self.log_stream_name != self.ALL_WILDCARD:
             streams = list(self._get_streams_from_pattern(self.log_group_name, self.log_stream_name))
-            if len(streams) > self.FILTER_LOG_EVENTS_STREAMS_LIMIT:
-                raise exceptions.TooManyStreamsFilteredError(
-                    self.log_stream_name,
-                    len(streams),
-                    self.FILTER_LOG_EVENTS_STREAMS_LIMIT
-                )
             if len(streams) == 0:
                 raise exceptions.NoStreamsFilteredError(self.log_stream_name)
 
         max_stream_length = max([len(s) for s in streams]) if streams else 10
         group_length = len(self.log_group_name)
+        streams_per_generator = [
+            streams[i:i + self.FILTER_LOG_EVENTS_STREAMS_LIMIT]
+            for i in xrange(0, len(streams), self.FILTER_LOG_EVENTS_STREAMS_LIMIT)]
 
         queue, exit = Queue(), Event()
 
         # Note: filter_log_events paginator is broken
         # ! Error during pagination: The same next token was received twice
 
         def consumer():
+            generators_left = len(streams_per_generator)
             while not exit.is_set():
                 event = queue.get()
 
                 if event is None:
-                    exit.set()
-                    break
+                    # A generator has finished.
+                    generators_left -= 1
+                    if generators_left == 0:
+                        exit.set()
+                        break
+                    continue
 
                 output = []
                 if self.output_group_enabled:
@@ -131,7 +139,7 @@ def consumer():
                 print(' '.join(output))
                 sys.stdout.flush()
 
-        def generator():
+        def generator(streams):
             """Push events into queue trying to deduplicate them using a lru queue.
             AWS API stands for the interleaved parameter that:
                 interleaved (boolean) -- If provided, the API will make a best
@@ -178,8 +186,14 @@ def generator():
                         queue.put(None)
                         break
 
-        g = Thread(target=generator)
-        g.start()
+        if streams_per_generator:
+            for stream_batch in streams_per_generator:
+                # For each batch of up to 100 log streams, run a thread getting and enqueueing events.
+                g = Thread(target=generator, args=(stream_batch,))
+                g.start()
+        else:
+            g = Thread(target=generator, args=(streams,))
+            g.start()
 
         c = Thread(target=consumer)
         c.start()
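The consumer's shutdown logic changes from "stop at the first None" to "count one None sentinel per generator thread", since each generator now signals completion independently. A self-contained sketch of that pattern using the same Queue/Event primitives (Python 3 imports shown; producer and item names are illustrative, not from awslogs):

from queue import Queue
from threading import Event, Thread

def producer(queue, items):
    for item in items:
        queue.put(item)
    queue.put(None)  # Sentinel: this producer is finished.

def consumer(queue, exit_event, producer_count):
    producers_left = producer_count
    while not exit_event.is_set():
        item = queue.get()
        if item is None:
            producers_left -= 1
            if producers_left == 0:
                exit_event.set()  # All producers are done; stop consuming.
                break
            continue
        print(item)

batches = [[1, 2], [3, 4], [5, 6]]
queue, exit_event = Queue(), Event()
threads = [Thread(target=producer, args=(queue, batch)) for batch in batches]
for t in threads:
    t.start()
consumer(queue, exit_event, len(batches))
for t in threads:
    t.join()

Note the continue after decrementing: unlike the old code, a sentinel from one generator must not end consumption while other generators are still producing.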
11 changes: 0 additions & 11 deletions awslogs/exceptions.py
@@ -14,17 +14,6 @@ def hint(self):
return "awslogs doesn't understand '{0}' as a date.".format(self.args[0])


class TooManyStreamsFilteredError(BaseAWSLogsException):

code = 6

def hint(self):
return ("The number of streams that match your pattern '{0}' is '{1}'. "
"AWS API limits the number of streams you can filter by to {2}."
"It might be helpful to you to not filter streams by any "
"pattern and filter the output of awslogs.").format(*self.args)


class NoStreamsFilteredError(BaseAWSLogsException):

code = 7
62 changes: 62 additions & 0 deletions tests/test_it.py
@@ -457,6 +457,68 @@ def paginator(value):
             colored("No streams match your pattern 'foo.*'.\n",
                     "red"))
 
+    @patch('boto3.client')
+    @patch('sys.stdout', new_callable=StringIO)
+    @patch('awslogs.core.AWSLogs.FILTER_LOG_EVENTS_STREAMS_LIMIT', 3)
+    def test_main_get_multiple_generators(self, mock_stdout, botoclient):
+        """Test getting logs with more streams specified than supported in a single filter_log_events call."""
+        client = Mock()
+        botoclient.return_value = client
+
+        event_keys = ["eventId", "timestamp", "ingestionTime",
+                      "message", "logStreamName"]
+        logs = [
+            {'events': mapkeys(event_keys,
+                               [[1, 0, 0, 'Hello 1', 'S1'],
+                                [2, 0, 0, 'Hello 2', 'S2'],
+                                [3, 0, 0, 'Hello 3', 'S3']]),
+             'nextToken': 'token'},
+            {'events': mapkeys(event_keys,
+                               [[4, 0, 0, 'Hello 4', 'S4'],
+                                [5, 0, 0, 'Hello 5', 'S5'],
+                                [6, 0, 0, 'Hello 6', 'S6']]),
+             'nextToken': 'token'},
+            {'events': []},
+            {'events': []}
+        ]
+
+        groups = [
+            {'logGroups': [{'logGroupName': 'AAA'}]}
+        ]
+
+        streams = [
+            {'logStreams': [
+                self._stream('S1'),
+                self._stream('S2'),
+                self._stream('S3'),
+                self._stream('S4'),
+                self._stream('S5'),
+                self._stream('S6'),
+            ]}
+        ]
+
+        def paginator(value):
+            mock = Mock()
+            mock.paginate.return_value = {
+                'describe_log_groups': groups,
+                'describe_log_streams': streams
+            }.get(value)
+            return mock
+
+        client.get_paginator.side_effect = paginator
+        client.filter_log_events.side_effect = logs
+        main("awslogs get AAA S --no-color".split())
+
+        self.assertEqual(
+            sorted(mock_stdout.getvalue().split('\n')),
+            ("\nAAA S1 Hello 1\n"
+             "AAA S2 Hello 2\n"
+             "AAA S3 Hello 3\n"
+             "AAA S4 Hello 4\n"
+             "AAA S5 Hello 5\n"
+             "AAA S6 Hello 6").split('\n')
+        )
+
     @patch('boto3.client')
     @patch('sys.stdout', new_callable=StringIO)
     def test_main_groups(self, mock_stdout, botoclient):
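For context, the limit driving all of this sits on the API side: boto3's filter_log_events accepts at most 100 entries in logStreamNames. A hedged sketch of how one generator's batch maps onto that call, with the manual nextToken loop the code uses instead of the broken paginator (the helper name is illustrative; logGroupName, logStreamNames, interleaved, and nextToken are parameters of the boto3 CloudWatch Logs client):

import boto3

def fetch_batch(client, log_group_name, stream_batch):
    # One batch holds at most 100 stream names (the AWS cap on logStreamNames).
    kwargs = {
        'logGroupName': log_group_name,
        'logStreamNames': stream_batch,
        'interleaved': True,  # best-effort time ordering across streams
    }
    while True:
        response = client.filter_log_events(**kwargs)
        for event in response.get('events', []):
            yield event
        token = response.get('nextToken')
        if token is None:
            break  # No more pages for this batch.
        kwargs['nextToken'] = token

# Usage (assumes AWS credentials are configured):
# client = boto3.client('logs')
# for event in fetch_batch(client, 'my-group', ['stream-1', 'stream-2']):
#     print(event['message'])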
