Skip to content

Commit

Permalink
feat: introduced buffers.meta for general core control events
Browse files Browse the repository at this point in the history
  • Loading branch information
viniarck committed Jan 17, 2024
1 parent 42019eb commit 5fefcb3
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 12 deletions.
8 changes: 6 additions & 2 deletions kytos/core/buffers/buffers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Kytos Buffer Classes, based on Python Queue."""
import logging
from typing import Optional

from janus import Queue

Expand All @@ -22,7 +23,7 @@ def __init__(self, name, queue: Queue = None):
self._queue = queue if queue is not None else Queue()
self._reject_new_events = False

def put(self, event):
def put(self, event, timeout: Optional[float] = None):
"""Insert an event in KytosEventBuffer if reject_new_events is False.
Reject new events is True when a kytos/core.shutdown message was
Expand All @@ -31,9 +32,12 @@ def put(self, event):
Args:
event (:class:`~kytos.core.events.KytosEvent`):
KytosEvent sent to queue.
timeout: Block if necessary until a free slot is available.
If 'timeout' is a non-negative number, it blocks at most 'timeout'
seconds and raises an Full exception if no free slot was available.
"""
if not self._reject_new_events:
self._queue.sync_q.put(event)
self._queue.sync_q.put(event, timeout=timeout)
LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

if event.name == "kytos/core.shutdown":
Expand Down
5 changes: 5 additions & 0 deletions kytos/core/buffers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def __init__(self):
:attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
sent to NApps.
:attr:`meta`: :class:`~kytos.core.buffers.KytosEventBuffer` with
core related events sent to NApps. This is meant for general core
control events.
"""

self._pool_max_workers = get_thread_pool_max_workers()
Expand All @@ -54,6 +58,7 @@ def __init__(self):
"app",
queue=Queue(maxsize=self._get_maxsize("app")),
)
self.meta = KytosEventBuffer("meta")

buffer_conf = KytosConfig().options['daemon'].event_buffer_conf

Expand Down
16 changes: 9 additions & 7 deletions kytos/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ def start_controller(self):
self._tasks.append(task)
task = self.loop.create_task(self.event_handler("app"))
self._tasks.append(task)
task = self.loop.create_task(self.event_handler("meta"))
self._tasks.append(task)

self.started_at = now()

Expand Down Expand Up @@ -569,7 +571,7 @@ async def publish_connection_error(self, event):
f"kytos/core.{event.destination.protocol.name}.connection.error"
error_msg = f"Connection state: {event.destination.state}"
event.content["exception"] = error_msg
await self.buffers.app.aput(event)
await self.buffers.conn.aput(event)

async def msg_out_event_handler(self):
"""Handle msg_out events.
Expand Down Expand Up @@ -679,7 +681,7 @@ def get_switch_or_create(self, dpid, connection=None):
if old_connection is not connection:
self.remove_connection(old_connection)

self.buffers.app.put(event)
self.buffers.conn.put(event)

return switch

Expand Down Expand Up @@ -813,7 +815,7 @@ def load_napp(self, username, napp_name):
napp = napp_module.Main(controller=self)
except Exception as exc: # noqa pylint: disable=bare-except
msg = f"NApp {username}/{napp_name} exception {str(exc)} "
raise KytosNAppSetupException(msg) from exc
raise KytosNAppSetupException(msg, from_exc=exc) from exc

self.napps[(username, napp_name)] = napp

Expand Down Expand Up @@ -846,11 +848,11 @@ def load_napps(self):
try:
self.log.info("Loading NApp %s", napp.id)
self.load_napp(napp.username, napp.name)
except FileNotFoundError as exception:
except FileNotFoundError as exc:
self.log.error("Could not load NApp %s: %s",
napp.id, exception)
msg = f"NApp {napp.id} exception {str(exception)}"
raise KytosNAppSetupException(msg) from exception
napp.id, exc)
msg = f"NApp {napp.id} exception {str(exc)}"
raise KytosNAppSetupException(msg, from_exc=exc) from exc

def unload_napp(self, username, napp_name):
"""Unload a specific NApp.
Expand Down
3 changes: 2 additions & 1 deletion kytos/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ def __str__(self):
class KytosNAppSetupException(KytosNAppException):
"""KytosNAppSetupException. """

def __init__(self, message="KytosNAppSetupException") -> None:
def __init__(self, message="KytosNAppSetupException", from_exc=None):
"""KytosNAppSetupException."""
self.from_exc = from_exc
super().__init__(message=message)

def __str__(self):
Expand Down
2 changes: 1 addition & 1 deletion kytos/core/napps/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def notify_loaded(self):
"""Inform this NApp has been loaded."""
name = f'{self.username}/{self.name}.loaded'
event = KytosEvent(name=name, content={})
self.controller.buffers.app.put(event)
self.controller.buffers.meta.put(event)

# all listeners receive event
def _shutdown_handler(self, event): # pylint: disable=unused-argument
Expand Down
16 changes: 15 additions & 1 deletion tests/unit/test_core/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import warnings
from copy import copy
from unittest import TestCase
from unittest.mock import MagicMock, Mock, call, patch
from unittest.mock import AsyncMock, MagicMock, Mock, call, patch

import pytest
from pyof.foundation.exceptions import PackException
Expand Down Expand Up @@ -362,22 +362,30 @@ def test_get_switch_or_create__exists(self):
dpid = '00:00:00:00:00:00:00:01'
switch = MagicMock(dpid=dpid)
self.controller.switches = {dpid: switch}
self.controller.buffers.conn = MagicMock()

connection = MagicMock()
resp_switch = self.controller.get_switch_or_create(dpid, connection)

self.assertEqual(resp_switch, switch)
self.controller.buffers.conn.put.assert_called()
ev_name = "kytos/core.switch.reconnected"
assert self.controller.buffers.conn.put.call_args[0][0].name == ev_name

def test_get_switch_or_create__not_exists(self):
"""Test status_api method when switch does not exist."""
self.controller.switches = {}
self.controller.buffers.conn = MagicMock()

dpid = '00:00:00:00:00:00:00:01'
connection = MagicMock()
switch = self.controller.get_switch_or_create(dpid, connection)

expected_switches = {'00:00:00:00:00:00:00:01': switch}
self.assertEqual(self.controller.switches, expected_switches)
self.controller.buffers.conn.put.assert_called()
ev_name = "kytos/core.switch.new"
assert self.controller.buffers.conn.put.call_args[0][0].name == ev_name

def test_create_or_update_connection(self):
"""Test create_or_update_connection method."""
Expand Down Expand Up @@ -797,3 +805,9 @@ async def test_configuration_endpoint(self, controller, api_client):
resp = await api_client.get("kytos/core/config")
assert resp.status_code == 200
assert expected == resp.json()

async def test_publish_connection_error(self, controller):
"""Test publish_connection_error."""
controller.buffers.conn.aput = AsyncMock()
await controller.publish_connection_error(MagicMock())
controller.buffers.conn.aput.assert_called()

0 comments on commit 5fefcb3

Please sign in to comment.