From 9015af4902e882c949c34faf2027df4ddd24530c Mon Sep 17 00:00:00 2001 From: Brett Jia Date: Sun, 5 Mar 2023 22:25:47 -0500 Subject: [PATCH] arlo: optimize event handling (#601) * optimize event waiting by keying on properties * bump 0.6.6 * interrupt cleanup for other tasks * bump 0.6.7 for race condition fix --- plugins/arlo/package-lock.json | 6 +- plugins/arlo/package.json | 2 +- .../arlo/src/arlo_plugin/arlo/arlo_async.py | 62 ++++++-- .../arlo/src/arlo_plugin/arlo/stream_async.py | 135 ++++++++---------- 4 files changed, 112 insertions(+), 93 deletions(-) diff --git a/plugins/arlo/package-lock.json b/plugins/arlo/package-lock.json index af992667a1..978f1518ec 100644 --- a/plugins/arlo/package-lock.json +++ b/plugins/arlo/package-lock.json @@ -1,19 +1,19 @@ { "name": "@scrypted/arlo", - "version": "0.6.5", + "version": "0.6.7", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/arlo", - "version": "0.6.5", + "version": "0.6.7", "devDependencies": { "@scrypted/sdk": "file:../../sdk" } }, "../../sdk": { "name": "@scrypted/sdk", - "version": "0.2.63", + "version": "0.2.78", "dev": true, "license": "ISC", "dependencies": { diff --git a/plugins/arlo/package.json b/plugins/arlo/package.json index b4aeb690b3..f2fd038b9a 100644 --- a/plugins/arlo/package.json +++ b/plugins/arlo/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/arlo", - "version": "0.6.5", + "version": "0.6.7", "description": "Arlo Plugin for Scrypted", "keywords": [ "scrypted", diff --git a/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py b/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py index 3672ad01c3..1538e7c575 100644 --- a/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py +++ b/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py @@ -27,7 +27,7 @@ from .mqtt_stream_async import MQTTStream from .sse_stream_async import EventStream from .logging import logger - + # Import all of the other stuff. from datetime import datetime @@ -227,7 +227,7 @@ async def Subscribe(self, basestation_camera_tuples=[]): when subsequent calls to /notify are made. """ async def heartbeat(self, basestations, interval=30): - while self.event_stream and self.event_stream.connected: + while self.event_stream and self.event_stream.active: for basestation in basestations: try: self.Ping(basestation) @@ -378,7 +378,9 @@ def callbackwrapper(self, event): return None return stop - return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) + return asyncio.get_event_loop().create_task( + self.HandleEvents(basestation, resource, [('is', 'motionDetected')], callbackwrapper) + ) def SubscribeToBatteryEvents(self, basestation, camera, callback): """ @@ -403,7 +405,9 @@ def callbackwrapper(self, event): return None return stop - return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) + return asyncio.get_event_loop().create_task( + self.HandleEvents(basestation, resource, [('is', 'batteryLevel')], callbackwrapper) + ) def SubscribeToDoorbellEvents(self, basestation, doorbell, callback): """ @@ -437,7 +441,9 @@ def callbackwrapper(self, event): return None return stop - return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) + return asyncio.get_event_loop().create_task( + self.HandleEvents(basestation, resource, [('is', 'buttonPressed')], callbackwrapper) + ) def SubscribeToSDPAnswers(self, basestation, camera, callback): """ @@ -456,14 +462,16 @@ def callback(self, event) def callbackwrapper(self, event): properties = event.get("properties", {}) - stop = None + stop = None if properties.get("type") == "answerSdp": stop = callback(properties.get("data")) if not stop: return None return stop - return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)) + return asyncio.get_event_loop().create_task( + self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper) + ) def SubscribeToCandidateAnswers(self, basestation, camera, callback): """ @@ -482,14 +490,16 @@ def callback(self, event) def callbackwrapper(self, event): properties = event.get("properties", {}) - stop = None + stop = None if properties.get("type") == "answerCandidate": stop = callback(properties.get("data")) if not stop: return None return stop - return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)) + return asyncio.get_event_loop().create_task( + self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper) + ) async def HandleEvents(self, basestation, resource, actions, callback): """ @@ -502,9 +512,17 @@ async def HandleEvents(self, basestation, resource, actions, callback): await self.Subscribe() async def loop_action_listener(action): + # in this function, action can either be a tuple or a string + # if it is a tuple, we expect there to be a property key in the tuple + property = None + if isinstance(action, tuple): + action, property = action + if not isinstance(action, str): + raise Exception('Actions must be either a tuple or a str') + seen_events = {} while self.event_stream.active: - event, _ = await self.event_stream.get(resource, [action], seen_events) + event, _ = await self.event_stream.get(resource, action, property, seen_events) if event is None or self.event_stream is None \ or self.event_stream.event_stream_stop_event.is_set(): @@ -514,7 +532,7 @@ async def loop_action_listener(action): response = callback(self, event.item) # always requeue so other listeners can see the event too - self.event_stream.requeue(event, resource, action) + self.event_stream.requeue(event, resource, action, property) if response is not None: return response @@ -606,7 +624,13 @@ def callback(self, event): return nl.stream_url_dict['url'].replace("rtsp://", "rtsps://") return None - return await self.TriggerAndHandleEvent(basestation, resource, ["is"], trigger, callback) + return await self.TriggerAndHandleEvent( + basestation, + resource, + [("is", "activityState")], + trigger, + callback, + ) def StartPushToTalk(self, basestation, camera): url = f'https://{self.BASE_URL}/hmsweb/users/devices/{self.user_id}_{camera.get("deviceId")}/pushtotalk' @@ -644,8 +668,6 @@ def NotifyPushToTalkCandidate(self, basestation, camera, uSessionId, localCandid async def TriggerFullFrameSnapshot(self, basestation, camera): """ This function causes the camera to record a fullframe snapshot. - The presignedFullFrameSnapshotUrl url is returned. - Use DownloadSnapshot() to download the actual image file. """ resource = f"cameras/{camera.get('deviceId')}" @@ -676,4 +698,14 @@ def callback(self, event): return url return None - return await self.TriggerAndHandleEvent(basestation, resource, ["fullFrameSnapshotAvailable", "lastImageSnapshotAvailable", "is"], trigger, callback) + return await self.TriggerAndHandleEvent( + basestation, + resource, + [ + (action, property) + for action in ["fullFrameSnapshotAvailable", "lastImageSnapshotAvailable", "is"] + for property in ["presignedFullFrameSnapshotUrl", "presignedLastImageUrl"] + ], + trigger, + callback, + ) diff --git a/plugins/arlo/src/arlo_plugin/arlo/stream_async.py b/plugins/arlo/src/arlo_plugin/arlo/stream_async.py index 54ac18ee79..64c0641f3c 100644 --- a/plugins/arlo/src/arlo_plugin/arlo/stream_async.py +++ b/plugins/arlo/src/arlo_plugin/arlo/stream_async.py @@ -28,7 +28,7 @@ class Stream: """This class provides a queue-based EventStream object.""" - def __init__(self, arlo, expire=10): + def __init__(self, arlo, expire=5): self.event_stream = None self.initializing = True self.connected = False @@ -43,7 +43,7 @@ def __init__(self, arlo, expire=10): self.event_loop = asyncio.get_event_loop() self.event_loop.create_task(self._clean_queues()) self.event_loop.create_task(self._refresh_interval()) - + def __del__(self): self.disconnect() @@ -83,11 +83,16 @@ def set_refresh_interval(self, interval): self.refresh_loop_signal.put_nowait(object()) async def _clean_queues(self): - interval = self.expire * 2 + interval = self.expire * 4 await asyncio.sleep(interval) while not self.event_stream_stop_event.is_set(): - for key, q in self.queues.items(): + # since we interrupt the cleanup loop after every queue, there's + # a chance the self.queues dict is modified during iteration. + # so, we first make a copy of all the items of the dict and any + # new queues will be processed on the next loop through + queue_items = [i for i in self.queues.items()] + for key, q in queue_items: if q.empty(): continue @@ -114,81 +119,47 @@ async def _clean_queues(self): if num_dropped > 0: logger.debug(f"Cleaned {num_dropped} events from queue {key}") + # cleanup is not urgent, so give other tasks a chance + await asyncio.sleep(0.1) + await asyncio.sleep(interval) - async def get(self, resource, actions, skip_uuids={}): - if len(actions) == 1: - action = actions[0] + async def get(self, resource, action, property=None, skip_uuids={}): + if not property: key = f"{resource}/{action}" + else: + key = f"{resource}/{action}/{property}" - if key not in self.queues: - q = self.queues[key] = asyncio.Queue() - else: - q = self.queues[key] - - first_requeued = None - while True: - event = await q.get() - q.task_done() - - if not event: - # exit signal received - return None, action - - if first_requeued is not None and first_requeued is event: - # if we reach here, we've cycled through the whole queue - # and found nothing for us, so sleep and give the next - # subscriber a chance - q.put_nowait(event) - await asyncio.sleep(random.uniform(0, 0.01)) - continue - - if event.expired: - continue - elif event.uuid in skip_uuids: - q.put_nowait(event) - if first_requeued is None: - first_requeued = event - else: - return event, action + if key not in self.queues: + q = self.queues[key] = asyncio.Queue() else: - while True: - for action in actions: - key = f"{resource}/{action}" + q = self.queues[key] - if key not in self.queues: - q = self.queues[key] = asyncio.Queue() - else: - q = self.queues[key] + first_requeued = None + while True: + event = await q.get() + q.task_done() - if q.empty(): - continue + if not event: + # exit signal received + return None, action - first_requeued = None - while not q.empty(): - event = q.get_nowait() - q.task_done() - - if not event: - # exit signal received - return None, action - - if first_requeued is not None and first_requeued is event: - # if we reach here, we've cycled through the whole queue - # and found nothing for us, so go to the next queue - q.put_nowait(event) - break - - if event.expired: - continue - elif event.uuid in skip_uuids: - q.put_nowait(event) - - if first_requeued is None: - first_requeued = event - else: - return event, action + if first_requeued is not None and first_requeued is event: + # if we reach here, we've cycled through the whole queue + # and found nothing for us, so sleep and give the next + # subscriber a chance + q.put_nowait(event) await asyncio.sleep(random.uniform(0, 0.01)) + continue + + if event.expired: + continue + elif event.uuid in skip_uuids: + q.put_nowait(event) + if first_requeued is None: + first_requeued = event + else: + return event, action async def start(self): raise NotImplementedError() @@ -203,15 +174,31 @@ def _queue_response(self, response): resource = response.get('resource') action = response.get('action') key = f"{resource}/{action}" + + now = time.time() + event = StreamEvent(response, now, now + self.expire) + if key not in self.queues: q = self.queues[key] = asyncio.Queue() else: q = self.queues[key] - now = time.time() - q.put_nowait(StreamEvent(response, now, now + self.expire)) + q.put_nowait(event) - def requeue(self, event, resource, action): - key = f"{resource}/{action}" + # for optimized lookups, notify listeners of individual properties + properties = response.get('properties', {}) + for property in properties.keys(): + key = f"{resource}/{action}/{property}" + if key not in self.queues: + q = self.queues[key] = asyncio.Queue() + else: + q = self.queues[key] + q.put_nowait(event) + + def requeue(self, event, resource, action, property=None): + if not property: + key = f"{resource}/{action}" + else: + key = f"{resource}/{action}/{property}" self.queues[key].put_nowait(event) def disconnect(self):