Skip to content

Commit

Permalink
add lock to ensure atomic operations on upload items
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Feb 2, 2025
1 parent 6d12a47 commit fc2fe08
Showing 1 changed file with 47 additions and 33 deletions.
80 changes: 47 additions & 33 deletions system/athena/athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ def from_dict(cls, d: dict) -> UploadItem:
dispatcher["echo"] = lambda s: s
recv_queue: Queue[str] = queue.Queue()
send_queue: Queue[str] = queue.Queue()
upload_queue: Queue[UploadItem] = queue.Queue()

low_priority_send_queue: Queue[str] = queue.Queue()
log_recv_queue: Queue[str] = queue.Queue()
cancelled_uploads: set[str] = set()

upload_lock = threading.Lock()
upload_queue: Queue[UploadItem] = queue.Queue()
cancelled_uploads: set[str] = set()
cur_upload_items: dict[int, UploadItem | None] = {}


Expand Down Expand Up @@ -130,9 +132,10 @@ def initialize(upload_queue: Queue[UploadItem]) -> None:
@staticmethod
def cache(upload_queue: Queue[UploadItem]) -> None:
try:
queue: list[UploadItem | None] = list(upload_queue.queue)
items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)]
Params().put("AthenadUploadQueue", json.dumps(items))
with upload_lock:
queue: list[UploadItem | None] = list(upload_queue.queue)
items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)]
Params().put("AthenadUploadQueue", json.dumps(items))
except Exception:
cloudlog.exception("athena.UploadQueueCache.cache.exception")

Expand Down Expand Up @@ -191,21 +194,25 @@ def jsonrpc_handler(end_event: threading.Event) -> None:


def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = True) -> None:
item = cur_upload_items[tid]
if item is not None and item.retry_count < MAX_RETRY_COUNT:
new_retry_count = item.retry_count + 1 if increase_count else item.retry_count

item = replace(
item,
retry_count=new_retry_count,
progress=0,
current=False
)
upload_queue.put_nowait(item)
UploadQueueCache.cache(upload_queue)
retry = False
with upload_lock:
item = cur_upload_items[tid]
if item is not None and item.retry_count < MAX_RETRY_COUNT:
new_retry_count = item.retry_count + 1 if increase_count else item.retry_count

item = replace(
item,
retry_count=new_retry_count,
progress=0,
current=False
)
upload_queue.put_nowait(item)
retry = True

cur_upload_items[tid] = None

UploadQueueCache.cache(upload_queue)
if retry:
for _ in range(RETRY_DELAY):
time.sleep(1)
if end_event.is_set():
Expand All @@ -224,22 +231,23 @@ def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None:
if end_event.is_set():
raise AbortTransferException

cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)
with upload_lock:
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)


def upload_handler(end_event: threading.Event) -> None:
sm = messaging.SubMaster(['deviceState'])
tid = threading.get_ident()

while not end_event.is_set():
cur_upload_items[tid] = None

try:
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)
with upload_lock:
cur_upload_items[tid] = None
cur_upload_items[tid] = item = replace(upload_queue.get_nowait(), current=True)

if item.id in cancelled_uploads:
cancelled_uploads.remove(item.id)
continue
if item.id in cancelled_uploads:
cancelled_uploads.remove(item.id)
continue

# Remove item if too old
age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000)
Expand Down Expand Up @@ -280,6 +288,7 @@ def upload_handler(end_event: threading.Event) -> None:
retry_upload(tid, end_event, False)

except queue.Empty:
end_event.wait(1)
pass
except Exception:
cloudlog.exception("athena.upload_handler.exception")
Expand Down Expand Up @@ -403,7 +412,10 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
)
upload_id = hashlib.sha1(str(item).encode()).hexdigest()
item = replace(item, id=upload_id)
upload_queue.put_nowait(item)

with upload_lock:
upload_queue.put_nowait(item)

items.append(asdict(item))

UploadQueueCache.cache(upload_queue)
Expand All @@ -418,22 +430,24 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo

@dispatcher.add_method
def listUploadQueue() -> list[UploadItemDict]:
items = list(upload_queue.queue) + list(cur_upload_items.values())
return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)]
with upload_lock:
items = list(upload_queue.queue) + list(cur_upload_items.values())
return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)]


@dispatcher.add_method
def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]:
if not isinstance(upload_id, list):
upload_id = [upload_id]

uploading_ids = {item.id for item in list(upload_queue.queue)}
cancelled_ids = uploading_ids.intersection(upload_id)
if len(cancelled_ids) == 0:
return {"success": 0, "error": "not found"}
with upload_lock:
uploading_ids = {item.id for item in list(upload_queue.queue)}
cancelled_ids = uploading_ids.intersection(upload_id)
if len(cancelled_ids) == 0:
return {"success": 0, "error": "not found"}

cancelled_uploads.update(cancelled_ids)
return {"success": 1}
cancelled_uploads.update(cancelled_ids)
return {"success": 1}

@dispatcher.add_method
def setRouteViewed(route: str) -> dict[str, int | str]:
Expand Down

0 comments on commit fc2fe08

Please sign in to comment.