Skip to content

Commit

Permalink
Merge pull request #200 from kytos-ng/feat/alien_pacing
Browse files Browse the repository at this point in the history
Pace alien flow deletion + bump telemetry_int pacing + version bump to 2024.1.2
  • Loading branch information
Ktmi authored Sep 6, 2024
2 parents 3e8b02c + 48b1b50 commit c791712
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 47 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ file.
[UNRELEASED] - Under development
********************************

[2024.1.2] - 2024-09-06
***********************

Changed
=======
- Changed alien flows to have ``alien`` as owner. Alien flow deletion is now paced by ``send_flow_mod.alien``.
- Increased ``telemetry_int`` pacing rate to 300/second

[2024.1.1] - 2024-08-30
***********************

Expand Down
2 changes: 1 addition & 1 deletion kytos.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"username": "kytos",
"name": "flow_manager",
"description": "Manage switches' flows through a REST API.",
"version": "2024.1.1",
"version": "2024.1.2",
"napp_dependencies": ["kytos/of_core"],
"license": "MIT",
"url": "https://github.com/kytos/flow_manager.git",
Expand Down
87 changes: 53 additions & 34 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
build_cookie_range_tuple,
build_flow_mod_from_command,
cast_fields,
flows_to_log_info,
flows_to_log,
get_min_wait_diff,
is_ignored,
map_cookie_list_as_tuples,
Expand Down Expand Up @@ -381,22 +381,32 @@ def check_missing_flows(self, switch, verdict_dt: Optional[datetime] = None):
verdict_dt = datetime.utcnow() if not verdict_dt else verdict_dt
dpid = switch.dpid
flows = self.switch_flows_by_id(switch, self.is_not_ignored_flow)
for flow in self.flow_controller.get_flows_lte_updated_at(
switch.id, verdict_dt
):
if flow["flow_id"] not in flows:
log.info(f"Consistency check: missing flow on switch {dpid}.")
flow = {"flows": [flow["flow"]]}
try:
self._install_flows("add", flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be installed. Flow: {flow}"
)
except SwitchNotConnectedError:
log.error(
f"Failed to forward flow to switch {dpid} to be installed. "
f"Flow: {flow}"
)
missing_flows = [
flow["flow"]
for flow in self.flow_controller.get_flows_lte_updated_at(
switch.id, verdict_dt
)
if flow["flow_id"] not in flows
]

if missing_flows:
log.info(
f"Consistency check: missing {len(missing_flows)} flows on switch {dpid}."
)
flow_dict = {"flows": missing_flows}
try:
self._install_flows("add", flow_dict, [switch], save=False)
flows_to_log(
log.info,
f"Flows forwarded to switch {dpid} to be installed. ",
flow_dict,
)
except SwitchNotConnectedError:
flows_to_log(
log.error,
f"Failed to forward flows to switch {dpid} to be installed. ",
flow_dict,
)

def check_alien_flows(self, switch, verdict_dt: Optional[datetime] = None):
"""Check alien flows on a switch and delete them."""
Expand All @@ -418,6 +428,7 @@ def check_alien_flows(self, switch, verdict_dt: Optional[datetime] = None):

verdict_dt = datetime.utcnow() if not verdict_dt else verdict_dt
flows = self.switch_flows_by_id(switch, self.is_not_ignored_flow)
alien_flows = []
for flow_id, flow in flows.items():
if flow_id not in stored_by_flow_id:
if (
Expand All @@ -431,22 +442,28 @@ def check_alien_flows(self, switch, verdict_dt: Optional[datetime] = None):
and deleted_by_flow_id[flow.id]["updated_at"] >= verdict_dt
):
continue
alien_flows.append({**flow.as_dict(), "owner": "alien"})

log.info(f"Consistency check: alien flow on switch {dpid}")
flow = {"flows": [flow.as_dict()]}
command = "delete_strict"
try:
self._install_flows(command, flow, [switch], save=False)
log.info(
f"Flow forwarded to switch {dpid} to be deleted. "
f"Flow: {flow}"
)
continue
except SwitchNotConnectedError:
log.error(
f"Failed to forward flow to switch {dpid} to be deleted. "
f"Flow: {flow}"
)
command = "delete_strict"
if alien_flows:
log.info(
f"Consistency check: {len(alien_flows)} alien flows on switch {dpid}"
)
flow_dict = {"flows": alien_flows}
try:

self._install_flows(command, flow_dict, [switch], save=False)
flows_to_log(
log.info,
f"Flows forwarded to switch {dpid} to be deleted. ",
flow_dict,
)
except SwitchNotConnectedError:
flows_to_log(
log.error,
f"Failed to forward flows to switch {dpid} to be deleted. ",
flow_dict,
)

