diff --git a/system/athena/athenad.py b/system/athena/athenad.py index c926de97840483..2053900377b5f1 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -97,11 +97,13 @@ def from_dict(cls, d: dict) -> UploadItem: dispatcher["echo"] = lambda s: s recv_queue: Queue[str] = queue.Queue() send_queue: Queue[str] = queue.Queue() -upload_queue: Queue[UploadItem] = queue.Queue() + low_priority_send_queue: Queue[str] = queue.Queue() log_recv_queue: Queue[str] = queue.Queue() -cancelled_uploads: set[str] = set() +upload_lock = threading.Lock() +upload_queue: Queue[UploadItem] = queue.Queue() +cancelled_uploads: set[str] = set() cur_upload_items: dict[int, UploadItem | None] = {} @@ -130,9 +132,10 @@ def initialize(upload_queue: Queue[UploadItem]) -> None: @staticmethod def cache(upload_queue: Queue[UploadItem]) -> None: try: - queue: list[UploadItem | None] = list(upload_queue.queue) - items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)] - Params().put("AthenadUploadQueue", json.dumps(items)) + with upload_lock: + queue: list[UploadItem | None] = list(upload_queue.queue) + items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)] + Params().put("AthenadUploadQueue", json.dumps(items)) except Exception: cloudlog.exception("athena.UploadQueueCache.cache.exception") @@ -191,21 +194,25 @@ def jsonrpc_handler(end_event: threading.Event) -> None: def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = True) -> None: - item = cur_upload_items[tid] - if item is not None and item.retry_count < MAX_RETRY_COUNT: - new_retry_count = item.retry_count + 1 if increase_count else item.retry_count - - item = replace( - item, - retry_count=new_retry_count, - progress=0, - current=False - ) - upload_queue.put_nowait(item) - UploadQueueCache.cache(upload_queue) + retry = False + with upload_lock: + item = cur_upload_items[tid] + if item is not None and item.retry_count < MAX_RETRY_COUNT: + new_retry_count = item.retry_count + 1 if increase_count else item.retry_count + + item = replace( + item, + retry_count=new_retry_count, + progress=0, + current=False + ) + upload_queue.put_nowait(item) + retry = True cur_upload_items[tid] = None + UploadQueueCache.cache(upload_queue) + if retry: for _ in range(RETRY_DELAY): time.sleep(1) if end_event.is_set(): @@ -224,7 +231,8 @@ def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None: if end_event.is_set(): raise AbortTransferException - cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) + with upload_lock: + cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) def upload_handler(end_event: threading.Event) -> None: @@ -232,14 +240,14 @@ def upload_handler(end_event: threading.Event) -> None: tid = threading.get_ident() while not end_event.is_set(): - cur_upload_items[tid] = None - try: - cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) + with upload_lock: + cur_upload_items[tid] = None + cur_upload_items[tid] = item = replace(upload_queue.get_nowait(), current=True) - if item.id in cancelled_uploads: - cancelled_uploads.remove(item.id) - continue + if item.id in cancelled_uploads: + cancelled_uploads.remove(item.id) + continue # Remove item if too old age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000) @@ -280,6 +288,7 @@ def upload_handler(end_event: threading.Event) -> None: retry_upload(tid, end_event, False) except queue.Empty: + end_event.wait(1) pass except Exception: cloudlog.exception("athena.upload_handler.exception") @@ -403,7 +412,10 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo ) upload_id = hashlib.sha1(str(item).encode()).hexdigest() item = replace(item, id=upload_id) - upload_queue.put_nowait(item) + + with upload_lock: + upload_queue.put_nowait(item) + items.append(asdict(item)) UploadQueueCache.cache(upload_queue) @@ -418,8 +430,9 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo @dispatcher.add_method def listUploadQueue() -> list[UploadItemDict]: - items = list(upload_queue.queue) + list(cur_upload_items.values()) - return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)] + with upload_lock: + items = list(upload_queue.queue) + list(cur_upload_items.values()) + return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)] @dispatcher.add_method @@ -427,13 +440,14 @@ def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]: if not isinstance(upload_id, list): upload_id = [upload_id] - uploading_ids = {item.id for item in list(upload_queue.queue)} - cancelled_ids = uploading_ids.intersection(upload_id) - if len(cancelled_ids) == 0: - return {"success": 0, "error": "not found"} + with upload_lock: + uploading_ids = {item.id for item in list(upload_queue.queue)} + cancelled_ids = uploading_ids.intersection(upload_id) + if len(cancelled_ids) == 0: + return {"success": 0, "error": "not found"} - cancelled_uploads.update(cancelled_ids) - return {"success": 1} + cancelled_uploads.update(cancelled_ids) + return {"success": 1} @dispatcher.add_method def setRouteViewed(route: str) -> dict[str, int | str]: