diff --git a/kytos/core/exceptions.py b/kytos/core/exceptions.py index 85d48507..47a66150 100644 --- a/kytos/core/exceptions.py +++ b/kytos/core/exceptions.py @@ -114,22 +114,15 @@ def __init__(self, msg: str) -> None: class KytosTagsNotInTagRanges(KytosTagError): """Exception thrown when tags are outside of tag ranges""" def __init__(self, conflict: list[list[int]], intf_id: str) -> None: - msg = f"The tags {conflict} are outside tag_ranges in {intf_id}" - super().__init__(f"KytosSetTagRangeError, {msg}") + msg = f"The tags {conflict} are outside tag ranges in {intf_id}" + super().__init__(f"KytosTagsNotInTagRanges, {msg}") class KytosTagsAreNotAvailable(KytosTagError): """Exception thrown when tags are not available.""" def __init__(self, conflict: list[list[int]], intf_id: str) -> None: msg = f"The tags {conflict} are not available in {intf_id}" - super().__init__(f"KytosSetTagRangeError, {msg}") - - -class KytosSpecialTagNotAvailable(KytosTagError): - """Exception thrown when an special tag is not available.""" - def __init__(self, conflict: str, intf_id: str) -> None: - msg = f"The special tag '{conflict}' is not available in {intf_id}" - super().__init__(f"KytosSetTagRangeError, {msg}") + super().__init__(f"KytosTagsAreNotAvailable, {msg}") # Exceptions related to NApps diff --git a/kytos/core/interface.py b/kytos/core/interface.py index 82e3e83c..5ac156c4 100644 --- a/kytos/core/interface.py +++ b/kytos/core/interface.py @@ -17,15 +17,14 @@ from kytos.core.common import EntityStatus, GenericEntity from kytos.core.events import KytosEvent from kytos.core.exceptions import (KytosSetTagRangeError, - KytosSpecialTagNotAvailable, KytosTagsAreNotAvailable, KytosTagsNotInTagRanges, KytosTagtypeNotSupported) from kytos.core.helpers import now from kytos.core.id import InterfaceID from kytos.core.tag_ranges import (find_index_add, find_index_remove, - get_validated_tags, range_addition, - range_difference) + get_special_tag_range, get_validated_tags, + range_addition, range_difference) __all__ = ('Interface',) @@ -156,9 +155,11 @@ def __init__(self, name, port_number, switch, address=None, state=None, self._tag_lock = Lock() self.available_tags = {'vlan': self.default_tag_values['vlan']} self.tag_ranges = {'vlan': self.default_tag_values['vlan']} - self.special_available_tags = {'vlan': ['untagged', 'any']} + self.special_available_tags = {'vlan': self.default_special_tags} + self.special_tag_range = {'vlan': self.default_special_tags} self.set_available_tags_tag_ranges( - self.available_tags, self.tag_ranges, self.special_available_tags + self.available_tags, self.tag_ranges, + self.special_available_tags, self.special_tag_range ) super().__init__() @@ -235,6 +236,12 @@ def default_tag_values(self) -> dict[str, list[list[int]]]: } return default_values + @property + def default_special_tags(self) -> list[str]: + """Reurn a default list of special tags. Applicable to + special_available_tags and special_tag_range.""" + return ["untagged", "any"] + def set_tag_ranges(self, tag_ranges: list[list[int]], tag_type: str): """Set new restriction, tag_ranges.""" if tag_type != TAGType.VLAN.value: @@ -271,6 +278,40 @@ def remove_tag_ranges(self, tag_type: str): ) self.tag_ranges[tag_type] = self.default_tag_values[tag_type] + def set_special_tag_ranges( + self, + special_tag_range: list[str], + tag_type: str + ): + """Set new restriction, special_tag_range""" + # Verify values in special_tag_range + tag_range = get_special_tag_range( + special_tag_range, self.default_special_tags + ) + + if tag_type != TAGType.VLAN.value: + msg = f"Tag type {tag_type} is not supported." + raise KytosTagtypeNotSupported(msg) + old_special_set = set(self.special_tag_range[tag_type]) + + for tag in self.special_available_tags[tag_type]: + old_special_set.remove(tag) # Get special used tags + used_special = old_special_set.copy() + + for tag in tag_range: + used_special.discard(tag) + + # Missing used special used tags + if used_special: + msg = f"Missing tags in tag_range: {used_special}" + raise KytosSetTagRangeError(msg) + + new_special_available = set(tag_range) + self.special_available_tags[tag_type] = list( + new_special_available - old_special_set + ) + self.special_tag_range[tag_type] = tag_range + def _remove_tags(self, tags: list[int], tag_type: str = 'vlan') -> bool: """Remove tags by resizing available_tags Returns False if nothing was remove, True otherwise""" @@ -321,6 +362,9 @@ def use_tags( Exceptions: KytosTagsAreNotAvailable from _use_tags() """ + if tag_type != TAGType.VLAN.value: + msg = f"Tag type {tag_type} is not supported." + raise KytosTagtypeNotSupported(msg) if isinstance(tags, int): tags = [tags] * 2 elif check_order and not isinstance(tags, str): @@ -355,7 +399,7 @@ def _use_tags( try: self.special_available_tags[tag_type].remove(tags) except ValueError: - raise KytosSpecialTagNotAvailable(tags, self.id) + raise KytosTagsAreNotAvailable(tags, self.id) # pylint: disable=too-many-branches def _add_tags(self, tags: list[int], tag_type: str = 'vlan') -> bool: @@ -441,6 +485,9 @@ def make_tags_available( Exeptions: KytosTagsNotInTagRanges from _make_tags_available() """ + if tag_type != TAGType.VLAN.value: + msg = f"Tag type {tag_type} is not supported." + raise KytosTagtypeNotSupported(msg) if isinstance(tags, int): tags = [tags] * 2 elif check_order and not isinstance(tags, str): @@ -478,6 +525,8 @@ def _make_tags_available( if result is False: return [tags] if isinstance(tags, str): + if tags not in self.special_tag_range[tag_type]: + raise KytosTagsNotInTagRanges(tags, self._id) if tags not in self.special_available_tags[tag_type]: self.special_available_tags[tag_type].append(tags) return None @@ -488,7 +537,8 @@ def set_available_tags_tag_ranges( self, available_tag: dict[str, list[list[int]]], tag_ranges: dict[str, list[list[int]]], - special_available_tags: dict[str, list[str]] + special_available_tags: dict[str, list[str]], + special_tag_range: dict[str, list[str]] ): """Set a range of VLAN tags to be used by this Interface. @@ -500,6 +550,7 @@ def set_available_tags_tag_ranges( self.available_tags = available_tag self.tag_ranges = tag_ranges self.special_available_tags = special_available_tags + self.special_tag_range = special_tag_range def enable(self): """Enable this interface instance. diff --git a/kytos/core/tag_ranges.py b/kytos/core/tag_ranges.py index 23ba26c7..13241c44 100644 --- a/kytos/core/tag_ranges.py +++ b/kytos/core/tag_ranges.py @@ -7,6 +7,24 @@ from kytos.core.exceptions import KytosInvalidTagRanges +def get_special_tag_range(tag_range: list[str], default) -> list[str]: + """Get special_tag_range and check values""" + # Find duplicated + if len(tag_range) != len(set(tag_range)): + msg = "There are duplicated values in the range." + raise KytosInvalidTagRanges(msg) + + # Find invalid tag + default_set = set(default) + for tag in tag_range: + try: + default_set.remove(tag) + except KeyError: + msg = f"The tag {tag} is not supported" + raise KytosInvalidTagRanges(msg) + return tag_range + + def map_singular_values(tag_range: Union[int, list[int]]): """Change integer or singular interger list to list[int, int] when necessary""" diff --git a/tests/unit/test_core/test_interface.py b/tests/unit/test_core/test_interface.py index 10956d1b..77059417 100644 --- a/tests/unit/test_core/test_interface.py +++ b/tests/unit/test_core/test_interface.py @@ -158,25 +158,24 @@ async def test_interface_available_tags_tag_ranges(self): default_available = {'vlan': [[1, 4095]]} default_tag_ranges = {'vlan': [[1, 4095]]} default_special_vlans = {'vlan': ["untagged", "any"]} - intf_available = self.iface.available_tags - intf_tag_ranges = self.iface.tag_ranges - intf_special_vlans = self.iface.special_available_tags - assert intf_available == default_available - assert intf_tag_ranges == default_tag_ranges - assert intf_special_vlans == default_special_vlans + default_special_tag_range = {'vlan': ["untagged", "any"]} + assert self.iface.available_tags == default_available + assert self.iface.tag_ranges == default_tag_ranges + assert self.iface.special_available_tags == default_special_vlans + assert self.iface.special_tag_range == default_special_tag_range custom_available = {'vlan': [[10, 200], [210, 4095]]} custom_tag_ranges = {'vlan': [[1, 100], [200, 4095]]} custom_special_vlans = {'vlan': ["any"]} + custom_special_tag_range = {'vlan': ["any"]} self.iface.set_available_tags_tag_ranges( - custom_available, custom_tag_ranges, custom_special_vlans + custom_available, custom_tag_ranges, + custom_special_vlans, custom_special_tag_range ) - intf_available = self.iface.available_tags - intf_tag_ranges = self.iface.tag_ranges - intf_special_vlans = self.iface.special_available_tags - assert intf_available == custom_available - assert intf_tag_ranges == custom_tag_ranges - assert intf_special_vlans == custom_special_vlans + assert self.iface.available_tags == custom_available + assert self.iface.tag_ranges == custom_tag_ranges + assert self.iface.special_available_tags == custom_special_vlans + assert self.iface.special_tag_range == custom_special_tag_range async def test_interface_is_tag_available(self): """Test is_tag_available on Interface class.""" @@ -346,10 +345,12 @@ async def test_make_tags_available(self, controller) -> None: """Test make_tags_available""" available = {'vlan': [[300, 3000]]} tag_ranges = {'vlan': [[20, 20], [200, 3000]]} - special_available_tags = {'vlan': ["untagged"]} + special_available_tags = {'vlan': []} + special_tag_range = {'vlan': ["any"]} self.iface._notify_interface_tags = MagicMock() self.iface.set_available_tags_tag_ranges( - available, tag_ranges, special_available_tags + available, tag_ranges, + special_available_tags, special_tag_range ) assert self.iface.available_tags == available assert self.iface.tag_ranges == tag_ranges @@ -370,6 +371,9 @@ async def test_make_tags_available(self, controller) -> None: assert self.iface.make_tags_available(controller, 300) == [[300, 300]] + with pytest.raises(KytosTagsNotInTagRanges): + self.iface.make_tags_available(controller, "untagged") + assert self.iface.make_tags_available(controller, "any") is None assert "any" in self.iface.special_available_tags["vlan"] assert self.iface.make_tags_available(controller, "any") == "any" @@ -410,6 +414,7 @@ async def test_remove_tags(self) -> None: self.iface.set_available_tags_tag_ranges( {'vlan': available_tag}, {'vlan': tag_ranges}, + {'vlan': ["untagged", "any"]}, {'vlan': ["untagged", "any"]} ) ava_expected = [[20, 20], [241, 3000]] @@ -433,7 +438,8 @@ async def test_remove_tags_empty(self) -> None: parameters = { "available_tag": {'vlan': available_tag}, "tag_ranges": {'vlan': tag_ranges}, - "special_available_tags": {'vlan': ["untagged", "any"]} + "special_available_tags": {'vlan': ["untagged", "any"]}, + "special_tag_range": {'vlan': ["untagged", "any"]} } self.iface.set_available_tags_tag_ranges(**parameters) assert self.iface._remove_tags([4, 6]) is False @@ -446,7 +452,8 @@ async def test_add_tags(self) -> None: parameters = { "available_tag": {'vlan': available_tag}, "tag_ranges": {'vlan': tag_ranges}, - "special_available_tags": {'vlan': ["untagged", "any"]} + "special_available_tags": {'vlan': ["untagged", "any"]}, + "special_tag_range": {'vlan': ["untagged", "any"]} } self.iface.set_available_tags_tag_ranges(**parameters) ava_expected = [[4, 10], [20, 30]] @@ -490,7 +497,8 @@ async def test_add_tags_empty(self) -> None: parameters = { "available_tag": {'vlan': available_tag}, "tag_ranges": {'vlan': tag_ranges}, - "special_available_tags": {'vlan': ["untagged", "any"]} + "special_available_tags": {'vlan': ["untagged", "any"]}, + "special_tag_range": {'vlan': ["untagged", "any"]} } self.iface.set_available_tags_tag_ranges(**parameters) assert self.iface._add_tags([4, 6])