def delete_matched_flows(self, flow_dicts, switches: dict) -> None:
"""Try to delete many matched stored flows given flow_dicts for switches.
Expand Down Expand Up @@ -624,7 +641,8 @@ def handle_flows_install_delete(self, event):
log.error(f"Switch dpid {dpid} was not found.")
return

flows_to_log_info(
flows_to_log(
log.info,
f"Send FlowMod from KytosEvent dpid: {dpid}, command: {command}, "
f"force: {force}, ",
flow_dict,
Expand Down Expand Up @@ -690,7 +708,8 @@ def _send_flow_mods_from_request(
raise HTTPException(400, detail=str(exc))

force = bool(flows_dict.get("force", False))
flows_to_log_info(
flows_to_log(
log.info,
f"Send FlowMod from request dpid: {dpid}, command: {command}, "
f"force: {force}, ",
flows_dict,
Expand Down
6 changes: 5 additions & 1 deletion settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"strategy": "fixed_window",
},
"send_flow_mod.telemetry_int": {
"pace": "100/second",
"pace": "300/second",
"strategy": "fixed_window",
},
"send_flow_mod.of_lldp": {
Expand All @@ -51,4 +51,8 @@
"pace": "100/second",
"strategy": "fixed_window",
},
"send_flow_mod.alien": {
"pace": "500/second",
"strategy": "fixed_window",
},
}
4 changes: 2 additions & 2 deletions tests/unit/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ def test_event_add_flow(self, monkeypatch, ev_name):
"""Test method for installing flows on the switches through events."""
mock_install_flows, mock_flows_log = MagicMock(), MagicMock()
monkeypatch.setattr(
"napps.kytos.flow_manager.main.flows_to_log_info", mock_flows_log
"napps.kytos.flow_manager.main.flows_to_log", mock_flows_log
)
monkeypatch.setattr(
"napps.kytos.flow_manager.main.Main._install_flows", mock_install_flows
Expand Down Expand Up @@ -601,7 +601,7 @@ def test_event_flows_install_delete(self, monkeypatch, ev_name):
"""Test method for removing flows on the switches through events."""
mock_install_flows, mock_flows_log = MagicMock(), MagicMock()
monkeypatch.setattr(
"napps.kytos.flow_manager.main.flows_to_log_info", mock_flows_log
"napps.kytos.flow_manager.main.flows_to_log", mock_flows_log
)
monkeypatch.setattr(
"napps.kytos.flow_manager.main.Main._install_flows", mock_install_flows
Expand Down
12 changes: 6 additions & 6 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Module to test the utils module."""

from datetime import timedelta
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock

import pytest
from napps.kytos.flow_manager.exceptions import InvalidCommandError
Expand All @@ -16,7 +16,7 @@
merge_cookie_ranges,
validate_cookies_add,
validate_cookies_del,
flows_to_log_info,
flows_to_log,
)
from pyof.v0x04.controller2switch.flow_mod import FlowModCommand

Expand Down Expand Up @@ -258,9 +258,9 @@ def test_get_min_wait_diff(self, dt_t2, dt_t1, min_wait):
== min_wait - (dt_t2 - dt_t1).total_seconds()
)

@patch("napps.kytos.flow_manager.utils.log")
def test_flows_to_log_info(self, mock_log):
"""Test flows_to_log_info"""
def test_flows_to_log(self):
"""Test flows_to_log"""
mock_log = MagicMock()
flows = {"flows": list(range(500))}
flows_to_log_info("", flows)
flows_to_log(mock_log.info, "", flows)
assert mock_log.info.call_count == 3
8 changes: 5 additions & 3 deletions utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""kytos/flow_manager utils."""

from collections.abc import Callable

from pyof.foundation.base import UBIntBase
from pyof.v0x04.controller2switch.flow_mod import FlowModCommand

Expand Down Expand Up @@ -187,16 +189,16 @@ def validate_cookies_del(flows: list[dict]) -> None:
)


def flows_to_log_info(message: str, flow_dict: dict[str, list]) -> None:
def flows_to_log(logger_fun: Callable, message: str, flow_dict: dict[str, list]):
"""Log flows, maximun flows in a log is 200"""
length_msg = f"total_length: {len(flow_dict['flows'])}, "
maximun = 200
flows_n = len(flow_dict["flows"])
i, j = 0, maximun
while flow_dict["flows"][i:j]:
log.info(
logger_fun(
f"{message}{length_msg} flows[{i}, {(j if j < flows_n else flows_n)}]:"
f" {flow_dict['flows'][i:j]}"
)
i, j = i + maximun, j + maximun
i, j = j, j + maximun
length_msg = ""

0 comments on commit c791712

Please sign in to comment.