Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Set explicit names for each spawned thread #311

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __update_availability(self, available: bool):
return

log.warn("Detected persistent store unavailability; updates will be cached until it recovers")
task = RepeatingTask(0.5, 0, self.__check_availability)
task = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability)

self.__lock.lock()
self.__poller = task
Expand Down Expand Up @@ -172,6 +172,7 @@ class LDClient:

Client instances are thread-safe.
"""

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did add some small whitespace fixes in this PR in addition to the thread renames. I figured it was small enough it wouldn't be a big deal.

def __init__(self, config: Config, start_wait: float=5):
"""Constructs a new LDClient instance.

Expand Down Expand Up @@ -248,7 +249,7 @@ def _set_event_processor(self, config):
if not config.event_processor_class:
diagnostic_id = create_diagnostic_id(config)
diagnostic_accumulator = None if config.diagnostic_opt_out else _DiagnosticAccumulator(diagnostic_id)
self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator = diagnostic_accumulator)
self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator=diagnostic_accumulator)
return diagnostic_accumulator
self._event_processor = config.event_processor_class(config)
return None
Expand Down Expand Up @@ -340,7 +341,7 @@ def track(self, event_name: str, context: Context, data: Optional[Any]=None,
log.warning("Invalid context for track (%s)" % context.error)
else:
self._send_event(self._event_factory_default.new_custom_event(event_name,
context, data, metric_value))
context, data, metric_value))

def identify(self, context: Context):
"""Reports details about an evaluation context.
Expand Down Expand Up @@ -711,5 +712,4 @@ def flag_tracker(self) -> FlagTracker:
return self.__flag_tracker



__all__ = ['LDClient', 'Config']
18 changes: 11 additions & 7 deletions ldclient/impl/big_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
class BigSegmentStoreStatusProviderImpl(BigSegmentStoreStatusProvider):
"""
Default implementation of the BigSegmentStoreStatusProvider interface.

The real implementation of getting the status is in BigSegmentStoreManager - we pass in a lambda that
allows us to get the current status from that class. So this class provides a facade for that, and
also adds the listener mechanism.
"""

def __init__(self, status_getter: Callable[[], BigSegmentStoreStatus]):
self.__status_getter = status_getter
self.__status_listeners = Listeners()
self.__last_status = None # type: Optional[BigSegmentStoreStatus]

@property
def status(self) -> BigSegmentStoreStatus:
return self.__status_getter()
Expand All @@ -43,15 +44,17 @@ def _update_status(self, new_status: BigSegmentStoreStatus):
self.__last_status = new_status
self.__status_listeners.notify(new_status)


class BigSegmentStoreManager:
# use EMPTY_MEMBERSHIP as a singleton whenever a membership query returns None; it's safe to reuse it
# because we will never modify the membership properties after they're queried
EMPTY_MEMBERSHIP = {} # type: dict

"""
Internal component that decorates the Big Segment store with caching behavior, and also polls the
store to track its status.
"""

def __init__(self, config: BigSegmentsConfig):
self.__store = config.store

Expand All @@ -61,8 +64,8 @@ def __init__(self, config: BigSegmentsConfig):
self.__poll_task = None # type: Optional[RepeatingTask]

if self.__store:
self.__cache = ExpiringDict(max_len = config.context_cache_size, max_age_seconds=config.context_cache_time)
self.__poll_task = RepeatingTask(config.status_poll_interval, 0, self.poll_store_and_update_status)
self.__cache = ExpiringDict(max_len=config.context_cache_size, max_age_seconds=config.context_cache_time)
self.__poll_task = RepeatingTask("ldclient.bigsegment.status-poll", config.status_poll_interval, 0, self.poll_store_and_update_status)
self.__poll_task.start()

def stop(self):
Expand All @@ -74,7 +77,7 @@ def stop(self):
@property
def status_provider(self) -> BigSegmentStoreStatusProvider:
return self.__status_provider

def get_user_membership(self, user_key: str) -> Tuple[Optional[dict], str]:
if not self.__store:
return (None, BigSegmentsStatus.NOT_CONFIGURED)
Expand All @@ -101,7 +104,7 @@ def get_status(self) -> BigSegmentStoreStatus:
return status if status else self.poll_store_and_update_status()

def poll_store_and_update_status(self) -> BigSegmentStoreStatus:
new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below
new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below
if self.__store:
try:
metadata = self.__store.get_metadata()
Expand All @@ -115,5 +118,6 @@ def poll_store_and_update_status(self) -> BigSegmentStoreStatus:
def is_stale(self, timestamp) -> bool:
return (timestamp is None) or ((int(time.time() * 1000) - timestamp) >= self.__stale_after_millis)


def _hash_for_user_key(user_key: str) -> str:
return base64.b64encode(sha256(user_key.encode('utf-8')).digest()).decode('utf-8')
2 changes: 1 addition & 1 deletion ldclient/impl/datasource/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, config: Config, requester: FeatureRequester, store: FeatureSt
self._requester = requester
self._store = store
self._ready = ready
self._task = RepeatingTask(config.poll_interval, 0, self._poll)
self._task = RepeatingTask("ldclient.datasource.polling", config.poll_interval, 0, self._poll)

def start(self):
log.info("Starting PollingUpdateProcessor with request interval: " + str(self._config.poll_interval))
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/datasource/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

class StreamingUpdateProcessor(Thread, UpdateProcessor):
def __init__(self, config, store, ready, diagnostic_accumulator):
Thread.__init__(self)
Thread.__init__(self, name="ldclient.datasource.streaming")
self.daemon = True
self._uri = config.stream_base_uri + STREAM_ALL_PATH
self._config = config
Expand Down
12 changes: 6 additions & 6 deletions ldclient/impl/events/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,15 @@ def __init__(self, inbox, config, http_client, diagnostic_accumulator=None):
self._omit_anonymous_contexts = config.omit_anonymous_contexts

self._flush_workers = FixedThreadPool(__MAX_FLUSH_THREADS__, "ldclient.flush")
self._diagnostic_flush_workers = None if self._diagnostic_accumulator is None else FixedThreadPool(1, "ldclient.diag_flush")
self._diagnostic_flush_workers = None if self._diagnostic_accumulator is None else FixedThreadPool(1, "ldclient.events.diag_flush")
if self._diagnostic_accumulator is not None:
init_event = create_diagnostic_init(self._diagnostic_accumulator.data_since_date,
self._diagnostic_accumulator.diagnostic_id,
config)
task = DiagnosticEventSendTask(self._http, self._config, init_event)
self._diagnostic_flush_workers.execute(task.run)

self._main_thread = Thread(target=self._run_main_loop)
self._main_thread = Thread(target=self._run_main_loop, name="ldclient.events.processor")
self._main_thread.daemon = True
self._main_thread.start()

Expand Down Expand Up @@ -504,13 +504,13 @@ class DefaultEventProcessor(EventProcessor):
def __init__(self, config, http=None, dispatcher_class=None, diagnostic_accumulator=None):
self._inbox = queue.Queue(config.events_max_pending)
self._inbox_full = False
self._flush_timer = RepeatingTask(config.flush_interval, config.flush_interval, self.flush)
self._contexts_flush_timer = RepeatingTask(config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts)
self._flush_timer = RepeatingTask("ldclient.events.flush", config.flush_interval, config.flush_interval, self.flush)
self._contexts_flush_timer = RepeatingTask("ldclient.events.context-flush", config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts)
self._flush_timer.start()
self._contexts_flush_timer.start()
if diagnostic_accumulator is not None:
self._diagnostic_event_timer = RepeatingTask(config.diagnostic_recording_interval,
config.diagnostic_recording_interval, self._send_diagnostic)
self._diagnostic_event_timer = RepeatingTask("ldclient.events.send-diagnostic", config.diagnostic_recording_interval,
config.diagnostic_recording_interval, self._send_diagnostic)
self._diagnostic_event_timer.start()
else:
self._diagnostic_event_timer = None
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/integrations/files/file_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def __init__(self, resolved_paths, reloader, interval):
self._paths = resolved_paths
self._reloader = reloader
self._file_times = self._check_file_times()
self._timer = RepeatingTask(interval, interval, self._poll)
self._timer = RepeatingTask("ldclient.datasource.file.poll", interval, interval, self._poll)
self._timer.start()

def stop(self):
Expand Down
8 changes: 5 additions & 3 deletions ldclient/impl/repeating_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import time
from typing import Callable


class RepeatingTask:
"""
A generic mechanism for calling a callback repeatedly at fixed intervals on a worker thread.
"""
def __init__(self, interval: float, initial_delay: float, callable: Callable):

def __init__(self, label, interval: float, initial_delay: float, callable: Callable):
"""
Creates the task, but does not start the worker thread yet.

