Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in configuration for event buffers #438

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kytos/core/buffers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Module for kytos buffers"""

from .buffers import KytosEventBuffer
from .manager import KytosBuffers
65 changes: 3 additions & 62 deletions kytos/core/buffers.py → kytos/core/buffers/buffers.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
"""Kytos Buffer Classes, based on Python Queue."""
import logging

from janus import PriorityQueue, Queue

from kytos.core.events import KytosEvent
from kytos.core.helpers import get_thread_pool_max_workers

__all__ = ('KytosBuffers', )
from janus import Queue

LOG = logging.getLogger(__name__)


class KytosEventBuffer:
"""KytosEventBuffer represents a queue to store a set of KytosEvents."""

def __init__(self, name, event_base_class=None, maxsize=0,
queue_cls=Queue):
def __init__(self, name, queue: Queue = None):
"""Contructor of KytosEventBuffer receive the parameters below.

Args:
Expand All @@ -25,8 +19,7 @@ def __init__(self, name, event_base_class=None, maxsize=0,
queue_cls (class): queue class from janus
"""
self.name = name
self._event_base_class = event_base_class
self._queue = queue_cls(maxsize=maxsize)
self._queue = queue if queue is not None else Queue()
self._reject_new_events = False

def put(self, event):
Expand Down Expand Up @@ -121,55 +114,3 @@ def empty(self):
def full(self):
"""Return True if KytosEventBuffer is full of KytosEvent."""
return self._queue.sync_q.full()


class KytosBuffers:
"""Set of KytosEventBuffer used in Kytos."""

def __init__(self):
"""Build four KytosEventBuffers.

:attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from connection events.

:attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from network.

:attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be received.

:attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be sent.

:attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
sent to NApps.
"""
self._pool_max_workers = get_thread_pool_max_workers()
self.conn = KytosEventBuffer("conn")
self.raw = KytosEventBuffer("raw", maxsize=self._get_maxsize("sb"))
self.msg_in = KytosEventBuffer("msg_in",
maxsize=self._get_maxsize("sb"),
queue_cls=PriorityQueue)
self.msg_out = KytosEventBuffer("msg_out",
maxsize=self._get_maxsize("sb"),
queue_cls=PriorityQueue)
self.app = KytosEventBuffer("app", maxsize=self._get_maxsize("app"))

def get_all_buffers(self):
"""Get all KytosEventBuffer instances."""
return [
event_buffer for event_buffer in self.__dict__.values()
if isinstance(event_buffer, KytosEventBuffer)
]

def _get_maxsize(self, queue_name):
"""Get queue maxsize if it's been set."""
return self._pool_max_workers.get(queue_name, 0)

def send_stop_signal(self):
"""Send a ``kytos/core.shutdown`` event to each buffer."""
LOG.info('Stop signal received by Kytos buffers.')
LOG.info('Sending KytosShutdownEvent to all apps.')
event = KytosEvent(name='kytos/core.shutdown')
for buffer in self.get_all_buffers():
buffer.put(event)
54 changes: 54 additions & 0 deletions kytos/core/buffers/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Utilities for composing KytosEventBuffers"""

from janus import PriorityQueue, Queue

from kytos.core.helpers import get_thread_pool_max_workers

from .buffers import KytosEventBuffer

queue_classes = {
'queue': Queue,
'priority': PriorityQueue,
}


def process_queue(config: dict) -> Queue:
"""
Create a janus queue from a given config dict
"""
queue_type = queue_classes[config.get('type', 'queue')]
queue_size = config.get('maxsize', 0)
if isinstance(queue_size, str):
if queue_size.startswith('threadpool_'):
threadpool = queue_size[len('threadpool_'):]
queue_size = get_thread_pool_max_workers().get(threadpool, 0)
else:
raise TypeError(
'Expected int or str formatted '
'as "threadpool_{threadpool_name}"'
)
return queue_type(maxsize=queue_size)


extension_processors = {}


def buffer_from_config(name: str, config: dict) -> KytosEventBuffer:
Ktmi marked this conversation as resolved.
Show resolved Hide resolved
"""
Create a KytosEventBuffer from a given config dict
"""
buffer_cls = KytosEventBuffer
args = {}
# Process Queue Config
args['queue'] = process_queue(config.get('queue', {}))

buffer = buffer_cls(name, **args)

# Process Mixins
extensions: dict = config.get('extensions', [])
for extension in extensions:
extension_type = extension['type']
extension_args = extension.get('args', {})
buffer = extension_processors[extension_type](buffer, extension_args)

return buffer
80 changes: 80 additions & 0 deletions kytos/core/buffers/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Kytos Buffer Classes, based on Python Queue."""
import logging

from janus import PriorityQueue, Queue

from kytos.core.config import KytosConfig
from kytos.core.events import KytosEvent
from kytos.core.helpers import get_thread_pool_max_workers

from .buffers import KytosEventBuffer
from .factory import buffer_from_config

LOG = logging.getLogger(__name__)


class KytosBuffers:
"""Set of KytosEventBuffer used in Kytos."""

def __init__(self):
"""Build four KytosEventBuffers.

:attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from connection events.

:attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from network.

:attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be received.

:attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be sent.

:attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
sent to NApps.
"""

self._pool_max_workers = get_thread_pool_max_workers()

self.conn = KytosEventBuffer("conn")
self.raw = KytosEventBuffer(
"raw",
queue=Queue(maxsize=self._get_maxsize("sb"))
)
self.msg_in = KytosEventBuffer(
"msg_in",
queue=PriorityQueue(maxsize=self._get_maxsize("sb")),
)
self.msg_out = KytosEventBuffer(
"msg_out",
queue=PriorityQueue(maxsize=self._get_maxsize("sb")),
)
self.app = KytosEventBuffer(
"app",
queue=Queue(maxsize=self._get_maxsize("app")),
)

buffer_conf = KytosConfig().options['daemon'].event_buffer_conf

for name, config in buffer_conf.items():
setattr(self, name, buffer_from_config(name, config))

def get_all_buffers(self):
"""Get all KytosEventBuffer instances."""
return [
event_buffer for event_buffer in self.__dict__.values()
if isinstance(event_buffer, KytosEventBuffer)
]

def _get_maxsize(self, queue_name):
"""Get queue maxsize if it's been set."""
return self._pool_max_workers.get(queue_name, 0)

def send_stop_signal(self):
"""Send a ``kytos/core.shutdown`` event to each buffer."""
LOG.info('Stop signal received by Kytos buffers.')
LOG.info('Sending KytosShutdownEvent to all apps.')
event = KytosEvent(name='kytos/core.shutdown')
for buffer in self.get_all_buffers():
buffer.put(event)
95 changes: 66 additions & 29 deletions kytos/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,32 +125,65 @@ def parse_args(self):
'debug': False}

