Skip to content

Commit

Permalink
fix: Set explicit names for each spawned thread (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 authored Oct 1, 2024
1 parent cde6cfb commit 9c7777c
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 28 deletions.
8 changes: 4 additions & 4 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __update_availability(self, available: bool):
return

log.warn("Detected persistent store unavailability; updates will be cached until it recovers")
task = RepeatingTask(0.5, 0, self.__check_availability)
task = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability)

self.__lock.lock()
self.__poller = task
Expand Down Expand Up @@ -172,6 +172,7 @@ class LDClient:
Client instances are thread-safe.
"""

def __init__(self, config: Config, start_wait: float=5):
"""Constructs a new LDClient instance.
Expand Down Expand Up @@ -248,7 +249,7 @@ def _set_event_processor(self, config):
if not config.event_processor_class:
diagnostic_id = create_diagnostic_id(config)
diagnostic_accumulator = None if config.diagnostic_opt_out else _DiagnosticAccumulator(diagnostic_id)
self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator = diagnostic_accumulator)
self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator=diagnostic_accumulator)
return diagnostic_accumulator
self._event_processor = config.event_processor_class(config)
return None
Expand Down Expand Up @@ -340,7 +341,7 @@ def track(self, event_name: str, context: Context, data: Optional[Any]=None,
log.warning("Invalid context for track (%s)" % context.error)
else:
self._send_event(self._event_factory_default.new_custom_event(event_name,
context, data, metric_value))
context, data, metric_value))

def identify(self, context: Context):
"""Reports details about an evaluation context.
Expand Down Expand Up @@ -711,5 +712,4 @@ def flag_tracker(self) -> FlagTracker:
return self.__flag_tracker



__all__ = ['LDClient', 'Config']
18 changes: 11 additions & 7 deletions ldclient/impl/big_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
class BigSegmentStoreStatusProviderImpl(BigSegmentStoreStatusProvider):
"""
Default implementation of the BigSegmentStoreStatusProvider interface.
The real implementation of getting the status is in BigSegmentStoreManager - we pass in a lambda that
allows us to get the current status from that class. So this class provides a facade for that, and
also adds the listener mechanism.
"""

def __init__(self, status_getter: Callable[[], BigSegmentStoreStatus]):
self.__status_getter = status_getter
self.__status_listeners = Listeners()
self.__last_status = None # type: Optional[BigSegmentStoreStatus]

@property
def status(self) -> BigSegmentStoreStatus:
return self.__status_getter()
Expand All @@ -43,15 +44,17 @@ def _update_status(self, new_status: BigSegmentStoreStatus):
self.__last_status = new_status
self.__status_listeners.notify(new_status)


class BigSegmentStoreManager:
# use EMPTY_MEMBERSHIP as a singleton whenever a membership query returns None; it's safe to reuse it
# because we will never modify the membership properties after they're queried
EMPTY_MEMBERSHIP = {} # type: dict

"""
Internal component that decorates the Big Segment store with caching behavior, and also polls the
store to track its status.
"""

def __init__(self, config: BigSegmentsConfig):
self.__store = config.store

Expand All @@ -61,8 +64,8 @@ def __init__(self, config: BigSegmentsConfig):
self.__poll_task = None # type: Optional[RepeatingTask]

if self.__store:
self.__cache = ExpiringDict(max_len = config.context_cache_size, max_age_seconds=config.context_cache_time)
self.__poll_task = RepeatingTask(config.status_poll_interval, 0, self.poll_store_and_update_status)
self.__cache = ExpiringDict(max_len=config.context_cache_size, max_age_seconds=config.context_cache_time)
self.__poll_task = RepeatingTask("ldclient.bigsegment.status-poll", config.status_poll_interval, 0, self.poll_store_and_update_status)
self.__poll_task.start()

def stop(self):
Expand All @@ -74,7 +77,7 @@ def stop(self):
@property
def status_provider(self) -> BigSegmentStoreStatusProvider:
return self.__status_provider

def get_user_membership(self, user_key: str) -> Tuple[Optional[dict], str]:
if not self.__store:
return (None, BigSegmentsStatus.NOT_CONFIGURED)
Expand All @@ -101,7 +104,7 @@ def get_status(self) -> BigSegmentStoreStatus:
return status if status else self.poll_store_and_update_status()

def poll_store_and_update_status(self) -> BigSegmentStoreStatus:
new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below
new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below
if self.__store:
try:
metadata = self.__store.get_metadata()
Expand All @@ -115,5 +118,6 @@ def poll_store_and_update_status(self) -> BigSegmentStoreStatus:
def is_stale(self, timestamp) -> bool:
return (timestamp is None) or ((int(time.time() * 1000) - timestamp) >= self.__stale_after_millis)


def _hash_for_user_key(user_key: str) -> str:
return base64.b64encode(sha256(user_key.encode('utf-8')).digest()).decode('utf-8')
2 changes: 1 addition & 1 deletion ldclient/impl/datasource/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, config: Config, requester: FeatureRequester, store: FeatureSt
self._requester = requester
self._store = store
self._ready = ready
self._task = RepeatingTask(config.poll_interval, 0, self._poll)
self._task = RepeatingTask("ldclient.datasource.polling", config.poll_interval, 0, self._poll)

def start(self):
log.info("Starting PollingUpdateProcessor with request interval: " + str(self._config.poll_interval))
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/datasource/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

class StreamingUpdateProcessor(Thread, UpdateProcessor):
def __init__(self, config, store, ready, diagnostic_accumulator):
Thread.__init__(self)
Thread.__init__(self, name="ldclient.datasource.streaming")
self.daemon = True
self._uri = config.stream_base_uri + STREAM_ALL_PATH
self._config = config
Expand Down
12 changes: 6 additions & 6 deletions ldclient/impl/events/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,15 @@ def __init__(self, inbox, config, http_client, diagnostic_accumulator=None):
self._omit_anonymous_contexts = config.omit_anonymous_contexts

self._flush_workers = FixedThreadPool(__MAX_FLUSH_THREADS__, "ldclient.flush")
self._diagnostic_flush_workers = None if self._diagnostic_accumulator is None else FixedThreadPool(1, "ldclient.diag_flush")
self._diagnostic_flush_workers = None if self._diagnostic_accumulator is None else FixedThreadPool(1, "ldclient.events.diag_flush")
if self._diagnostic_accumulator is not None:
init_event = create_diagnostic_init(self._diagnostic_accumulator.data_since_date,
self._diagnostic_accumulator.diagnostic_id,
config)
task = DiagnosticEventSendTask(self._http, self._config, init_event)
self._diagnostic_flush_workers.execute(task.run)

self._main_thread = Thread(target=self._run_main_loop)
self._main_thread = Thread(target=self._run_main_loop, name="ldclient.events.processor")
self._main_thread.daemon = True
self._main_thread.start()

Expand Down Expand Up @@ -504,13 +504,13 @@ class DefaultEventProcessor(EventProcessor):
def __init__(self, config, http=None, dispatcher_class=None, diagnostic_accumulator=None):
self._inbox = queue.Queue(config.events_max_pending)
self._inbox_full = False
self._flush_timer = RepeatingTask(config.flush_interval, config.flush_interval, self.flush)
self._contexts_flush_timer = RepeatingTask(config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts)
self._flush_timer = RepeatingTask("ldclient.events.flush", config.flush_interval, config.flush_interval, self.flush)
self._contexts_flush_timer = RepeatingTask("ldclient.events.context-flush", config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts)
self._flush_timer.start()
self._contexts_flush_timer.start()
if diagnostic_accumulator is not None:
self._diagnostic_event_timer = RepeatingTask(config.diagnostic_recording_interval,
config.diagnostic_recording_interval, self._send_diagnostic)
self._diagnostic_event_timer = RepeatingTask("ldclient.events.send-diagnostic", config.diagnostic_recording_interval,
config.diagnostic_recording_interval, self._send_diagnostic)
self._diagnostic_event_timer.start()
else:
self._diagnostic_event_timer = None
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/integrations/files/file_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def __init__(self, resolved_paths, reloader, interval):
self._paths = resolved_paths
self._reloader = reloader
self._file_times = self._check_file_times()
self._timer = RepeatingTask(interval, interval, self._poll)
self._timer = RepeatingTask("ldclient.datasource.file.poll", interval, interval, self._poll)
self._timer.start()

def stop(self):
Expand Down
8 changes: 5 additions & 3 deletions ldclient/impl/repeating_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import time
from typing import Callable


class RepeatingTask:
"""
A generic mechanism for calling a callback repeatedly at fixed intervals on a worker thread.
"""
def __init__(self, interval: float, initial_delay: float, callable: Callable):

def __init__(self, label, interval: float, initial_delay: float, callable: Callable):
"""
Creates the task, but does not start the worker thread yet.
:param interval: maximum time in seconds between invocations of the callback
:param initial_delay: time in seconds to wait before the first invocation
:param callable: the function to execute repeatedly
Expand All @@ -20,7 +22,7 @@ def __init__(self, interval: float, initial_delay: float, callable: Callable):
self.__initial_delay = initial_delay
self.__action = callable
self.__stop = Event()
self.__thread = Thread(target=self._run)
self.__thread = Thread(target=self._run, name=f"{label}.repeating")
self.__thread.daemon = True

