diff --git a/ldclient/client.py b/ldclient/client.py
index 5e84e10a..1925a626 100644
--- a/ldclient/client.py
+++ b/ldclient/client.py
@@ -108,7 +108,7 @@ def __update_availability(self, available: bool):
             return

         log.warn("Detected persistent store unavailability; updates will be cached until it recovers")
-        task = RepeatingTask(0.5, 0, self.__check_availability)
+        task = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability)

         self.__lock.lock()
         self.__poller = task
@@ -172,6 +172,7 @@ class LDClient:

     Client instances are thread-safe.
     """
+
     def __init__(self, config: Config, start_wait: float=5):
         """Constructs a new LDClient instance.

@@ -248,7 +249,7 @@ def _set_event_processor(self, config):
         if not config.event_processor_class:
             diagnostic_id = create_diagnostic_id(config)
             diagnostic_accumulator = None if config.diagnostic_opt_out else _DiagnosticAccumulator(diagnostic_id)
-            self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator = diagnostic_accumulator)
+            self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator=diagnostic_accumulator)
             return diagnostic_accumulator
         self._event_processor = config.event_processor_class(config)
         return None
@@ -340,7 +341,7 @@ def track(self, event_name: str, context: Context, data: Optional[Any]=None,
             log.warning("Invalid context for track (%s)" % context.error)
         else:
             self._send_event(self._event_factory_default.new_custom_event(event_name,
-                context, data, metric_value))
+                                                                           context, data, metric_value))

     def identify(self, context: Context):
         """Reports details about an evaluation context.
@@ -711,5 +712,4 @@ def flag_tracker(self) -> FlagTracker:
         return self.__flag_tracker


-
 __all__ = ['LDClient', 'Config']
diff --git a/ldclient/impl/big_segments.py b/ldclient/impl/big_segments.py
index b4f4ce1e..6ef32540 100644
--- a/ldclient/impl/big_segments.py
+++ b/ldclient/impl/big_segments.py
@@ -15,16 +15,17 @@ class BigSegmentStoreStatusProviderImpl(BigSegmentStoreStatusProvider):
     """
     Default implementation of the BigSegmentStoreStatusProvider interface.
-    
+
     The real implementation of getting the status is in BigSegmentStoreManager - we pass in a lambda
     that allows us to get the current status from that class. So this class provides a facade for
     that, and also adds the listener mechanism.
     """
+
     def __init__(self, status_getter: Callable[[], BigSegmentStoreStatus]):
         self.__status_getter = status_getter
         self.__status_listeners = Listeners()
         self.__last_status = None  # type: Optional[BigSegmentStoreStatus]
-    
+
     @property
     def status(self) -> BigSegmentStoreStatus:
         return self.__status_getter()
@@ -43,15 +44,17 @@ def _update_status(self, new_status: BigSegmentStoreStatus):
             self.__last_status = new_status
             self.__status_listeners.notify(new_status)

+
 class BigSegmentStoreManager:
     # use EMPTY_MEMBERSHIP as a singleton whenever a membership query returns None; it's safe to reuse it
     # because we will never modify the membership properties after they're queried
     EMPTY_MEMBERSHIP = {}  # type: dict
-    
+
     """
     Internal component that decorates the Big Segment store with caching behavior, and also polls the
     store to track its status.
""" + def __init__(self, config: BigSegmentsConfig): self.__store = config.store @@ -61,8 +64,8 @@ def __init__(self, config: BigSegmentsConfig): self.__poll_task = None # type: Optional[RepeatingTask] if self.__store: - self.__cache = ExpiringDict(max_len = config.context_cache_size, max_age_seconds=config.context_cache_time) - self.__poll_task = RepeatingTask(config.status_poll_interval, 0, self.poll_store_and_update_status) + self.__cache = ExpiringDict(max_len=config.context_cache_size, max_age_seconds=config.context_cache_time) + self.__poll_task = RepeatingTask("ldclient.bigsegment.status-poll", config.status_poll_interval, 0, self.poll_store_and_update_status) self.__poll_task.start() def stop(self): @@ -74,7 +77,7 @@ def stop(self): @property def status_provider(self) -> BigSegmentStoreStatusProvider: return self.__status_provider - + def get_user_membership(self, user_key: str) -> Tuple[Optional[dict], str]: if not self.__store: return (None, BigSegmentsStatus.NOT_CONFIGURED) @@ -101,7 +104,7 @@ def get_status(self) -> BigSegmentStoreStatus: return status if status else self.poll_store_and_update_status() def poll_store_and_update_status(self) -> BigSegmentStoreStatus: - new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below + new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below if self.__store: try: metadata = self.__store.get_metadata() @@ -115,5 +118,6 @@ def poll_store_and_update_status(self) -> BigSegmentStoreStatus: def is_stale(self, timestamp) -> bool: return (timestamp is None) or ((int(time.time() * 1000) - timestamp) >= self.__stale_after_millis) + def _hash_for_user_key(user_key: str) -> str: return base64.b64encode(sha256(user_key.encode('utf-8')).digest()).decode('utf-8') diff --git a/ldclient/impl/datasource/polling.py b/ldclient/impl/datasource/polling.py index b53dcc2c..2f8414d1 100644 --- a/ldclient/impl/datasource/polling.py +++ b/ldclient/impl/datasource/polling.py @@ -21,7 +21,7 @@ def __init__(self, config: Config, requester: FeatureRequester, store: FeatureSt self._requester = requester self._store = store self._ready = ready - self._task = RepeatingTask(config.poll_interval, 0, self._poll) + self._task = RepeatingTask("ldclient.datasource.polling", config.poll_interval, 0, self._poll) def start(self): log.info("Starting PollingUpdateProcessor with request interval: " + str(self._config.poll_interval)) diff --git a/ldclient/impl/datasource/streaming.py b/ldclient/impl/datasource/streaming.py index ec8debe9..80bde5fe 100644 --- a/ldclient/impl/datasource/streaming.py +++ b/ldclient/impl/datasource/streaming.py @@ -31,7 +31,7 @@ class StreamingUpdateProcessor(Thread, UpdateProcessor): def __init__(self, config, store, ready, diagnostic_accumulator): - Thread.__init__(self) + Thread.__init__(self, name="ldclient.datasource.streaming") self.daemon = True self._uri = config.stream_base_uri + STREAM_ALL_PATH self._config = config diff --git a/ldclient/impl/events/event_processor.py b/ldclient/impl/events/event_processor.py index 488e7895..df79c6ba 100644 --- a/ldclient/impl/events/event_processor.py +++ b/ldclient/impl/events/event_processor.py @@ -344,7 +344,7 @@ def __init__(self, inbox, config, http_client, diagnostic_accumulator=None): self._omit_anonymous_contexts = config.omit_anonymous_contexts self._flush_workers = FixedThreadPool(__MAX_FLUSH_THREADS__, "ldclient.flush") - self._diagnostic_flush_workers = None if self._diagnostic_accumulator is 
None else FixedThreadPool(1, "ldclient.diag_flush") + self._diagnostic_flush_workers = None if self._diagnostic_accumulator is None else FixedThreadPool(1, "ldclient.events.diag_flush") if self._diagnostic_accumulator is not None: init_event = create_diagnostic_init(self._diagnostic_accumulator.data_since_date, self._diagnostic_accumulator.diagnostic_id, @@ -352,7 +352,7 @@ def __init__(self, inbox, config, http_client, diagnostic_accumulator=None): task = DiagnosticEventSendTask(self._http, self._config, init_event) self._diagnostic_flush_workers.execute(task.run) - self._main_thread = Thread(target=self._run_main_loop) + self._main_thread = Thread(target=self._run_main_loop, name="ldclient.events.processor") self._main_thread.daemon = True self._main_thread.start() @@ -504,13 +504,13 @@ class DefaultEventProcessor(EventProcessor): def __init__(self, config, http=None, dispatcher_class=None, diagnostic_accumulator=None): self._inbox = queue.Queue(config.events_max_pending) self._inbox_full = False - self._flush_timer = RepeatingTask(config.flush_interval, config.flush_interval, self.flush) - self._contexts_flush_timer = RepeatingTask(config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts) + self._flush_timer = RepeatingTask("ldclient.events.flush", config.flush_interval, config.flush_interval, self.flush) + self._contexts_flush_timer = RepeatingTask("ldclient.events.context-flush", config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts) self._flush_timer.start() self._contexts_flush_timer.start() if diagnostic_accumulator is not None: - self._diagnostic_event_timer = RepeatingTask(config.diagnostic_recording_interval, - config.diagnostic_recording_interval, self._send_diagnostic) + self._diagnostic_event_timer = RepeatingTask("ldclient.events.send-diagnostic", config.diagnostic_recording_interval, + config.diagnostic_recording_interval, self._send_diagnostic) self._diagnostic_event_timer.start() else: self._diagnostic_event_timer = None diff --git a/ldclient/impl/integrations/files/file_data_source.py b/ldclient/impl/integrations/files/file_data_source.py index d02d5b28..5ba67a1e 100644 --- a/ldclient/impl/integrations/files/file_data_source.py +++ b/ldclient/impl/integrations/files/file_data_source.py @@ -189,7 +189,7 @@ def __init__(self, resolved_paths, reloader, interval): self._paths = resolved_paths self._reloader = reloader self._file_times = self._check_file_times() - self._timer = RepeatingTask(interval, interval, self._poll) + self._timer = RepeatingTask("ldclient.datasource.file.poll", interval, interval, self._poll) self._timer.start() def stop(self): diff --git a/ldclient/impl/repeating_task.py b/ldclient/impl/repeating_task.py index 57d9a088..6c737ce6 100644 --- a/ldclient/impl/repeating_task.py +++ b/ldclient/impl/repeating_task.py @@ -4,14 +4,16 @@ import time from typing import Callable + class RepeatingTask: """ A generic mechanism for calling a callback repeatedly at fixed intervals on a worker thread. """ - def __init__(self, interval: float, initial_delay: float, callable: Callable): + + def __init__(self, label, interval: float, initial_delay: float, callable: Callable): """ Creates the task, but does not start the worker thread yet. 
-        
+
         :param interval: maximum time in seconds between invocations of the callback
         :param initial_delay: time in seconds to wait before the first invocation
         :param callable: the function to execute repeatedly
@@ -20,7 +22,7 @@ def __init__(self, interval: float, initial_delay: float, callable: Callable):
         self.__initial_delay = initial_delay
         self.__action = callable
         self.__stop = Event()
-        self.__thread = Thread(target=self._run)
+        self.__thread = Thread(target=self._run, name=f"{label}.repeating")
         self.__thread.daemon = True

     def start(self):
diff --git a/ldclient/testing/http_util.py b/ldclient/testing/http_util.py
index dd3abdf3..ee82ec66 100644
--- a/ldclient/testing/http_util.py
+++ b/ldclient/testing/http_util.py
@@ -42,7 +42,7 @@ def start_secure_server():

 class MockServerWrapper(Thread):
     def __init__(self, port, secure):
-        Thread.__init__(self)
+        Thread.__init__(self, name="ldclient.testing.mock-server-wrapper")
         self.port = port
         self.uri = '%s://localhost:%d' % ('https' if secure else 'http', port)
         self.server = HTTPServer(('localhost', port), MockServerRequestHandler)
diff --git a/ldclient/testing/impl/events/test_event_processor.py b/ldclient/testing/impl/events/test_event_processor.py
index abcc884d..b1719914 100644
--- a/ldclient/testing/impl/events/test_event_processor.py
+++ b/ldclient/testing/impl/events/test_event_processor.py
@@ -646,8 +646,9 @@ def event_consumer():
             if message.type == 'stop':
                 message.param.set()
                 return
+
     def start_consuming_events():
-        Thread(target=event_consumer).start()
+        Thread(target=event_consumer, name="ldclient.testing.events.consumer").start()

     with DefaultEventProcessor(config, mock_http, dispatcher_factory) as ep:
         ep_inbox = ep_inbox_holder[0]
diff --git a/ldclient/testing/impl/test_repeating_task.py b/ldclient/testing/impl/test_repeating_task.py
index f39a3d59..e3f9b391 100644
--- a/ldclient/testing/impl/test_repeating_task.py
+++ b/ldclient/testing/impl/test_repeating_task.py
@@ -7,7 +7,7 @@

 def test_task_does_not_start_when_created():
     signal = Event()
-    task = RepeatingTask(0.01, 0, lambda: signal.set())
+    task = RepeatingTask("ldclient.testing.set-signal", 0.01, 0, lambda: signal.set())
     try:
         signal_was_set = signal.wait(0.1)
         assert signal_was_set == False
@@ -16,7 +16,7 @@

 def test_task_executes_until_stopped():
     queue = Queue()
-    task = RepeatingTask(0.1, 0, lambda: queue.put(time.time()))
+    task = RepeatingTask("ldclient.testing.enqueue-time", 0.1, 0, lambda: queue.put(time.time()))
     try:
         last = None
         task.start()
@@ -47,7 +47,7 @@ def do_task():
         if counter >= 2:
             task.stop()
             stopped.set()
-    task = RepeatingTask(0.01, 0, do_task)
+    task = RepeatingTask("ldclient.testing.task-runner", 0.01, 0, do_task)
     try:
         task.start()
         assert stopped.wait(0.1) == True
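
Note: the net effect of this patch is that every background thread the SDK spawns carries a descriptive name; in particular, RepeatingTask now derives its worker thread's name from the new first argument (f"{label}.repeating"). The following is a minimal usage sketch of that behavior, assuming the patch is applied. RepeatingTask lives in the internal ldclient.impl.repeating_task module, so the import path is not a supported public API, and the "example.heartbeat" label is an arbitrary name chosen for illustration, not one used by the SDK.

    import threading
    import time

    # Internal module; available under this path once the patch above is applied.
    from ldclient.impl.repeating_task import RepeatingTask

    ticks = 0

    def tick():
        # Trivial callback so the worker thread has something to do.
        global ticks
        ticks += 1

    # Constructor order introduced by this patch: label, interval, initial delay, callback.
    # The label is used only to name the worker thread.
    task = RepeatingTask("example.heartbeat", 0.1, 0, tick)
    task.start()
    time.sleep(0.3)

    # The worker is now identifiable in thread dumps as "example.heartbeat.repeating"
    # rather than a generic "Thread-N".
    print([t.name for t in threading.enumerate() if t.name.endswith(".repeating")])

    task.stop()
    print("callback ran %d times" % ticks)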