Skip to content

Commit

Permalink
arlo: optimize event handling (#601)
Browse files Browse the repository at this point in the history
* optimize event waiting by keying on properties

* bump 0.6.6

* interrupt cleanup for other tasks

* bump 0.6.7 for race condition fix
  • Loading branch information
bjia56 authored Mar 6, 2023
1 parent 7902a09 commit 9015af4
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 93 deletions.
6 changes: 3 additions & 3 deletions plugins/arlo/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/arlo/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/arlo",
"version": "0.6.5",
"version": "0.6.7",
"description": "Arlo Plugin for Scrypted",
"keywords": [
"scrypted",
Expand Down
62 changes: 47 additions & 15 deletions plugins/arlo/src/arlo_plugin/arlo/arlo_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .mqtt_stream_async import MQTTStream
from .sse_stream_async import EventStream
from .logging import logger

# Import all of the other stuff.
from datetime import datetime

Expand Down Expand Up @@ -227,7 +227,7 @@ async def Subscribe(self, basestation_camera_tuples=[]):
when subsequent calls to /notify are made.
"""
async def heartbeat(self, basestations, interval=30):
while self.event_stream and self.event_stream.connected:
while self.event_stream and self.event_stream.active:
for basestation in basestations:
try:
self.Ping(basestation)
Expand Down Expand Up @@ -378,7 +378,9 @@ def callbackwrapper(self, event):
return None
return stop

return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, [('is', 'motionDetected')], callbackwrapper)
)

def SubscribeToBatteryEvents(self, basestation, camera, callback):
"""
Expand All @@ -403,7 +405,9 @@ def callbackwrapper(self, event):
return None
return stop

return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, [('is', 'batteryLevel')], callbackwrapper)
)

def SubscribeToDoorbellEvents(self, basestation, doorbell, callback):
"""
Expand Down Expand Up @@ -437,7 +441,9 @@ def callbackwrapper(self, event):
return None
return stop

return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, [('is', 'buttonPressed')], callbackwrapper)
)

def SubscribeToSDPAnswers(self, basestation, camera, callback):
"""
Expand All @@ -456,14 +462,16 @@ def callback(self, event)

def callbackwrapper(self, event):
properties = event.get("properties", {})
stop = None
stop = None
if properties.get("type") == "answerSdp":
stop = callback(properties.get("data"))
if not stop:
return None
return stop

return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)
)

def SubscribeToCandidateAnswers(self, basestation, camera, callback):
"""
Expand All @@ -482,14 +490,16 @@ def callback(self, event)

def callbackwrapper(self, event):
properties = event.get("properties", {})
stop = None
stop = None
if properties.get("type") == "answerCandidate":
stop = callback(properties.get("data"))
if not stop:
return None
return stop

return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper))
return asyncio.get_event_loop().create_task(
self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)
)

async def HandleEvents(self, basestation, resource, actions, callback):
"""
Expand All @@ -502,9 +512,17 @@ async def HandleEvents(self, basestation, resource, actions, callback):
await self.Subscribe()

async def loop_action_listener(action):
# in this function, action can either be a tuple or a string
# if it is a tuple, we expect there to be a property key in the tuple
property = None
if isinstance(action, tuple):
action, property = action
if not isinstance(action, str):
raise Exception('Actions must be either a tuple or a str')

seen_events = {}
while self.event_stream.active:
event, _ = await self.event_stream.get(resource, [action], seen_events)
event, _ = await self.event_stream.get(resource, action, property, seen_events)

if event is None or self.event_stream is None \
or self.event_stream.event_stream_stop_event.is_set():
Expand All @@ -514,7 +532,7 @@ async def loop_action_listener(action):
response = callback(self, event.item)

# always requeue so other listeners can see the event too
self.event_stream.requeue(event, resource, action)
self.event_stream.requeue(event, resource, action, property)

if response is not None:
return response
Expand Down Expand Up @@ -606,7 +624,13 @@ def callback(self, event):
return nl.stream_url_dict['url'].replace("rtsp://", "rtsps://")
return None

return await self.TriggerAndHandleEvent(basestation, resource, ["is"], trigger, callback)
return await self.TriggerAndHandleEvent(
basestation,
resource,
[("is", "activityState")],
trigger,
callback,
)

def StartPushToTalk(self, basestation, camera):
url = f'https://{self.BASE_URL}/hmsweb/users/devices/{self.user_id}_{camera.get("deviceId")}/pushtotalk'
Expand Down Expand Up @@ -644,8 +668,6 @@ def NotifyPushToTalkCandidate(self, basestation, camera, uSessionId, localCandid
async def TriggerFullFrameSnapshot(self, basestation, camera):
"""
This function causes the camera to record a fullframe snapshot.
The presignedFullFrameSnapshotUrl url is returned.
Use DownloadSnapshot() to download the actual image file.
"""
resource = f"cameras/{camera.get('deviceId')}"

Expand Down Expand Up @@ -676,4 +698,14 @@ def callback(self, event):
return url
return None

return await self.TriggerAndHandleEvent(basestation, resource, ["fullFrameSnapshotAvailable", "lastImageSnapshotAvailable", "is"], trigger, callback)
return await self.TriggerAndHandleEvent(
basestation,
resource,
[
(action, property)
for action in ["fullFrameSnapshotAvailable", "lastImageSnapshotAvailable", "is"]
for property in ["presignedFullFrameSnapshotUrl", "presignedLastImageUrl"]
],
trigger,
callback,
)
135 changes: 61 additions & 74 deletions plugins/arlo/src/arlo_plugin/arlo/stream_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

class Stream:
"""This class provides a queue-based EventStream object."""
def __init__(self, arlo, expire=10):
def __init__(self, arlo, expire=5):
self.event_stream = None
self.initializing = True
self.connected = False
Expand All @@ -43,7 +43,7 @@ def __init__(self, arlo, expire=10):
self.event_loop = asyncio.get_event_loop()
self.event_loop.create_task(self._clean_queues())
self.event_loop.create_task(self._refresh_interval())

def __del__(self):
self.disconnect()

Expand Down Expand Up @@ -83,11 +83,16 @@ def set_refresh_interval(self, interval):
self.refresh_loop_signal.put_nowait(object())

async def _clean_queues(self):
interval = self.expire * 2
interval = self.expire * 4

await asyncio.sleep(interval)
while not self.event_stream_stop_event.is_set():
for key, q in self.queues.items():
# since we interrupt the cleanup loop after every queue, there's
# a chance the self.queues dict is modified during iteration.
# so, we first make a copy of all the items of the dict and any
# new queues will be processed on the next loop through
queue_items = [i for i in self.queues.items()]
for key, q in queue_items:
if q.empty():
continue

Expand All @@ -114,81 +119,47 @@ async def _clean_queues(self):
if num_dropped > 0:
logger.debug(f"Cleaned {num_dropped} events from queue {key}")

# cleanup is not urgent, so give other tasks a chance
await asyncio.sleep(0.1)

await asyncio.sleep(interval)

async def get(self, resource, actions, skip_uuids={}):
if len(actions) == 1:
action = actions[0]
async def get(self, resource, action, property=None, skip_uuids={}):
if not property:
key = f"{resource}/{action}"
else:
key = f"{resource}/{action}/{property}"

if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]

first_requeued = None
while True:
event = await q.get()
q.task_done()

if not event:
# exit signal received
return None, action

if first_requeued is not None and first_requeued is event:
# if we reach here, we've cycled through the whole queue
# and found nothing for us, so sleep and give the next
# subscriber a chance
q.put_nowait(event)
await asyncio.sleep(random.uniform(0, 0.01))
continue

if event.expired:
continue
elif event.uuid in skip_uuids:
q.put_nowait(event)
if first_requeued is None:
first_requeued = event
else:
return event, action
if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
while True:
for action in actions:
key = f"{resource}/{action}"
q = self.queues[key]

if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
first_requeued = None
while True:
event = await q.get()
q.task_done()

if q.empty():
continue
if not event:
# exit signal received
return None, action

first_requeued = None
while not q.empty():
event = q.get_nowait()
q.task_done()

if not event:
# exit signal received
return None, action

if first_requeued is not None and first_requeued is event:
# if we reach here, we've cycled through the whole queue
# and found nothing for us, so go to the next queue
q.put_nowait(event)
break

if event.expired:
continue
elif event.uuid in skip_uuids:
q.put_nowait(event)

if first_requeued is None:
first_requeued = event
else:
return event, action
if first_requeued is not None and first_requeued is event:
# if we reach here, we've cycled through the whole queue
# and found nothing for us, so sleep and give the next
# subscriber a chance
q.put_nowait(event)
await asyncio.sleep(random.uniform(0, 0.01))
continue

if event.expired:
continue
elif event.uuid in skip_uuids:
q.put_nowait(event)
if first_requeued is None:
first_requeued = event
else:
return event, action

async def start(self):
raise NotImplementedError()
Expand All @@ -203,15 +174,31 @@ def _queue_response(self, response):
resource = response.get('resource')
action = response.get('action')
key = f"{resource}/{action}"

now = time.time()
event = StreamEvent(response, now, now + self.expire)

if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
now = time.time()
q.put_nowait(StreamEvent(response, now, now + self.expire))
q.put_nowait(event)

def requeue(self, event, resource, action):
key = f"{resource}/{action}"
# for optimized lookups, notify listeners of individual properties
properties = response.get('properties', {})
for property in properties.keys():
key = f"{resource}/{action}/{property}"
if key not in self.queues:
q = self.queues[key] = asyncio.Queue()
else:
q = self.queues[key]
q.put_nowait(event)

def requeue(self, event, resource, action, property=None):
if not property:
key = f"{resource}/{action}"
else:
key = f"{resource}/{action}/{property}"
self.queues[key].put_nowait(event)

def disconnect(self):
Expand Down

0 comments on commit 9015af4

Please sign in to comment.