"""
defaults = {'pidfile': os.path.join(BASE_ENV,
'var/run/kytos/kytosd.pid'),
'workdir': os.path.join(BASE_ENV, 'var/lib/kytos'),
'napps': os.path.join(BASE_ENV, 'var/lib/kytos/napps/'),
'napps_repositories': "['https://napps.kytos.io/repo/']",
'installed_napps': os.path.join(BASE_ENV,
'var/lib/kytos/napps/',
'.installed'),
'conf': os.path.join(BASE_ENV, 'etc/kytos/kytos.conf'),
'logging': os.path.join(BASE_ENV, 'etc/kytos/logging.ini'),
'logger_decorators':
["kytos.core.logger_decorators.queue_decorator"],
'listen': '0.0.0.0',
'port': 6653,
'api_traceback_on_500': True,
'foreground': False,
'protocol_name': '',
'enable_entities_by_default': False,
'napps_pre_installed': [],
'authenticate_urls': [],
'token_expiration_minutes': 180,
'thread_pool_max_workers': {},
'database': '',
'apm': '',
'connection_timeout': 130,
'debug': False}
defaults = {
'pidfile': os.path.join(
BASE_ENV,
'var/run/kytos/kytosd.pid'
),
'workdir': os.path.join(
BASE_ENV,
'var/lib/kytos'
),
'napps': os.path.join(
BASE_ENV,
'var/lib/kytos/napps/'
),
'napps_repositories': ['https://napps.kytos.io/repo/'],
'installed_napps': os.path.join(
BASE_ENV,
'var/lib/kytos/napps/',
'.installed'
),
'conf': os.path.join(
BASE_ENV,
'etc/kytos/kytos.conf'
),
'logging': os.path.join(
BASE_ENV,
'etc/kytos/logging.ini'
),
'logger_decorators': [
"kytos.core.logger_decorators.queue_decorator",
],
'listen': '0.0.0.0',
'port': 6653,
'api_traceback_on_500': True,
'foreground': False,
'protocol_name': '',
'enable_entities_by_default': False,
'napps_pre_installed': [],
'authenticate_urls': [],
'token_expiration_minutes': 180,
'thread_pool_max_workers': {},
'database': '',
'apm': '',
'connection_timeout': 130,
'debug': False,
'event_buffer_conf': {
'msg_out': {
'queue': {
'type': 'priority',
'maxsize': 'threadpool_sb',
},
},
'msg_in': {
'queue': {
'type': 'priority',
'maxsize': 'threadpool_sb',
},
},
},
}

options, argv = self.conf_parser.parse_known_args()

Expand Down Expand Up @@ -184,7 +217,6 @@ def _parse_options(self, argv):
options, unknown = self.parser.parse_known_args(argv)
if unknown:
warnings.warn(f"Unknown arguments: {unknown}")
options.napps_repositories = json.loads(options.napps_repositories)
options.debug = options.debug in ['True', True]
options.daemon = options.daemon in ['True', True]
options.port = int(options.port)
Expand All @@ -203,11 +235,16 @@ def _parse_json(value):
return json.loads(value)
return value

options.napps_repositories = _parse_json(options.napps_repositories)
options.logger_decorators = _parse_json(options.logger_decorators)
options.napps_pre_installed = _parse_json(options.napps_pre_installed)
options.authenticate_urls = _parse_json(options.authenticate_urls)
thread_pool_max_workers = options.thread_pool_max_workers
options.thread_pool_max_workers = _parse_json(thread_pool_max_workers)
options.thread_pool_max_workers = _parse_json(
options.thread_pool_max_workers
)
options.event_buffer_conf = _parse_json(
options.event_buffer_conf
)

return options

Expand Down
22 changes: 22 additions & 0 deletions kytos/templates/kytos.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ jwt_secret = {{ jwt_secret }}
# - db: it can be used by for higher priority db related tasks (need to be parametrized on decorator)
thread_pool_max_workers = {"sb": 256, "db": 256, "app": 512}

# Configuration for KytosEventBuffers
# Valid event buffers are "msg_in", "msg_out", "app", "conn", and "raw".
# Valid queue types are:
# - queue: Default queue class provided by janus
# - priority: PriorityQueue class provided by janus

event_buffer_conf =
{
"msg_out": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb"
}
},
"msg_in": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb"
}
}
}

# Time to expire authentication token in minutes
token_expiration_minutes = 180

Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_core/test_buffers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Test kytos buffers functionalities."""
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest.mock import MagicMock

import pytest
from janus import Queue

from kytos.core.buffers import KytosBuffers, KytosEventBuffer
from kytos.core.events import KytosEvent
Expand Down Expand Up @@ -103,7 +104,7 @@ async def test_empty(self):

async def test_full(self):
"""Test full method to full."""
kytos_event_buffer = KytosEventBuffer("name", maxsize=1)
kytos_event_buffer = KytosEventBuffer("name", queue=Queue(maxsize=1))
assert not kytos_event_buffer.full()
assert kytos_event_buffer.empty()

Expand Down
Loading
Loading