Skip to content

Commit

Permalink
Print Queue is full once per 5 seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Jun 20, 2023
1 parent fe5c07d commit 8b2e3e3
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore
self.queue_batch_size = queue_batch_size
self.queue_max_size = queue_max_size

self._last_warning_time = 0

# this queue is used to be able to process entities supplied from another thread.
self.queue: queue.Queue[TorrentTitle] = queue.Queue(maxsize=self.queue_max_size)

Expand Down Expand Up @@ -135,8 +137,8 @@ def query(_start, _end):
self.set_last_processed_torrent_id(end)

duration = time.time() - start_time
self.logger.info(
f'[Batch] Processed: {processed} titles. Added: {added} tags. Duration: {duration:.3f} seconds.')
message = f'[Batch] Processed: {processed} titles. Added: {added} tags. Duration: {duration:.3f} seconds.'
self.logger.info(message)

is_finished = end >= max_row_id
if is_finished:
Expand All @@ -161,8 +163,8 @@ async def process_queue(self) -> int:

if processed:
duration = time.time() - start_time
self.logger.info(
f'[Queue] Processed: {processed} titles. Added: {added} tags. Duration: {duration:.3f} seconds.')
message = f'[Queue] Processed: {processed} titles. Added: {added} tags. Duration: {duration:.3f} seconds.'
self.logger.info(message)
return processed

def put_entity_to_the_queue(self, infohash: Optional[bytes] = None, title: Optional[str] = None):
Expand All @@ -174,7 +176,11 @@ def put_entity_to_the_queue(self, infohash: Optional[bytes] = None, title: Optio
try:
self.queue.put_nowait(TorrentTitle(infohash, title))
except queue.Full:
self.logger.warning('Queue is full')
now = time.time()
time_passed = now - self._last_warning_time
if time_passed > 5: # sec
self.logger.warning('Queue is full')
self._last_warning_time = now

@force_switch
async def process_torrent_title(self, infohash: Optional[bytes] = None, title: Optional[str] = None) -> int:
Expand Down

0 comments on commit 8b2e3e3

Please sign in to comment.