diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 77a889a4..8d593ee3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,9 @@ UNRELEASED - Under development Added ===== + +- Added optional rate limiting mixins for ``KytosEventBuffer`` +- New configuration parameters for ``KytosEventBuffers``, accessible through the ``event_buffer_conf`` parameter. - Added ``Interface.tag_ranges`` as ``dict[str, list[list[int]]]`` as replacement for ``vlan_pool`` settings. - Added ``kytos/core.interface_tags`` event publication to notify any modification of ``Interface.tag_ranges`` or ``Interface.available_tags``. - Added ``TAGRange`` class which is used when a ``UNI`` has a tag as a list of ranges. diff --git a/kytos/core/buffers/__init__.py b/kytos/core/buffers/__init__.py new file mode 100644 index 00000000..51b7eff1 --- /dev/null +++ b/kytos/core/buffers/__init__.py @@ -0,0 +1,5 @@ +"""Module for buffers.""" +from .buffers import KytosEventBuffer +from .manager import KytosBuffers + +__all__ = ('KytosEventBuffer', 'KytosBuffers') diff --git a/kytos/core/buffers.py b/kytos/core/buffers/buffers.py similarity index 60% rename from kytos/core/buffers.py rename to kytos/core/buffers/buffers.py index 27cc3bbc..68e53bce 100644 --- a/kytos/core/buffers.py +++ b/kytos/core/buffers/buffers.py @@ -1,12 +1,7 @@ """Kytos Buffer Classes, based on Python Queue.""" import logging -from janus import PriorityQueue, Queue - -from kytos.core.events import KytosEvent -from kytos.core.helpers import get_thread_pool_max_workers - -__all__ = ('KytosBuffers', ) +from janus import Queue LOG = logging.getLogger(__name__) @@ -14,8 +9,7 @@ class KytosEventBuffer: """KytosEventBuffer represents a queue to store a set of KytosEvents.""" - def __init__(self, name, event_base_class=None, maxsize=0, - queue_cls=Queue): + def __init__(self, name, queue: Queue = None): """Contructor of KytosEventBuffer receive the parameters below. Args: @@ -25,8 +19,7 @@ def __init__(self, name, event_base_class=None, maxsize=0, queue_cls (class): queue class from janus """ self.name = name - self._event_base_class = event_base_class - self._queue = queue_cls(maxsize=maxsize) + self._queue = queue if queue is not None else Queue() self._reject_new_events = False def put(self, event): @@ -121,55 +114,3 @@ def empty(self): def full(self): """Return True if KytosEventBuffer is full of KytosEvent.""" return self._queue.sync_q.full() - - -class KytosBuffers: - """Set of KytosEventBuffer used in Kytos.""" - - def __init__(self): - """Build four KytosEventBuffers. - - :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events - received from connection events. - - :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events - received from network. - - :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with - events to be received. - - :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with - events to be sent. - - :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events - sent to NApps. - """ - self._pool_max_workers = get_thread_pool_max_workers() - self.conn = KytosEventBuffer("conn") - self.raw = KytosEventBuffer("raw", maxsize=self._get_maxsize("sb")) - self.msg_in = KytosEventBuffer("msg_in", - maxsize=self._get_maxsize("sb"), - queue_cls=PriorityQueue) - self.msg_out = KytosEventBuffer("msg_out", - maxsize=self._get_maxsize("sb"), - queue_cls=PriorityQueue) - self.app = KytosEventBuffer("app", maxsize=self._get_maxsize("app")) - - def get_all_buffers(self): - """Get all KytosEventBuffer instances.""" - return [ - event_buffer for event_buffer in self.__dict__.values() - if isinstance(event_buffer, KytosEventBuffer) - ] - - def _get_maxsize(self, queue_name): - """Get queue maxsize if it's been set.""" - return self._pool_max_workers.get(queue_name, 0) - - def send_stop_signal(self): - """Send a ``kytos/core.shutdown`` event to each buffer.""" - LOG.info('Stop signal received by Kytos buffers.') - LOG.info('Sending KytosShutdownEvent to all apps.') - event = KytosEvent(name='kytos/core.shutdown') - for buffer in self.get_all_buffers(): - buffer.put(event) diff --git a/kytos/core/buffers/factory.py b/kytos/core/buffers/factory.py new file mode 100644 index 00000000..54a6f7c3 --- /dev/null +++ b/kytos/core/buffers/factory.py @@ -0,0 +1,169 @@ +"""Utilities for composing KytosEventBuffers""" +from functools import reduce + +import limits +import limits.aio.storage as lstorage +import limits.aio.strategies as lstrategies +from janus import PriorityQueue, Queue + +from kytos.core.helpers import get_thread_pool_max_workers + +from .buffers import KytosEventBuffer +from .mixin import GetRateLimitMixin, PutRateLimitMixin, RateLimitArgs + +queue_classes = { + 'queue': Queue, + 'priority': PriorityQueue, +} + +rate_limit_storages = { + 'memory': lstorage.MemoryStorage, +} + +rate_limit_strategies = { + 'moving_window': lstrategies.MovingWindowRateLimiter, + 'fixed_window': lstrategies.FixedWindowRateLimiter, + 'elastic_window': lstrategies.FixedWindowElasticExpiryRateLimiter, +} + + +def process_queue(config: dict) -> Queue: + """ + Create a janus queue from a given config dict + """ + queue_type = queue_classes[config.get('type', 'queue')] + queue_size = config.get('maxsize', 0) + if isinstance(queue_size, str): + if queue_size.startswith('threadpool_'): + threadpool = queue_size[len('threadpool_'):] + queue_size = get_thread_pool_max_workers().get(threadpool, 0) + else: + raise TypeError( + 'Expected int or str formatted ' + 'as "threadpool_{threadpool_name}"' + ) + return queue_type(maxsize=queue_size) + + +def process_storage(config: dict) -> lstorage.Storage: + """ + Create a rate limit storage from a given config dict + """ + return rate_limit_storages[config.get('type', 'memory')]( + uri=config.get('uri') + ) + + +def process_strategy(config: dict) -> lstrategies.RateLimiter: + """ + Create a rate limiter from a given config dict + """ + strategy_cls = rate_limit_strategies[config.get('type', 'moving_window')] + return strategy_cls( + process_storage( + config.get('storage', {}) + ) + ) + + +def process_gen_identifiers(identifiers: list[str]): + """ + Generate a func for getting a tuple of hashable parameters from an event + """ + split_identifiers = [ + identifier.split('.') + for identifier in identifiers + ] + return lambda event: ( + reduce( + lambda ev, attr: getattr( + ev, + attr, + 'unknown', + ), + identifier, + event + ) + for identifier in split_identifiers + ) + + +def process_rate_limit(config: dict) -> RateLimitArgs: + """ + Create a rate limited KytosEventBuffer from a given config dict + """ + args = {} + args['strategy'] = process_strategy( + config.get('strategy', {}) + ) + args['limit'] = limits.parse( + config.get( + 'limit', + '100/second' + ) + ) + args['gen_identifiers'] = process_gen_identifiers( + config.get('identifier', []) + ) + return args + + +def process_get_rate_limit(config: dict): + """ + Return class and parameters needed for get rate limit mixin + """ + return GetRateLimitMixin, process_rate_limit(config) + + +def process_put_rate_limit(config: dict): + """ + Return class and parameters needed for put rate limit mixin + """ + return PutRateLimitMixin, process_rate_limit(config) + + +mixin_processors = { + 'get_rate_limit': process_get_rate_limit, + 'put_rate_limit': process_put_rate_limit, +} + +__class_cache = {} + + +def combine_mixins(base_cls, mixins): + """Combine mixins into the base_cls.""" + key = base_cls, frozenset(mixins) + if key in __class_cache: + return __class_cache[key] + new_cls = type( + base_cls.__name__, + (*mixins, base_cls), + {} + ) + __class_cache[key] = new_cls + return new_cls + + +def buffer_from_config(name: str, config: dict) -> KytosEventBuffer: + """ + Create a KytosEventBuffer from a given config dict + """ + buffer_cls = KytosEventBuffer + args = {} + # Process Queue Config + args['queue'] = process_queue(config.get('queue', {})) + + # Process Mixins + mixins: dict = config.get('mixins', {}) + mixin_classes = [] + for mixin, mixin_config in mixins.items(): + mixin_cls, mixin_args = mixin_processors[mixin](mixin_config) + mixin_classes.append(mixin_cls) + args[mixin] = mixin_args + + if mixin_classes: + buffer_cls = combine_mixins( + buffer_cls, + mixin_classes + ) + return buffer_cls(name, **args) diff --git a/kytos/core/buffers/manager.py b/kytos/core/buffers/manager.py new file mode 100644 index 00000000..9436703c --- /dev/null +++ b/kytos/core/buffers/manager.py @@ -0,0 +1,80 @@ +"""Kytos Buffer Classes, based on Python Queue.""" +import logging + +from janus import PriorityQueue, Queue + +from kytos.core.config import KytosConfig +from kytos.core.events import KytosEvent +from kytos.core.helpers import get_thread_pool_max_workers + +from .buffers import KytosEventBuffer +from .factory import buffer_from_config + +LOG = logging.getLogger(__name__) + + +class KytosBuffers: + """Set of KytosEventBuffer used in Kytos.""" + + def __init__(self): + """Build four KytosEventBuffers. + + :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events + received from connection events. + + :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events + received from network. + + :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with + events to be received. + + :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with + events to be sent. + + :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events + sent to NApps. + """ + + self._pool_max_workers = get_thread_pool_max_workers() + + self.conn = KytosEventBuffer("conn") + self.raw = KytosEventBuffer( + "raw", + queue=Queue(maxsize=self._get_maxsize("sb")) + ) + self.msg_in = KytosEventBuffer( + "msg_in", + queue=PriorityQueue(maxsize=self._get_maxsize("sb")), + ) + self.msg_out = KytosEventBuffer( + "msg_out", + queue=PriorityQueue(maxsize=self._get_maxsize("sb")), + ) + self.app = KytosEventBuffer( + "app", + queue=Queue(maxsize=self._get_maxsize("app")), + ) + + buffer_conf = KytosConfig().options['daemon'].event_buffer_conf + + for name, config in buffer_conf.items(): + setattr(self, name, buffer_from_config(name, config)) + + def get_all_buffers(self): + """Get all KytosEventBuffer instances.""" + return [ + event_buffer for event_buffer in self.__dict__.values() + if isinstance(event_buffer, KytosEventBuffer) + ] + + def _get_maxsize(self, queue_name): + """Get queue maxsize if it's been set.""" + return self._pool_max_workers.get(queue_name, 0) + + def send_stop_signal(self): + """Send a ``kytos/core.shutdown`` event to each buffer.""" + LOG.info('Stop signal received by Kytos buffers.') + LOG.info('Sending KytosShutdownEvent to all apps.') + event = KytosEvent(name='kytos/core.shutdown') + for buffer in self.get_all_buffers(): + buffer.put(event) diff --git a/kytos/core/buffers/mixin.py b/kytos/core/buffers/mixin.py new file mode 100644 index 00000000..a364b38b --- /dev/null +++ b/kytos/core/buffers/mixin.py @@ -0,0 +1,64 @@ +"""Mixins for event buffers.""" + +import asyncio +import time +from typing import Callable, Hashable, Iterable, TypedDict + +import limits +import limits.aio.strategies as lstrategies + +from kytos.core.events import KytosEvent + + +class RateLimitArgs(TypedDict): + """ + Args dict for usage in rate limit mixins + """ + strategy: lstrategies.RateLimiter + limit: limits.RateLimitItem + gen_identifiers: Callable[[KytosEvent], Iterable[Hashable]] + + +class GetRateLimitMixin: + """ + Mixin for KytosEventBuffer to rate limit getting from the buffer. + """ + def __init__(self, *args, get_rate_limit: RateLimitArgs, **kwargs): + super().__init__(*args, **kwargs) + self.__strategy = get_rate_limit['strategy'] + self.__limit = get_rate_limit['limit'] + self.__gen_identifiers = get_rate_limit['gen_identifiers'] + + async def aget(self): + """Rate limited async get""" + val = await super().aget() + identifiers = self.__limit, *self.__gen_identifiers(val) + while not await self.__strategy.hit(*identifiers): + window_reset, _ =\ + await self.__strategy.get_window_stats(*identifiers) + sleep_time = window_reset - time.time() + if sleep_time > 0: + await asyncio.sleep(sleep_time) + return val + + +class PutRateLimitMixin: + """ + Mixin for KytosEventBuffer to rate limit putting into the buffer. + """ + def __init__(self, *args, put_rate_limit: RateLimitArgs, **kwargs): + super().__init__(*args, **kwargs) + self.__strategy = put_rate_limit['strategy'] + self.__limit = put_rate_limit['limit'] + self.__gen_identifiers = put_rate_limit['gen_identifiers'] + + async def aput(self, val): + """Rate limited async put""" + identifiers = self.__limit, *self.__gen_identifiers(val) + while not await self.__strategy.hit(*identifiers): + window_reset, _ =\ + await self.__strategy.get_window_stats(*identifiers) + sleep_time = window_reset - time.time() + if sleep_time > 0: + await asyncio.sleep(sleep_time) + await super().aput(val) diff --git a/kytos/core/config.py b/kytos/core/config.py index d8fb124f..46039ed6 100644 --- a/kytos/core/config.py +++ b/kytos/core/config.py @@ -125,32 +125,80 @@ def parse_args(self): 'debug': False} """ - defaults = {'pidfile': os.path.join(BASE_ENV, - 'var/run/kytos/kytosd.pid'), - 'workdir': os.path.join(BASE_ENV, 'var/lib/kytos'), - 'napps': os.path.join(BASE_ENV, 'var/lib/kytos/napps/'), - 'napps_repositories': "['https://napps.kytos.io/repo/']", - 'installed_napps': os.path.join(BASE_ENV, - 'var/lib/kytos/napps/', - '.installed'), - 'conf': os.path.join(BASE_ENV, 'etc/kytos/kytos.conf'), - 'logging': os.path.join(BASE_ENV, 'etc/kytos/logging.ini'), - 'logger_decorators': - ["kytos.core.logger_decorators.queue_decorator"], - 'listen': '0.0.0.0', - 'port': 6653, - 'api_traceback_on_500': True, - 'foreground': False, - 'protocol_name': '', - 'enable_entities_by_default': False, - 'napps_pre_installed': [], - 'authenticate_urls': [], - 'token_expiration_minutes': 180, - 'thread_pool_max_workers': {}, - 'database': '', - 'apm': '', - 'connection_timeout': 130, - 'debug': False} + defaults = { + 'pidfile': os.path.join( + BASE_ENV, + 'var/run/kytos/kytosd.pid' + ), + 'workdir': os.path.join( + BASE_ENV, + 'var/lib/kytos' + ), + 'napps': os.path.join( + BASE_ENV, + 'var/lib/kytos/napps/' + ), + 'napps_repositories': [ + 'https://napps.kytos.io/repo/' + ], + 'installed_napps': os.path.join( + BASE_ENV, + 'var/lib/kytos/napps/', + '.installed' + ), + 'conf': os.path.join( + BASE_ENV, + 'etc/kytos/kytos.conf' + ), + 'logging': os.path.join( + BASE_ENV, + 'etc/kytos/logging.ini' + ), + 'logger_decorators': + ["kytos.core.logger_decorators.queue_decorator"], + 'listen': '0.0.0.0', + 'port': 6653, + 'api_traceback_on_500': True, + 'foreground': False, + 'protocol_name': '', + 'enable_entities_by_default': False, + 'napps_pre_installed': [], + 'authenticate_urls': [], + 'token_expiration_minutes': 180, + 'thread_pool_max_workers': {}, + 'database': '', + 'apm': '', + 'connection_timeout': 130, + 'debug': False, + 'event_buffer_conf': { + 'msg_out': { + 'type': 'rate_limited', + 'queue': { + 'type': 'priority', + 'maxsize': 'threadpool_sb', + }, + 'mixins': { + 'put_rate_limit': { + 'strategy': { + 'type': 'moving_window', + 'storage': { + 'type': 'memory', + }, + }, + 'limit': '100/second', + 'identifier': ['destination.id'], + }, + }, + }, + 'msg_in': { + 'type': 'default', + 'queue': { + 'type': 'priority', + 'maxsize': 'threadpool_sb', + }, + }, + }, + } options, argv = self.conf_parser.parse_known_args() @@ -184,7 +232,6 @@ def _parse_options(self, argv): options, unknown = self.parser.parse_known_args(argv) if unknown: warnings.warn(f"Unknown arguments: {unknown}") - options.napps_repositories = json.loads(options.napps_repositories) options.debug = options.debug in ['True', True] options.daemon = options.daemon in ['True', True] options.port = int(options.port) @@ -203,11 +250,13 @@ def _parse_json(value): return json.loads(value) return value + options.napps_repositories = _parse_json(options.napps_repositories) options.logger_decorators = _parse_json(options.logger_decorators) options.napps_pre_installed = _parse_json(options.napps_pre_installed) options.authenticate_urls = _parse_json(options.authenticate_urls) thread_pool_max_workers = options.thread_pool_max_workers options.thread_pool_max_workers = _parse_json(thread_pool_max_workers) + options.event_buffer_conf = _parse_json(options.event_buffer_conf) return options diff --git a/kytos/templates/kytos.conf.template b/kytos/templates/kytos.conf.template index dec7198c..d26f21d8 100644 --- a/kytos/templates/kytos.conf.template +++ b/kytos/templates/kytos.conf.template @@ -56,8 +56,9 @@ api_traceback_on_500 = True # This directory has both core napps and user installed napps. napps = {{ prefix }}/var/lib/kytos/napps -napps_repositories = [ - "https://napps.kytos.io/repo/" +napps_repositories = + [ + "https://napps.kytos.io/repo/" ] # Pre installed napps. List of Napps to be pre-installed and enabled. @@ -67,6 +68,9 @@ napps_pre_installed = [] # The jwt_secret parameter is responsible for signing JSON Web Tokens. jwt_secret = {{ jwt_secret }} +# Time to expire authentication token in minutes +token_expiration_minutes = 180 + # Maximum number of thread workers in a thread pool, if it's set as empty dict {} no thread pools will be used. # The following pools are available by default to be used in the listen_to decorator: # - sb: it's used automatically by kytos/of_core.* events, it's meant for southbound related messages @@ -74,8 +78,52 @@ jwt_secret = {{ jwt_secret }} # - db: it can be used by for higher priority db related tasks (need to be parametrized on decorator) thread_pool_max_workers = {"sb": 256, "db": 256, "app": 512} -# Time to expire authentication token in minutes -token_expiration_minutes = 180 +# Configuration for KytosEventBuffers +# Valid event buffers are "msg_in", "msg_out", "app", "conn", and "raw". +# Valid queue types are: +# - queue: Default queue class provided by janus +# - priority: PriorityQueue class provided by janus +# Valid Mixins are: +# - put_rate_limit: Sets the rate limit on async put requests into the buffer +# - get_rate_limit: Sets the rate limit on async get requests from the buffer +# For rate limit mixins, there are a few configurable parameters: +# - strategy: Strategy to use for rate limiting +# - limit: the amount of requests allowed for a given time period. +# - identifier: list of identifiers to use for determining rate limit +# Available strategies include: +# - moving_window +# - fixed_window +# - elastic window +# Available rate limit storage types: +# - memory + +event_buffer_conf = + { + "msg_out": { + "queue": { + "type": "priority", + "maxsize": "threadpool_sb" + }, + "mixins": { + "put_rate_limit": { + "strategy": { + "type": "moving_window", + "storage": { + "type": "memory" + } + }, + "limit": "100/second", + "identifier": ["destination.id"] + } + } + }, + "msg_in": { + "queue": { + "type": "priority", + "maxsize": "threadpool_sb" + } + } + } # NApps database. Supported values: mongodb (don't use quotes in the string) database = diff --git a/requirements/dev.txt b/requirements/dev.txt index 0a4fbd16..3ca03bed 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.9 # by the following command: # -# pip-compile --output-file=requirements/dev.txt --resolver=backtracking requirements/dev.in requirements/run.txt +# pip-compile --output-file=requirements/dev.txt requirements/dev.in requirements/run.txt # -e . # via -r requirements/dev.in @@ -76,6 +76,11 @@ decorator==4.4.2 # -r requirements/run.txt # ipython # kytos +deprecated==1.2.14 + # via + # -r requirements/run.txt + # kytos + # limits dill==0.3.4 # via pylint distlib==0.3.6 @@ -143,8 +148,13 @@ idna==3.3 # requests imagesize==1.4.1 # via sphinx -importlib-metadata==4.12.0 +importlib-metadata==6.8.0 # via sphinx +importlib-resources==6.0.1 + # via + # -r requirements/run.txt + # kytos + # limits iniconfig==2.0.0 # via pytest ipython==8.1.1 @@ -200,6 +210,10 @@ lazy-object-proxy==1.7.1 # astroid # kytos # openapi-spec-validator +limits==3.6.0 + # via + # -r requirements/run.txt + # kytos livereload==2.6.3 # via sphinx-autobuild lockfile==0.12.2 @@ -241,10 +255,13 @@ openapi-spec-validator==0.5.6 # -r requirements/run.txt # kytos # openapi-core -packaging==23.0 +packaging==23.1 # via + # -r requirements/run.txt # black # build + # kytos + # limits # pytest # sphinx # tox @@ -454,6 +471,7 @@ typing-extensions==4.5.0 # janus # jsonschema-spec # kytos + # limits # openapi-core # pydantic # pylint @@ -504,11 +522,19 @@ werkzeug==2.0.3 wheel==0.40.0 # via pip-tools wrapt==1.15.0 - # via astroid + # via + # -r requirements/run.txt + # astroid + # deprecated + # kytos yala==3.2.0 # via kytos -zipp==3.8.1 - # via importlib-metadata +zipp==3.16.2 + # via + # -r requirements/run.txt + # importlib-metadata + # importlib-resources + # kytos # The following packages are considered to be unsafe in a requirements file: # pip diff --git a/requirements/run.in b/requirements/run.in index 6f912d32..9d9317bb 100644 --- a/requirements/run.in +++ b/requirements/run.in @@ -19,3 +19,4 @@ httpx==0.24.0 starlette[full]==0.26.0 uvicorn[standard]==0.21.1 asgiref==3.6.0 +limits==3.6.0 diff --git a/requirements/run.txt b/requirements/run.txt index 643126a6..bd627a38 100644 --- a/requirements/run.txt +++ b/requirements/run.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.9 # by the following command: # -# pip-compile --output-file=requirements/run.txt --resolver=backtracking requirements/run.in +# pip-compile --output-file=requirements/run.txt requirements/run.in # anyio==3.6.2 # via @@ -26,6 +26,8 @@ click==8.1.3 # via uvicorn decorator==4.4.2 # via ipython +deprecated==1.2.14 + # via limits dnspython==2.2.1 # via # -r requirements/run.in @@ -55,6 +57,8 @@ idna==3.3 # anyio # email-validator # httpx +importlib-resources==6.0.1 + # via limits ipython==8.1.1 # via -r requirements/run.in isodate==0.6.1 @@ -80,6 +84,8 @@ jsonschema-spec==0.1.4 # openapi-spec-validator lazy-object-proxy==1.7.1 # via openapi-spec-validator +limits==3.6.0 + # via -r requirements/run.in lockfile==0.12.2 # via # -r requirements/run.in @@ -98,6 +104,8 @@ openapi-schema-validator==0.4.4 # openapi-spec-validator openapi-spec-validator==0.5.6 # via openapi-core +packaging==23.1 + # via limits parse==1.19.0 # via openapi-core parso==0.6.2 @@ -165,6 +173,7 @@ typing-extensions==4.5.0 # via # janus # jsonschema-spec + # limits # openapi-core # pydantic # starlette @@ -184,6 +193,10 @@ websockets==11.0 # via uvicorn werkzeug==2.0.3 # via openapi-core +wrapt==1.15.0 + # via deprecated +zipp==3.16.2 + # via importlib-resources # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/tests/unit/test_core/test_buffers.py b/tests/unit/test_core/test_buffers.py index 913f028a..2f6f85af 100644 --- a/tests/unit/test_core/test_buffers.py +++ b/tests/unit/test_core/test_buffers.py @@ -3,6 +3,7 @@ from unittest.mock import MagicMock import pytest +from janus import Queue from kytos.core.buffers import KytosBuffers, KytosEventBuffer from kytos.core.events import KytosEvent @@ -103,7 +104,7 @@ async def test_empty(self): async def test_full(self): """Test full method to full.""" - kytos_event_buffer = KytosEventBuffer("name", maxsize=1) + kytos_event_buffer = KytosEventBuffer("name", queue=Queue(maxsize=1)) assert not kytos_event_buffer.full() assert kytos_event_buffer.empty()