diff --git a/homeassistant/components/go2rtc/__init__.py b/homeassistant/components/go2rtc/__init__.py index a07a62305f2eee..ca4aeeed9384d3 100644 --- a/homeassistant/components/go2rtc/__init__.py +++ b/homeassistant/components/go2rtc/__init__.py @@ -1,5 +1,8 @@ """The go2rtc component.""" +from __future__ import annotations + +from dataclasses import dataclass import logging import shutil @@ -38,7 +41,13 @@ from homeassistant.util.hass_dict import HassKey from homeassistant.util.package import is_docker_env -from .const import CONF_DEBUG_UI, DEBUG_UI_URL_MESSAGE, DOMAIN, HA_MANAGED_URL +from .const import ( + CONF_DEBUG_UI, + DEBUG_UI_URL_MESSAGE, + DOMAIN, + HA_MANAGED_RTSP_PORT, + HA_MANAGED_URL, +) from .server import Server _LOGGER = logging.getLogger(__name__) @@ -85,13 +94,22 @@ extra=vol.ALLOW_EXTRA, ) -_DATA_GO2RTC: HassKey[str] = HassKey(DOMAIN) +_DATA_GO2RTC: HassKey[Go2RtcData] = HassKey(DOMAIN) _RETRYABLE_ERRORS = (ClientConnectionError, ServerConnectionError) +@dataclass(frozen=True) +class Go2RtcData: + """Data for go2rtc.""" + + url: str + managed: bool + + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up WebRTC.""" url: str | None = None + managed = False if DOMAIN not in config and DEFAULT_CONFIG_DOMAIN not in config: await _remove_go2rtc_entries(hass) return True @@ -126,8 +144,9 @@ async def on_stop(event: Event) -> None: hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, on_stop) url = HA_MANAGED_URL + managed = True - hass.data[_DATA_GO2RTC] = url + hass.data[_DATA_GO2RTC] = Go2RtcData(url, managed) discovery_flow.async_create_flow( hass, DOMAIN, context={"source": SOURCE_SYSTEM}, data={} ) @@ -142,28 +161,32 @@ async def _remove_go2rtc_entries(hass: HomeAssistant) -> None: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up go2rtc from a config entry.""" - url = hass.data[_DATA_GO2RTC] + data = hass.data[_DATA_GO2RTC] # Validate the server URL try: - client = Go2RtcRestClient(async_get_clientsession(hass), url) + client = Go2RtcRestClient(async_get_clientsession(hass), data.url) await client.validate_server_version() except Go2RtcClientError as err: if isinstance(err.__cause__, _RETRYABLE_ERRORS): raise ConfigEntryNotReady( - f"Could not connect to go2rtc instance on {url}" + f"Could not connect to go2rtc instance on {data.url}" ) from err - _LOGGER.warning("Could not connect to go2rtc instance on %s (%s)", url, err) + _LOGGER.warning( + "Could not connect to go2rtc instance on %s (%s)", data.url, err + ) return False except Go2RtcVersionError as err: raise ConfigEntryNotReady( f"The go2rtc server version is not supported, {err}" ) from err except Exception as err: # noqa: BLE001 - _LOGGER.warning("Could not connect to go2rtc instance on %s (%s)", url, err) + _LOGGER.warning( + "Could not connect to go2rtc instance on %s (%s)", data.url, err + ) return False - provider = WebRTCProvider(hass, url) + provider = WebRTCProvider(hass, data) async_register_webrtc_provider(hass, provider) return True @@ -181,12 +204,12 @@ async def _get_binary(hass: HomeAssistant) -> str | None: class WebRTCProvider(CameraWebRTCProvider): """WebRTC provider.""" - def __init__(self, hass: HomeAssistant, url: str) -> None: + def __init__(self, hass: HomeAssistant, data: Go2RtcData) -> None: """Initialize the WebRTC provider.""" self._hass = hass - self._url = url + self._data = data self._session = async_get_clientsession(hass) - self._rest_client = Go2RtcRestClient(self._session, url) + self._rest_client = Go2RtcRestClient(self._session, data.url) self._sessions: dict[str, Go2RtcWsClient] = {} @property @@ -208,7 +231,7 @@ async def async_handle_async_webrtc_offer( ) -> None: """Handle the WebRTC offer and return the answer via the provided callback.""" self._sessions[session_id] = ws_client = Go2RtcWsClient( - self._session, self._url, source=camera.entity_id + self._session, self._data.url, source=camera.entity_id ) if not (stream_source := await camera.stream_source()): @@ -219,8 +242,30 @@ async def async_handle_async_webrtc_offer( streams = await self._rest_client.streams.list() - if (stream := streams.get(camera.entity_id)) is None or not any( - stream_source == producer.url for producer in stream.producers + if self._data.managed: + # HA manages the go2rtc instance + stream_org_name = camera.entity_id + "_orginal" + stream_redirect_sources = [ + f"rtsp://127.0.0.1:{HA_MANAGED_RTSP_PORT}/{stream_org_name}", + f"ffmpeg:{stream_org_name}#audio=opus", + ] + + if ( + (stream_org := streams.get(stream_org_name)) is None + or not any( + stream_source == producer.url for producer in stream_org.producers + ) + or (stream_redirect := streams.get(camera.entity_id)) is None + or stream_redirect_sources != [p.url for p in stream_redirect.producers] + ): + await self._rest_client.streams.add(stream_org_name, stream_source) + await self._rest_client.streams.add( + camera.entity_id, stream_redirect_sources + ) + + # go2rtc instance is managed outside HA + elif (stream_org := streams.get(camera.entity_id)) is None or not any( + stream_source == producer.url for producer in stream_org.producers ): await self._rest_client.streams.add( camera.entity_id, diff --git a/homeassistant/components/go2rtc/const.py b/homeassistant/components/go2rtc/const.py index d33ae3e389759f..3c4dc9a9500968 100644 --- a/homeassistant/components/go2rtc/const.py +++ b/homeassistant/components/go2rtc/const.py @@ -6,3 +6,4 @@ DEBUG_UI_URL_MESSAGE = "Url and debug_ui cannot be set at the same time." HA_MANAGED_API_PORT = 11984 HA_MANAGED_URL = f"http://localhost:{HA_MANAGED_API_PORT}/" +HA_MANAGED_RTSP_PORT = 18554 diff --git a/homeassistant/components/go2rtc/server.py b/homeassistant/components/go2rtc/server.py index ed3b44aadf9a87..91f4433546caf2 100644 --- a/homeassistant/components/go2rtc/server.py +++ b/homeassistant/components/go2rtc/server.py @@ -12,7 +12,7 @@ from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.aiohttp_client import async_get_clientsession -from .const import HA_MANAGED_API_PORT, HA_MANAGED_URL +from .const import HA_MANAGED_API_PORT, HA_MANAGED_RTSP_PORT, HA_MANAGED_URL _LOGGER = logging.getLogger(__name__) _TERMINATE_TIMEOUT = 5 @@ -24,15 +24,16 @@ # Default configuration for HA # - Api is listening only on localhost -# - Disable rtsp listener +# - Enable rtsp for localhost only as ffmpeg needs it # - Clear default ice servers -_GO2RTC_CONFIG_FORMAT = r""" +_GO2RTC_CONFIG_FORMAT = r"""# This file is managed by Home Assistant +# Do not edit it manually + api: listen: "{api_ip}:{api_port}" rtsp: - # ffmpeg needs rtsp for opus audio transcoding - listen: "127.0.0.1:18554" + listen: "127.0.0.1:{rtsp_port}" webrtc: listen: ":18555/tcp" @@ -67,7 +68,9 @@ def _create_temp_file(api_ip: str) -> str: with NamedTemporaryFile(prefix="go2rtc_", suffix=".yaml", delete=False) as file: file.write( _GO2RTC_CONFIG_FORMAT.format( - api_ip=api_ip, api_port=HA_MANAGED_API_PORT + api_ip=api_ip, + api_port=HA_MANAGED_API_PORT, + rtsp_port=HA_MANAGED_RTSP_PORT, ).encode() ) return file.name diff --git a/tests/components/go2rtc/test_init.py b/tests/components/go2rtc/test_init.py index 18a46fdd4d1c2b..ea1971a31d9e86 100644 --- a/tests/components/go2rtc/test_init.py +++ b/tests/components/go2rtc/test_init.py @@ -3,7 +3,7 @@ from collections.abc import Callable, Generator import logging from typing import NamedTuple -from unittest.mock import AsyncMock, Mock, patch +from unittest.mock import AsyncMock, Mock, call, patch from aiohttp.client_exceptions import ClientConnectionError, ServerConnectionError from go2rtc_client import Stream @@ -296,7 +296,7 @@ async def test() -> None: ], ) @pytest.mark.parametrize("has_go2rtc_entry", [True, False]) -async def test_setup_go_binary( +async def test_setup_managed( hass: HomeAssistant, rest_client: AsyncMock, ws_client: Mock, @@ -308,15 +308,131 @@ async def test_setup_go_binary( config: ConfigType, ui_enabled: bool, ) -> None: - """Test the go2rtc config entry with binary.""" + """Test the go2rtc setup with managed go2rtc instance.""" assert (len(hass.config_entries.async_entries(DOMAIN)) == 1) == has_go2rtc_entry + camera = init_test_integration + + entity_id = camera.entity_id + stream_name_orginal = camera.entity_id + "_orginal" + assert camera.frontend_stream_type == StreamType.HLS + + assert await async_setup_component(hass, DOMAIN, config) + await hass.async_block_till_done(wait_background_tasks=True) + config_entries = hass.config_entries.async_entries(DOMAIN) + assert len(config_entries) == 1 + assert config_entries[0].state == ConfigEntryState.LOADED + server.assert_called_once_with(hass, "/usr/bin/go2rtc", enable_ui=ui_enabled) + server_start.assert_called_once() - def after_setup() -> None: - server.assert_called_once_with(hass, "/usr/bin/go2rtc", enable_ui=ui_enabled) - server_start.assert_called_once() + receive_message_callback = Mock(spec_set=WebRTCSendMessage) - await _test_setup_and_signaling( - hass, rest_client, ws_client, config, after_setup, init_test_integration + async def test() -> None: + await camera.async_handle_async_webrtc_offer( + OFFER_SDP, "session_id", receive_message_callback + ) + ws_client.send.assert_called_once_with( + WebRTCOffer( + OFFER_SDP, + camera.async_get_webrtc_client_configuration().configuration.ice_servers, + ) + ) + ws_client.subscribe.assert_called_once() + + # Simulate the answer from the go2rtc server + callback = ws_client.subscribe.call_args[0][0] + callback(WebRTCAnswer(ANSWER_SDP)) + receive_message_callback.assert_called_once_with(HAWebRTCAnswer(ANSWER_SDP)) + + await test() + + stream_added_calls = [ + call(stream_name_orginal, "rtsp://stream"), + call( + entity_id, + [ + f"rtsp://127.0.0.1:18554/{stream_name_orginal}", + f"ffmpeg:{stream_name_orginal}#audio=opus", + ], + ), + ] + assert rest_client.streams.add.call_args_list == stream_added_calls + + # Stream original missing + rest_client.streams.add.reset_mock() + rest_client.streams.list.return_value = { + entity_id: Stream( + [ + Producer(f"rtsp://127.0.0.1:18554/{stream_name_orginal}"), + Producer(f"ffmpeg:{stream_name_orginal}#audio=opus"), + ] + ) + } + + receive_message_callback.reset_mock() + ws_client.reset_mock() + await test() + + assert rest_client.streams.add.call_args_list == stream_added_calls + + # Stream original source different + rest_client.streams.add.reset_mock() + rest_client.streams.list.return_value = { + stream_name_orginal: Stream([Producer("rtsp://different")]), + entity_id: Stream( + [ + Producer(f"rtsp://127.0.0.1:18554/{stream_name_orginal}"), + Producer(f"ffmpeg:{stream_name_orginal}#audio=opus"), + ] + ), + } + + receive_message_callback.reset_mock() + ws_client.reset_mock() + await test() + + assert rest_client.streams.add.call_args_list == stream_added_calls + + # Stream source different + rest_client.streams.add.reset_mock() + rest_client.streams.list.return_value = { + stream_name_orginal: Stream([Producer("rtsp://stream")]), + entity_id: Stream([Producer("rtsp://different")]), + } + + receive_message_callback.reset_mock() + ws_client.reset_mock() + await test() + + assert rest_client.streams.add.call_args_list == stream_added_calls + + # If the stream is already added, the stream should not be added again. + rest_client.streams.add.reset_mock() + rest_client.streams.list.return_value = { + stream_name_orginal: Stream([Producer("rtsp://stream")]), + entity_id: Stream( + [ + Producer(f"rtsp://127.0.0.1:18554/{stream_name_orginal}"), + Producer(f"ffmpeg:{stream_name_orginal}#audio=opus"), + ] + ), + } + + receive_message_callback.reset_mock() + ws_client.reset_mock() + await test() + + rest_client.streams.add.assert_not_called() + assert isinstance(camera._webrtc_provider, WebRTCProvider) + + # Set stream source to None and provider should be skipped + rest_client.streams.list.return_value = {} + receive_message_callback.reset_mock() + camera.set_stream_source(None) + await camera.async_handle_async_webrtc_offer( + OFFER_SDP, "session_id", receive_message_callback + ) + receive_message_callback.assert_called_once_with( + WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source") ) await hass.async_stop() @@ -332,7 +448,7 @@ def after_setup() -> None: ], ) @pytest.mark.parametrize("has_go2rtc_entry", [True, False]) -async def test_setup_go( +async def test_setup_self_hosted( hass: HomeAssistant, rest_client: AsyncMock, ws_client: Mock, @@ -342,16 +458,83 @@ async def test_setup_go( mock_is_docker_env: Mock, has_go2rtc_entry: bool, ) -> None: - """Test the go2rtc config entry without binary.""" + """Test the go2rtc with selfhosted go2rtc instance.""" assert (len(hass.config_entries.async_entries(DOMAIN)) == 1) == has_go2rtc_entry config = {DOMAIN: {CONF_URL: "http://localhost:1984/"}} + camera = init_test_integration + + entity_id = camera.entity_id + assert camera.frontend_stream_type == StreamType.HLS - def after_setup() -> None: - server.assert_not_called() + assert await async_setup_component(hass, DOMAIN, config) + await hass.async_block_till_done(wait_background_tasks=True) + config_entries = hass.config_entries.async_entries(DOMAIN) + assert len(config_entries) == 1 + assert config_entries[0].state == ConfigEntryState.LOADED + server.assert_not_called() + + receive_message_callback = Mock(spec_set=WebRTCSendMessage) - await _test_setup_and_signaling( - hass, rest_client, ws_client, config, after_setup, init_test_integration + async def test() -> None: + await camera.async_handle_async_webrtc_offer( + OFFER_SDP, "session_id", receive_message_callback + ) + ws_client.send.assert_called_once_with( + WebRTCOffer( + OFFER_SDP, + camera.async_get_webrtc_client_configuration().configuration.ice_servers, + ) + ) + ws_client.subscribe.assert_called_once() + + # Simulate the answer from the go2rtc server + callback = ws_client.subscribe.call_args[0][0] + callback(WebRTCAnswer(ANSWER_SDP)) + receive_message_callback.assert_called_once_with(HAWebRTCAnswer(ANSWER_SDP)) + + await test() + + rest_client.streams.add.assert_called_once_with( + entity_id, ["rtsp://stream", f"ffmpeg:{camera.entity_id}#audio=opus"] + ) + + # Stream exists but the source is different + rest_client.streams.add.reset_mock() + rest_client.streams.list.return_value = { + entity_id: Stream([Producer("rtsp://different")]) + } + + receive_message_callback.reset_mock() + ws_client.reset_mock() + await test() + + rest_client.streams.add.assert_called_once_with( + entity_id, ["rtsp://stream", f"ffmpeg:{camera.entity_id}#audio=opus"] + ) + + # If the stream is already added, the stream should not be added again. + rest_client.streams.add.reset_mock() + rest_client.streams.list.return_value = { + entity_id: Stream([Producer("rtsp://stream")]) + } + + receive_message_callback.reset_mock() + ws_client.reset_mock() + await test() + + rest_client.streams.add.assert_not_called() + assert isinstance(camera._webrtc_provider, WebRTCProvider) + + # Set stream source to None and provider should be skipped + rest_client.streams.list.return_value = {} + receive_message_callback.reset_mock() + camera.set_stream_source(None) + await camera.async_handle_async_webrtc_offer( + OFFER_SDP, "session_id", receive_message_callback + ) + receive_message_callback.assert_called_once_with( + WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source") ) mock_get_binary.assert_not_called() diff --git a/tests/components/go2rtc/test_server.py b/tests/components/go2rtc/test_server.py index d810dbd88eb93c..e4fe3993f3cddf 100644 --- a/tests/components/go2rtc/test_server.py +++ b/tests/components/go2rtc/test_server.py @@ -105,12 +105,13 @@ async def test_server_run_success( # Verify that the config file was written mock_tempfile.write.assert_called_once_with( - f""" + f"""# This file is managed by Home Assistant +# Do not edit it manually + api: listen: "{api_ip}:11984" rtsp: - # ffmpeg needs rtsp for opus audio transcoding listen: "127.0.0.1:18554" webrtc: