From 590d5b0ec94264dab2c3936af631832639af7e87 Mon Sep 17 00:00:00 2001 From: Chris <52449218+shadow578@users.noreply.github.com> Date: Fri, 24 May 2024 08:41:07 +0000 Subject: [PATCH] refactor model module --- custom_components/xmltv_epg/api.py | 2 +- custom_components/xmltv_epg/entity.py | 2 +- custom_components/xmltv_epg/model/__init__.py | 11 + custom_components/xmltv_epg/model/channel.py | 72 +++++ custom_components/xmltv_epg/model/guide.py | 72 +++++ custom_components/xmltv_epg/model/helper.py | 11 + custom_components/xmltv_epg/model/program.py | 150 +++++++++ custom_components/xmltv_epg/sensor.py | 2 +- custom_components/xmltv_epg/xmltv/__init__.py | 1 - custom_components/xmltv_epg/xmltv/model.py | 287 ------------------ test/conftest.py | 8 +- test/{xmltv => model}/__init__.py | 0 test/{xmltv => model}/test_TVChannel.py | 2 +- test/{xmltv => model}/test_TVGuide.py | 2 +- test/{xmltv => model}/test_TVProgram.py | 2 +- test/test_config_flow.py | 2 +- test/test_coordinator.py | 2 +- test/test_init.py | 2 +- test/test_sensor.py | 2 +- 19 files changed, 330 insertions(+), 302 deletions(-) create mode 100644 custom_components/xmltv_epg/model/__init__.py create mode 100644 custom_components/xmltv_epg/model/channel.py create mode 100644 custom_components/xmltv_epg/model/guide.py create mode 100644 custom_components/xmltv_epg/model/helper.py create mode 100644 custom_components/xmltv_epg/model/program.py delete mode 100644 custom_components/xmltv_epg/xmltv/__init__.py delete mode 100644 custom_components/xmltv_epg/xmltv/model.py rename test/{xmltv => model}/__init__.py (100%) rename test/{xmltv => model}/test_TVChannel.py (96%) rename test/{xmltv => model}/test_TVGuide.py (94%) rename test/{xmltv => model}/test_TVProgram.py (98%) diff --git a/custom_components/xmltv_epg/api.py b/custom_components/xmltv_epg/api.py index 2e55e36..980f679 100644 --- a/custom_components/xmltv_epg/api.py +++ b/custom_components/xmltv_epg/api.py @@ -8,7 +8,7 @@ import xml.etree.ElementTree as ET -from .xmltv.model import TVGuide +from .model import TVGuide import gzip class XMLTVClientError(Exception): diff --git a/custom_components/xmltv_epg/entity.py b/custom_components/xmltv_epg/entity.py index 87e793e..fbb53e3 100644 --- a/custom_components/xmltv_epg/entity.py +++ b/custom_components/xmltv_epg/entity.py @@ -5,7 +5,7 @@ from .coordinator import XMLTVDataUpdateCoordinator -from .xmltv.model import TVGuide +from .model import TVGuide class XMLTVEntity(CoordinatorEntity): diff --git a/custom_components/xmltv_epg/model/__init__.py b/custom_components/xmltv_epg/model/__init__.py new file mode 100644 index 0000000..8417b72 --- /dev/null +++ b/custom_components/xmltv_epg/model/__init__.py @@ -0,0 +1,11 @@ +"""XMLTV Model.""" + +from .channel import TVChannel +from .program import TVProgram +from .guide import TVGuide + +__all__ = [ + 'TVChannel', + 'TVProgram', + 'TVGuide', +] diff --git a/custom_components/xmltv_epg/model/channel.py b/custom_components/xmltv_epg/model/channel.py new file mode 100644 index 0000000..c68f877 --- /dev/null +++ b/custom_components/xmltv_epg/model/channel.py @@ -0,0 +1,72 @@ +"""TV Channel Model.""" +from datetime import datetime +import xml.etree.ElementTree as ET + +from .program import TVProgram +from .helper import is_none_or_whitespace, get_child_as_text + +class TVChannel: + """TV Channel Class.""" + + TAG = 'channel' + + def __init__(self, id: str, name: str): + """Initialize TV Channel.""" + self.id = id + self.name = name + self.programs = [] + + def add_program(self, program: TVProgram): + """Add a program to channel.""" + self.programs.append(program) + + # keep programs sorted by start time + self.programs.sort(key=lambda p: p.start) + + def get_current_program(self, time: datetime) -> TVProgram: + """Get current program at given time.""" + for program in self.programs: + if program.start.timestamp() <= time.timestamp() < program.end.timestamp(): + return program + + return None + + def get_next_program(self, time: datetime) -> TVProgram: + """Get next program after given time.""" + for program in self.programs: + if program.start.timestamp() >= time.timestamp(): + return program + + return None + + @classmethod + def from_xml(cls, xml: ET.Element) -> 'TVChannel': + """Initialize TV Channel from XML Node, if possible. + + :param xml: XML Node + :return: TV Channel object, or None + + XML node format is: + + WDR Essen + + """ + + # node must be a channel + if xml.tag != cls.TAG: + return None + + # get id and display name + id = xml.attrib.get('id') + if is_none_or_whitespace(id): + return None + + name = get_child_as_text(xml, 'display-name') + if is_none_or_whitespace(name): + return None + + # remove 'XX: ' prefix from name, if present + if len(name) > 4 and name[2] == ':' and name[3] == ' ': # 'XX: ' + name = name[4:] + + return cls(id, name) diff --git a/custom_components/xmltv_epg/model/guide.py b/custom_components/xmltv_epg/model/guide.py new file mode 100644 index 0000000..f99a365 --- /dev/null +++ b/custom_components/xmltv_epg/model/guide.py @@ -0,0 +1,72 @@ +"""TV Guide Model.""" +import xml.etree.ElementTree as ET + +from .channel import TVChannel +from .program import TVProgram + +class TVGuide: + """TV Guide Class.""" + + TAG = 'tv' + + def __init__(self, generator_name: str = None, generator_url: str = None): + """Initialize TV Guide.""" + self.generator_name = generator_name + self.generator_url = generator_url + + self.channels = [] + self.programs = [] + + def get_channel(self, channel_id: str) -> TVChannel: + """Get channel by ID.""" + return next((c for c in self.channels if c.id == channel_id), None) + + @classmethod + def from_xml(cls, xml: ET.Element) -> 'TVGuide': + """Initialize TV Guide from XML Node, if possible. + + :param xml: XML Node + :return: TV Guide object, or None + + XML node format is: + + + + + """ + + # node must be a TV guide + if xml.tag != cls.TAG: + return None + + # parse generator info + generator_name = xml.attrib.get('generator-info-name') + generator_url = xml.attrib.get('generator-info-url') + + # create guide instance + guide = cls(generator_name, generator_url) + + # parse channels and programs + for child in xml: + if child.tag == TVChannel.TAG: + channel = TVChannel.from_xml(child) + if channel is not None: + # ensure no duplicate channel ids + if guide.get_channel(channel.id) is None: + guide.channels.append(channel) + else: + # ?! + continue + elif child.tag == TVProgram.TAG: + program = TVProgram.from_xml(child) + if program is not None: + guide.programs.append(program) + else: + # ?! + continue + + # cross-link programs with channels + for program in guide.programs: + program.cross_link_channel(guide.channels) + + return guide diff --git a/custom_components/xmltv_epg/model/helper.py b/custom_components/xmltv_epg/model/helper.py new file mode 100644 index 0000000..b68ef13 --- /dev/null +++ b/custom_components/xmltv_epg/model/helper.py @@ -0,0 +1,11 @@ +"""Model helper functions.""" +import xml.etree.ElementTree as ET + +def is_none_or_whitespace(s: str) -> bool: + """Check if string is None, empty, or whitespace.""" + return s is None or not isinstance(s, str) or len(s.strip()) == 0 + +def get_child_as_text(parent: ET.Element, tag: str) -> str: + """Get child node text as string, or None if not found.""" + node = parent.find(tag) + return node.text if node is not None else None diff --git a/custom_components/xmltv_epg/model/program.py b/custom_components/xmltv_epg/model/program.py new file mode 100644 index 0000000..c4520a2 --- /dev/null +++ b/custom_components/xmltv_epg/model/program.py @@ -0,0 +1,150 @@ +"""TV Program Model.""" +from datetime import datetime, timedelta +import xml.etree.ElementTree as ET + +from .helper import is_none_or_whitespace, get_child_as_text + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from .channel import TVChannel + +class TVProgram: + """TV Program Class.""" + + TAG = 'programme' + + def __init__(self, + channel_id: str, + start: datetime, + end: datetime, + title: str, + description: str, + episode: str = None, + subtitle: str = None): + """Initialize TV Program.""" + if end <= start: + raise ValueError('End time must be after start time.') + + self._channel_id = channel_id + self.start = start + self.end = end + self.title = title + self.description = description + self.episode = episode + self.subtitle = subtitle + + self.channel = None + + def cross_link_channel(self, channels: list['TVChannel']): + """Set channel for program and cross-link. + + :param channels: List of TV Channels + """ + # find channel by id + channel = next((c for c in channels if c.id == self._channel_id), None) + if channel is None: + raise ValueError(f'Channel with ID "{self._channel_id}" not found.') + + # cross-link + self.channel = channel + self.channel.add_program(self) + + @property + def duration(self) -> timedelta: + """Get program duration.""" + return self.end - self.start + + @property + def full_title(self) -> str: + """Get the full title, including episode and / or subtitle, if available. + + Examples: + (1) + Title: 'Program 1' + Episode: None + Subtitle: None + Result: 'Program 1' + + (2) + Title: 'Program 1' + Episode: 'S1 E1' + Subtitle: None + Result: 'Program 1 (S1 E1)' + + (3) + Title: 'Program 1' + Episode: 'S1 E1' + Subtitle: 'Subtitle 1' + Result: 'Program 1 - Subtitle 1 (S1 E1)' + + (4) + Title: 'Program 1' + Episode: None + Subtitle: 'Subtitle 1' + Result: 'Program 1 - Subtitle 1' + + """ + title = self.title + + if not is_none_or_whitespace(self.subtitle): + title += f' - {self.subtitle}' + + if not is_none_or_whitespace(self.episode): + title += f' ({self.episode})' + + return title + + + @classmethod + def from_xml(cls, xml: ET.Element) -> 'TVProgram': + """Initialize TV Program from XML Node, if possible. + + Cross-link is not done here, call cross_link_channel() after all programs are created. + + :param xml: XML Node + :return: TV Program object, or None + + XML node format is: + + WDR aktuell + vom 17.05.2024, 12:45 Uhr + Das Sendung bietet Nachrichten für und aus Nordrhein-Westfalen im Magazinformat. + S5 E34 + + """ + + # node must be a program + if xml.tag != cls.TAG: + return None + + # get start and end times + start = xml.attrib.get('start') + end = xml.attrib.get('stop') + if is_none_or_whitespace(start) or is_none_or_whitespace(end): + return None + + # parse start and end times + try: + start = datetime.strptime(start, '%Y%m%d%H%M%S %z') + end = datetime.strptime(end, '%Y%m%d%H%M%S %z') + except ValueError: + return None + + # get channel id + channel_id = xml.attrib.get('channel') + if is_none_or_whitespace(channel_id): + return None + + # get and validate program info + title = get_child_as_text(xml, 'title') + description = get_child_as_text(xml, 'desc') + episode = get_child_as_text(xml, 'episode-num') + subtitle = get_child_as_text(xml, 'sub-title') + + if is_none_or_whitespace(title) or is_none_or_whitespace(description): + return None + + try: + return cls(channel_id, start, end, title, description, episode, subtitle) + except ValueError: + return None diff --git a/custom_components/xmltv_epg/sensor.py b/custom_components/xmltv_epg/sensor.py index 1a5265e..874a260 100644 --- a/custom_components/xmltv_epg/sensor.py +++ b/custom_components/xmltv_epg/sensor.py @@ -12,7 +12,7 @@ from .coordinator import XMLTVDataUpdateCoordinator from .entity import XMLTVEntity -from .xmltv.model import TVGuide, TVChannel +from .model import TVGuide, TVChannel async def async_setup_entry(hass, entry, async_add_devices): """Set up the sensor platform.""" diff --git a/custom_components/xmltv_epg/xmltv/__init__.py b/custom_components/xmltv_epg/xmltv/__init__.py deleted file mode 100644 index 739dac6..0000000 --- a/custom_components/xmltv_epg/xmltv/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""XMLTV Client.""" diff --git a/custom_components/xmltv_epg/xmltv/model.py b/custom_components/xmltv_epg/xmltv/model.py deleted file mode 100644 index f99a99b..0000000 --- a/custom_components/xmltv_epg/xmltv/model.py +++ /dev/null @@ -1,287 +0,0 @@ -"""XMLTV Model Classes & Parsing.""" - -from datetime import datetime, timedelta -import xml.etree.ElementTree as ET - -def is_none_or_whitespace(s: str) -> bool: - """Check if string is None, empty, or whitespace.""" - return s is None or not isinstance(s, str) or len(s.strip()) == 0 - -def get_child_as_text(parent: ET.Element, tag: str) -> str: - """Get child node text as string, or None if not found.""" - node = parent.find(tag) - return node.text if node is not None else None - -class TVChannel: - """TV Channel Class.""" - - TAG = 'channel' - - def __init__(self, id: str, name: str): - """Initialize TV Channel.""" - self.id = id - self.name = name - self.programs = [] - - def add_program(self, program: 'TVProgram'): - """Add a program to channel.""" - self.programs.append(program) - - # keep programs sorted by start time - self.programs.sort(key=lambda p: p.start) - - def get_current_program(self, time: datetime) -> 'TVProgram': - """Get current program at given time.""" - for program in self.programs: - if program.start.timestamp() <= time.timestamp() < program.end.timestamp(): - return program - - return None - - def get_next_program(self, time: datetime) -> 'TVProgram': - """Get next program after given time.""" - for program in self.programs: - if program.start.timestamp() >= time.timestamp(): - return program - - return None - - @classmethod - def from_xml(cls, xml: ET.Element) -> 'TVChannel': - """Initialize TV Channel from XML Node, if possible. - - :param xml: XML Node - :return: TV Channel object, or None - - XML node format is: - - WDR Essen - - """ - - # node must be a channel - if xml.tag != cls.TAG: - return None - - # get id and display name - id = xml.attrib.get('id') - if is_none_or_whitespace(id): - return None - - name = get_child_as_text(xml, 'display-name') - if is_none_or_whitespace(name): - return None - - # remove 'XX: ' prefix from name, if present - if len(name) > 4 and name[2] == ':' and name[3] == ' ': # 'XX: ' - name = name[4:] - - return cls(id, name) - -class TVProgram: - """TV Program Class.""" - - TAG = 'programme' - - def __init__(self, - channel_id: str, - start: datetime, - end: datetime, - title: str, - description: str, - episode: str = None, - subtitle: str = None): - """Initialize TV Program.""" - if end <= start: - raise ValueError('End time must be after start time.') - - self._channel_id = channel_id - self.start = start - self.end = end - self.title = title - self.description = description - self.episode = episode - self.subtitle = subtitle - - self.channel = None - - def cross_link_channel(self, channels: list[TVChannel]): - """Set channel for program and cross-link. - - :param channels: List of TV Channels - """ - # find channel by id - channel = next((c for c in channels if c.id == self._channel_id), None) - if channel is None: - raise ValueError(f'Channel with ID "{self._channel_id}" not found.') - - # cross-link - self.channel = channel - self.channel.add_program(self) - - @property - def duration(self) -> timedelta: - """Get program duration.""" - return self.end - self.start - - @property - def full_title(self) -> str: - """Get the full title, including episode and / or subtitle, if available. - - Examples: - (1) - Title: 'Program 1' - Episode: None - Subtitle: None - Result: 'Program 1' - - (2) - Title: 'Program 1' - Episode: 'S1 E1' - Subtitle: None - Result: 'Program 1 (S1 E1)' - - (3) - Title: 'Program 1' - Episode: 'S1 E1' - Subtitle: 'Subtitle 1' - Result: 'Program 1 - Subtitle 1 (S1 E1)' - - (4) - Title: 'Program 1' - Episode: None - Subtitle: 'Subtitle 1' - Result: 'Program 1 - Subtitle 1' - - """ - title = self.title - - if not is_none_or_whitespace(self.subtitle): - title += f' - {self.subtitle}' - - if not is_none_or_whitespace(self.episode): - title += f' ({self.episode})' - - return title - - - @classmethod - def from_xml(cls, xml: ET.Element) -> 'TVProgram': - """Initialize TV Program from XML Node, if possible. - - Cross-link is not done here, call cross_link_channel() after all programs are created. - - :param xml: XML Node - :return: TV Program object, or None - - XML node format is: - - WDR aktuell - vom 17.05.2024, 12:45 Uhr - Das Sendung bietet Nachrichten für und aus Nordrhein-Westfalen im Magazinformat. - S5 E34 - - """ - - # node must be a program - if xml.tag != cls.TAG: - return None - - # get start and end times - start = xml.attrib.get('start') - end = xml.attrib.get('stop') - if is_none_or_whitespace(start) or is_none_or_whitespace(end): - return None - - # parse start and end times - try: - start = datetime.strptime(start, '%Y%m%d%H%M%S %z') - end = datetime.strptime(end, '%Y%m%d%H%M%S %z') - except ValueError: - return None - - # get channel id - channel_id = xml.attrib.get('channel') - if is_none_or_whitespace(channel_id): - return None - - # get and validate program info - title = get_child_as_text(xml, 'title') - description = get_child_as_text(xml, 'desc') - episode = get_child_as_text(xml, 'episode-num') - subtitle = get_child_as_text(xml, 'sub-title') - - if is_none_or_whitespace(title) or is_none_or_whitespace(description): - return None - - try: - return cls(channel_id, start, end, title, description, episode, subtitle) - except ValueError: - return None - -class TVGuide: - """TV Guide Class.""" - - TAG = 'tv' - - def __init__(self, generator_name: str = None, generator_url: str = None): - """Initialize TV Guide.""" - self.generator_name = generator_name - self.generator_url = generator_url - - self.channels = [] - self.programs = [] - - def get_channel(self, channel_id: str) -> TVChannel: - """Get channel by ID.""" - return next((c for c in self.channels if c.id == channel_id), None) - - @classmethod - def from_xml(cls, xml: ET.Element) -> 'TVGuide': - """Initialize TV Guide from XML Node, if possible. - - :param xml: XML Node - :return: TV Guide object, or None - - XML node format is: - - - - - """ - - # node must be a TV guide - if xml.tag != cls.TAG: - return None - - # parse generator info - generator_name = xml.attrib.get('generator-info-name') - generator_url = xml.attrib.get('generator-info-url') - - # create guide instance - guide = cls(generator_name, generator_url) - - # parse channels and programs - for child in xml: - if child.tag == TVChannel.TAG: - channel = TVChannel.from_xml(child) - if channel is not None: - # ensure no duplicate channel ids - if guide.get_channel(channel.id) is None: - guide.channels.append(channel) - else: - # ?! - continue - elif child.tag == TVProgram.TAG: - program = TVProgram.from_xml(child) - if program is not None: - guide.programs.append(program) - else: - # ?! - continue - - # cross-link programs with channels - for program in guide.programs: - program.cross_link_channel(guide.channels) - - return guide diff --git a/test/conftest.py b/test/conftest.py index eae91e6..9a6e0f5 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -4,10 +4,10 @@ pytest_plugins = "pytest_homeassistant_custom_component" -@pytest.fixture(autouse=True) -def auto_enable_custom_integrations(enable_custom_integrations): - """Enable loading custom integrations.""" - yield +@pytest.fixture() +def hass(hass, enable_custom_integrations): + """Return a Home Assistant instance that can load custom integrations.""" + yield hass @pytest.fixture def anyio_backend(): diff --git a/test/xmltv/__init__.py b/test/model/__init__.py similarity index 100% rename from test/xmltv/__init__.py rename to test/model/__init__.py diff --git a/test/xmltv/test_TVChannel.py b/test/model/test_TVChannel.py similarity index 96% rename from test/xmltv/test_TVChannel.py rename to test/model/test_TVChannel.py index b71ac74..3fef6f5 100644 --- a/test/xmltv/test_TVChannel.py +++ b/test/model/test_TVChannel.py @@ -3,7 +3,7 @@ from datetime import datetime import xml.etree.ElementTree as ET -from custom_components.xmltv_epg.xmltv.model import TVChannel, TVProgram +from custom_components.xmltv_epg.model import TVChannel, TVProgram def test_from_xml(): """Test TVChannel.from_xml method with valid input.""" diff --git a/test/xmltv/test_TVGuide.py b/test/model/test_TVGuide.py similarity index 94% rename from test/xmltv/test_TVGuide.py rename to test/model/test_TVGuide.py index 13e05be..9ce0122 100644 --- a/test/xmltv/test_TVGuide.py +++ b/test/model/test_TVGuide.py @@ -2,7 +2,7 @@ import xml.etree.ElementTree as ET -from custom_components.xmltv_epg.xmltv.model import TVGuide, TVChannel +from custom_components.xmltv_epg.model import TVGuide, TVChannel def test_from_xml(): diff --git a/test/xmltv/test_TVProgram.py b/test/model/test_TVProgram.py similarity index 98% rename from test/xmltv/test_TVProgram.py rename to test/model/test_TVProgram.py index 4064f32..7f3be04 100644 --- a/test/xmltv/test_TVProgram.py +++ b/test/model/test_TVProgram.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone import xml.etree.ElementTree as ET -from custom_components.xmltv_epg.xmltv.model import TVProgram, TVChannel +from custom_components.xmltv_epg.model import TVProgram, TVChannel def test_from_xml(): """Test TVProgram.from_xml method with valid input.""" diff --git a/test/test_config_flow.py b/test/test_config_flow.py index 9e29de0..e12be17 100644 --- a/test/test_config_flow.py +++ b/test/test_config_flow.py @@ -16,7 +16,7 @@ ) from custom_components.xmltv_epg.api import XMLTVClientCommunicationError, XMLTVClientError -from custom_components.xmltv_epg.xmltv.model import TVGuide +from custom_components.xmltv_epg.model import TVGuide XMLTV_CLIENT_DATA = TVGuide("MOCK", "http://example.com/epg.xml") diff --git a/test/test_coordinator.py b/test/test_coordinator.py index eedef13..0bfb53b 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -10,7 +10,7 @@ from custom_components.xmltv_epg.coordinator import XMLTVDataUpdateCoordinator from custom_components.xmltv_epg.api import XMLTVClient -from custom_components.xmltv_epg.xmltv.model import TVGuide +from custom_components.xmltv_epg.model import TVGuide TEST_NOW = datetime(2024, 5, 17, 12, 45, 0) XMLTV_CLIENT_DATA = TVGuide("MOCK", "http://example.com/epg.xml") diff --git a/test/test_init.py b/test/test_init.py index dbbb86b..d2d15a0 100644 --- a/test/test_init.py +++ b/test/test_init.py @@ -14,7 +14,7 @@ from custom_components.xmltv_epg.const import DOMAIN from custom_components.xmltv_epg.coordinator import XMLTVDataUpdateCoordinator -from custom_components.xmltv_epg.xmltv.model import TVGuide +from custom_components.xmltv_epg.model import TVGuide UPDATE_COORDINATOR_DATA = TVGuide("MOCK", "http://example.com/epg.xml") diff --git a/test/test_sensor.py b/test/test_sensor.py index 08d6418..280efa7 100644 --- a/test/test_sensor.py +++ b/test/test_sensor.py @@ -11,7 +11,7 @@ from custom_components.xmltv_epg import async_setup_entry from custom_components.xmltv_epg.const import DOMAIN, OPT_PROGRAM_LOOKAHEAD -from custom_components.xmltv_epg.xmltv.model import TVGuide, TVChannel, TVProgram +from custom_components.xmltv_epg.model import TVGuide, TVChannel, TVProgram TEST_NOW = datetime(2024, 5, 17, 12, 45, 0)