Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function for fetching ICE servers from service handlers #717

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hass_nabucasa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
STATE_CONNECTED,
)
from .google_report_state import GoogleReportState
from .ice_servers import IceServers
from .iot import CloudIoT
from .remote import RemoteUI
from .utils import UTC, gather_callbacks, parse_date, utcnow
Expand Down Expand Up @@ -75,6 +76,7 @@ def __init__(
self.remote = RemoteUI(self)
self.auth = CognitoAuth(self)
self.voice = Voice(self)
self.ice_servers = IceServers(self)

self._init_task: asyncio.Task | None = None

Expand Down
140 changes: 140 additions & 0 deletions hass_nabucasa/ice_servers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Manage ICE servers."""

from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
import logging
import time
from typing import TYPE_CHECKING

from aiohttp import ClientResponseError
from aiohttp.hdrs import AUTHORIZATION, USER_AGENT

if TYPE_CHECKING:
from . import Cloud, _ClientT


_LOGGER = logging.getLogger(__name__)


@dataclass
klejejs marked this conversation as resolved.
Show resolved Hide resolved
class IceServer:
"""ICE Server."""

urls: str
username: str
credential: str


class IceServers:
"""Class to manage ICE servers."""

def __init__(self, cloud: Cloud[_ClientT]) -> None:
"""Initialize ICE Servers."""
self.cloud = cloud
self._refresh_task: asyncio.Task | None = None
self._ice_servers: list[IceServer] = []
self._ice_servers_listener: Callable[[], Awaitable[None]] | None = None
self._ice_servers_listener_unregister: list[Callable[[], None]] = []

cloud.iot.register_on_connect(self.on_connect)
cloud.iot.register_on_disconnect(self.on_disconnect)

async def _async_fetch_ice_servers(self) -> None:
"""Fetch ICE servers."""
if TYPE_CHECKING:
assert self.cloud.id_token is not None

async with self.cloud.websession.get(
f"https://{self.cloud.servicehandlers_server}/webrtc/ice_servers",
headers={
AUTHORIZATION: self.cloud.id_token,
USER_AGENT: self.cloud.client.client_name,
},
) as resp:
if resp.status >= 400:
_LOGGER.error("Failed to fetch ICE servers: %s", resp.status)
klejejs marked this conversation as resolved.
Show resolved Hide resolved

resp.raise_for_status()
data: list[IceServer] = await resp.json()
klejejs marked this conversation as resolved.
Show resolved Hide resolved

self._ice_servers = data

klejejs marked this conversation as resolved.
Show resolved Hide resolved
if self._ice_servers_listener is not None:
await self._ice_servers_listener()

def _get_refresh_sleep_time(self) -> int:
"""Get the sleep time for refreshing ICE servers."""
timestamps = [
int(server.username.split(":")[0])
for server in self._ice_servers
if server.urls.startswith("turn:")
]

if not timestamps:
return 3600 # 1 hour

# 1 hour before the earliest expiration
return min(timestamps) - int(time.time()) - 3600
klejejs marked this conversation as resolved.
Show resolved Hide resolved

async def _async_refresh_ice_servers(self) -> None:
"""Handle ICE server refresh."""
while True:
try:
await self._async_fetch_ice_servers()
except ClientResponseError as err:
_LOGGER.error("Can't refresh ICE servers: %s", err)
except asyncio.CancelledError:
# Task is canceled, stop it.
break

sleep_time = self._get_refresh_sleep_time()
await asyncio.sleep(sleep_time)

async def on_connect(self) -> None:
"""When the instance is connected."""
self._refresh_task = asyncio.create_task(self._async_refresh_ice_servers())

async def on_disconnect(self) -> None:
"""When the instance is disconnected."""
if self._refresh_task is not None:
self._refresh_task.cancel()
self._refresh_task = None

async def async_register_ice_servers_listener(
self,
register_ice_server_fn: Callable[[IceServer], Awaitable[Callable[[], None]]],
) -> Callable[[], None]:
"""Register a listener for ICE servers."""
_LOGGER.debug("Registering ICE servers listener")

async def perform_ice_server_update() -> None:
"""Perform ICE server update."""
_LOGGER.debug("Updating ICE servers")

for unregister in self._ice_servers_listener_unregister:
unregister()

if not self._ice_servers:
self._ice_servers_listener_unregister = []
return

self._ice_servers_listener_unregister = [
await register_ice_server_fn(ice_server)
for ice_server in self._ice_servers
]

_LOGGER.debug("ICE servers updated")

def remove_listener() -> None:
"""Remove listener."""
self._ice_servers_listener = None

self._ice_servers_listener = perform_ice_server_update

if self._ice_servers:
await self._ice_servers_listener()

return remove_listener