Merge pull request #111 from launchdarkly/eb/ch42975/inbox-full
drop events when inbox is full
eli-darkly authored Aug 20, 2019
2 parents e5dd5ba + e436f77 commit be6ee0d
Showing 3 changed files with 71 additions and 22 deletions.
55 changes: 37 additions & 18 deletions ldclient/event_processor.py
@@ -187,7 +187,7 @@ def __init__(self, capacity):
def add_event(self, event):
if len(self._events) >= self._capacity:
if not self._exceeded_capacity:
log.warning("Event queue is full-- dropped an event")
log.warning("Exceeded event queue capacity. Increase capacity to avoid dropping events.")
self._exceeded_capacity = True
else:
self._events.append(event)
@@ -205,13 +205,13 @@ def clear(self):


class EventDispatcher(object):
def __init__(self, queue, config, http_client):
self._queue = queue
def __init__(self, inbox, config, http_client):
self._inbox = inbox
self._config = config
self._http = create_http_pool_manager(num_pools=1, verify_ssl=config.verify_ssl) if http_client is None else http_client
self._close_http = (http_client is None) # so we know whether to close it later
self._disabled = False
self._buffer = EventBuffer(config.events_max_pending)
self._outbox = EventBuffer(config.events_max_pending)
self._user_keys = SimpleLRUCache(config.user_keys_capacity)
self._formatter = EventOutputFormatter(config)
self._last_known_past_time = 0
@@ -226,7 +226,7 @@ def _run_main_loop(self):
log.info("Starting event processor")
while True:
try:
message = self._queue.get(block=True)
message = self._inbox.get(block=True)
if message.type == 'event':
self._process_event(message.param)
elif message.type == 'flush':
@@ -248,7 +248,7 @@ def _process_event(self, event):
return

# Always record the event in the summarizer.
self._buffer.add_to_summary(event)
self._outbox.add_to_summary(event)

# Decide whether to add the event to the payload. Feature events may be added twice, once for
# the event (if tracked) and once for debugging.
@@ -271,13 +271,13 @@ def _process_event(self, event):

if add_index_event:
ie = { 'kind': 'index', 'creationDate': event['creationDate'], 'user': user }
self._buffer.add_event(ie)
self._outbox.add_event(ie)
if add_full_event:
self._buffer.add_event(event)
self._outbox.add_event(event)
if add_debug_event:
debug_event = event.copy()
debug_event['debug'] = True
self._buffer.add_event(debug_event)
self._outbox.add_event(debug_event)

# Add to the set of users we've noticed, and return true if the user was already known to us.
def notice_user(self, user):
@@ -298,13 +298,13 @@ def _should_debug_event(self, event):
def _trigger_flush(self):
if self._disabled:
return
payload = self._buffer.get_payload()
payload = self._outbox.get_payload()
if len(payload.events) > 0 or len(payload.summary.counters) > 0:
task = EventPayloadSendTask(self._http, self._config, self._formatter, payload,
self._handle_response)
if self._flush_workers.execute(task.run):
# The events have been handed off to a flush worker; clear them from our buffer.
self._buffer.clear()
self._outbox.clear()
else:
# We're already at our limit of concurrent flushes; leave the events in the buffer.
pass
@@ -330,22 +330,23 @@ def _do_shutdown(self):


class DefaultEventProcessor(EventProcessor):
def __init__(self, config, http=None):
self._queue = queue.Queue(config.events_max_pending)
def __init__(self, config, http=None, dispatcher_class=None):
self._inbox = queue.Queue(config.events_max_pending)
self._inbox_full = False
self._flush_timer = RepeatingTimer(config.flush_interval, self.flush)
self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users)
self._flush_timer.start()
self._users_flush_timer.start()
self._close_lock = Lock()
self._closed = False
EventDispatcher(self._queue, config, http)
(dispatcher_class or EventDispatcher)(self._inbox, config, http)

def send_event(self, event):
event['creationDate'] = int(time.time() * 1000)
self._queue.put(EventProcessorMessage('event', event))
self._post_to_inbox(EventProcessorMessage('event', event))

def flush(self):
self._queue.put(EventProcessorMessage('flush', None))
self._post_to_inbox(EventProcessorMessage('flush', None))

def stop(self):
with self._close_lock:
@@ -355,16 +356,34 @@ def stop(self):
self._flush_timer.stop()
self._users_flush_timer.stop()
self.flush()
# Note that here we are not calling _post_to_inbox, because we *do* want to wait if the inbox
# is full; an orderly shutdown can't happen unless these messages are received.
self._post_message_and_wait('stop')

def _post_to_inbox(self, message):
try:
self._inbox.put(message, block=False)
except queue.Full:
if not self._inbox_full:
# possible race condition here, but it's of no real consequence - we'd just get an extra log line
self._inbox_full = True
log.warning("Events are being produced faster than they can be processed; some events will be dropped")

def _flush_users(self):
self._queue.put(EventProcessorMessage('flush_users', None))
self._inbox.put(EventProcessorMessage('flush_users', None))

# Used only in tests
def _wait_until_inactive(self):
self._post_message_and_wait('test_sync')

def _post_message_and_wait(self, type):
reply = Event()
self._queue.put(EventProcessorMessage(type, reply))
self._inbox.put(EventProcessorMessage(type, reply))
reply.wait()

# These magic methods allow use of the "with" block in tests
def __enter__(self):
return self

def __exit__(self, type, value, traceback):
self.stop()
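
Taken on its own, the non-blocking hand-off that the new _post_to_inbox method introduces is just the standard-library queue.Queue pattern sketched below. This is a minimal illustrative sketch, not SDK code; the class name Inbox and the method names post/take are invented here.

import logging
import queue

log = logging.getLogger(__name__)

class Inbox:
    # Minimal sketch of a bounded inbox whose producers never block (illustrative only).
    def __init__(self, capacity):
        self._queue = queue.Queue(capacity)
        self._warned_full = False

    def post(self, message):
        try:
            # block=False makes put() raise queue.Full immediately instead of waiting,
            # so the calling (application) thread is never stalled by a slow consumer.
            self._queue.put(message, block=False)
        except queue.Full:
            if not self._warned_full:
                # Warn only on the first overflow, mirroring the _inbox_full flag in the diff.
                self._warned_full = True
                log.warning("Inbox is full; dropping message")

    def take(self):
        # The consumer side is free to block while waiting for work.
        return self._queue.get(block=True)

Messages that arrive while the queue is full are simply discarded; only the first overflow is logged. That is the trade-off this commit makes so that application threads calling send_event() never stall.
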
30 changes: 30 additions & 0 deletions testing/test_event_processor.py
@@ -1,5 +1,6 @@
import json
import pytest
from threading import Thread
import time

from ldclient.config import Config
@@ -460,6 +461,35 @@ def test_will_still_send_after_429_error():
def test_will_still_send_after_500_error():
verify_recoverable_http_error(500)

def test_does_not_block_on_full_inbox():
config = Config(events_max_pending=1) # this sets the size of both the inbox and the outbox to 1
ep_inbox_holder = [ None ]
ep_inbox = None

def dispatcher_factory(inbox, config, http):
ep_inbox_holder[0] = inbox # it's an array because otherwise it's hard for a closure to modify a variable
return None # the dispatcher object itself doesn't matter, we only manipulate the inbox
def event_consumer():
while True:
message = ep_inbox.get(block=True)
if message.type == 'stop':
message.param.set()
return
def start_consuming_events():
Thread(target=event_consumer).start()

with DefaultEventProcessor(config, mock_http, dispatcher_factory) as ep:
ep_inbox = ep_inbox_holder[0]
event1 = { 'kind': 'custom', 'key': 'event1', 'user': user }
event2 = { 'kind': 'custom', 'key': 'event2', 'user': user }
ep.send_event(event1)
ep.send_event(event2) # this event should be dropped - inbox is full
message1 = ep_inbox.get(block=False)
had_no_more = ep_inbox.empty()
start_consuming_events()
assert message1.param == event1
assert had_no_more

def verify_unrecoverable_http_error(status):
setup_processor(Config(sdk_key = 'SDK_KEY'))

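
For context on how the test above gets hold of the inbox: the new optional dispatcher_class argument on DefaultEventProcessor.__init__ is invoked as (dispatcher_class or EventDispatcher)(self._inbox, config, http), and the processor never uses the returned object itself, so any callable with that three-argument signature works as a stand-in. A hypothetical test double (not part of the SDK) could be as small as:

class RecordingDispatcher:
    # Hypothetical test double; it only needs to capture the inbox queue for inspection.
    def __init__(self, inbox, config, http):
        self.inbox = inbox

# with the same imports and fixtures as the test above:
# ep = DefaultEventProcessor(Config(events_max_pending=1), mock_http, RecordingDispatcher)
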
8 changes: 4 additions & 4 deletions testing/test_feature_store_helpers.py
@@ -137,7 +137,7 @@ def test_get_can_throw_exception(self, cached):
core = MockCore()
wrapper = make_wrapper(core, cached)
core.error = CustomError()
with pytest.raises(CustomError, message="expected exception"):
with pytest.raises(CustomError):
wrapper.get(THINGS, "key", lambda x: x)

@pytest.mark.parametrize("cached", [False, True])
@@ -204,7 +204,7 @@ def test_get_all_can_throw_exception(self, cached):
core = MockCore()
wrapper = make_wrapper(core, cached)
core.error = CustomError()
with pytest.raises(CustomError, message="expected exception"):
with pytest.raises(CustomError):
wrapper.all(THINGS)

@pytest.mark.parametrize("cached", [False, True])
@@ -255,7 +255,7 @@ def test_upsert_can_throw_exception(self, cached):
core = MockCore()
wrapper = make_wrapper(core, cached)
core.error = CustomError()
with pytest.raises(CustomError, message="expected exception"):
with pytest.raises(CustomError):
wrapper.upsert(THINGS, { "key": "x", "version": 1 })

@pytest.mark.parametrize("cached", [False, True])
@@ -281,7 +281,7 @@ def test_delete_can_throw_exception(self, cached):
core = MockCore()
wrapper = make_wrapper(core, cached)
core.error = CustomError()
with pytest.raises(CustomError, message="expected exception"):
with pytest.raises(CustomError):
wrapper.delete(THINGS, "x", 1)

def test_uncached_initialized_queries_state_only_until_inited(self):
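
The four changes in testing/test_feature_store_helpers.py all drop the message= keyword from pytest.raises, presumably to stay compatible with newer pytest: that keyword (a custom failure message shown when no exception was raised) was deprecated in pytest 4.x and removed in 5.0. A test that wants to assert on the exception text instead would use match=, as in this generic sketch (not taken from the repository):

import pytest

class CustomError(Exception):
    pass

def boom():
    raise CustomError("expected exception")

def test_boom_raises():
    # match= (unlike the removed message=) checks str(exc) against a regular expression.
    with pytest.raises(CustomError, match="expected"):
        boom()
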