:param interval: maximum time in seconds between invocations of the callback
:param initial_delay: time in seconds to wait before the first invocation
:param callable: the function to execute repeatedly
Expand All @@ -20,7 +22,7 @@ def __init__(self, interval: float, initial_delay: float, callable: Callable):
self.__initial_delay = initial_delay
self.__action = callable
self.__stop = Event()
self.__thread = Thread(target=self._run)
self.__thread = Thread(target=self._run, name=f"{label}.repeating")
self.__thread.daemon = True

def start(self):
Expand Down
2 changes: 1 addition & 1 deletion ldclient/testing/http_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def start_secure_server():

class MockServerWrapper(Thread):
def __init__(self, port, secure):
Thread.__init__(self)
Thread.__init__(self, name="ldclient.testing.mock-server-wrapper")
self.port = port
self.uri = '%s://localhost:%d' % ('https' if secure else 'http', port)
self.server = HTTPServer(('localhost', port), MockServerRequestHandler)
Expand Down
3 changes: 2 additions & 1 deletion ldclient/testing/impl/events/test_event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,9 @@ def event_consumer():
if message.type == 'stop':
message.param.set()
return

def start_consuming_events():
Thread(target=event_consumer).start()
Thread(target=event_consumer, name="ldclient.testing.events.consumer").start()

with DefaultEventProcessor(config, mock_http, dispatcher_factory) as ep:
ep_inbox = ep_inbox_holder[0]
Expand Down
6 changes: 3 additions & 3 deletions ldclient/testing/impl/test_repeating_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

def test_task_does_not_start_when_created():
signal = Event()
task = RepeatingTask(0.01, 0, lambda: signal.set())
task = RepeatingTask("ldclient.testing.set-signal", 0.01, 0, lambda: signal.set())
try:
signal_was_set = signal.wait(0.1)
assert signal_was_set == False
Expand All @@ -16,7 +16,7 @@ def test_task_does_not_start_when_created():

def test_task_executes_until_stopped():
queue = Queue()
task = RepeatingTask(0.1, 0, lambda: queue.put(time.time()))
task = RepeatingTask("ldclient.testing.enqueue-time", 0.1, 0, lambda: queue.put(time.time()))
try:
last = None
task.start()
Expand Down Expand Up @@ -47,7 +47,7 @@ def do_task():
if counter >= 2:
task.stop()
stopped.set()
task = RepeatingTask(0.01, 0, do_task)
task = RepeatingTask("ldclient.testing.task-runner", 0.01, 0, do_task)
try:
task.start()
assert stopped.wait(0.1) == True
Expand Down