def start(self):
Expand Down
2 changes: 1 addition & 1 deletion ldclient/testing/http_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def start_secure_server():

class MockServerWrapper(Thread):
def __init__(self, port, secure):
Thread.__init__(self)
Thread.__init__(self, name="ldclient.testing.mock-server-wrapper")
self.port = port
self.uri = '%s://localhost:%d' % ('https' if secure else 'http', port)
self.server = HTTPServer(('localhost', port), MockServerRequestHandler)
Expand Down
3 changes: 2 additions & 1 deletion ldclient/testing/impl/events/test_event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,9 @@ def event_consumer():
if message.type == 'stop':
message.param.set()
return

def start_consuming_events():
Thread(target=event_consumer).start()
Thread(target=event_consumer, name="ldclient.testing.events.consumer").start()

with DefaultEventProcessor(config, mock_http, dispatcher_factory) as ep:
ep_inbox = ep_inbox_holder[0]
Expand Down
6 changes: 3 additions & 3 deletions ldclient/testing/impl/test_repeating_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

def test_task_does_not_start_when_created():
signal = Event()
task = RepeatingTask(0.01, 0, lambda: signal.set())
task = RepeatingTask("ldclient.testing.set-signal", 0.01, 0, lambda: signal.set())
try:
signal_was_set = signal.wait(0.1)
assert signal_was_set == False
Expand All @@ -16,7 +16,7 @@ def test_task_does_not_start_when_created():

def test_task_executes_until_stopped():
queue = Queue()
task = RepeatingTask(0.1, 0, lambda: queue.put(time.time()))
task = RepeatingTask("ldclient.testing.enqueue-time", 0.1, 0, lambda: queue.put(time.time()))
try:
last = None
task.start()
Expand Down Expand Up @@ -47,7 +47,7 @@ def do_task():
if counter >= 2:
task.stop()
stopped.set()
task = RepeatingTask(0.01, 0, do_task)
task = RepeatingTask("ldclient.testing.task-runner", 0.01, 0, do_task)
try:
task.start()
assert stopped.wait(0.1) == True
Expand Down

0 comments on commit 9c7777c

Please sign in to comment.