Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added in rate limited KytosEventBuffers #412

Closed
wants to merge 13 commits into from
52 changes: 49 additions & 3 deletions kytos/core/buffers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Kytos Buffer Classes, based on Python Queue."""
import asyncio
import logging
import time
from typing import Callable, Hashable, Iterable

import limits
from janus import PriorityQueue, Queue

from kytos.core.events import KytosEvent
Expand Down Expand Up @@ -123,6 +127,39 @@ def full(self):
return self._queue.sync_q.full()


class RateLimitedBuffer(KytosEventBuffer):
"""
Extension of KytosEventBuffer with ratelimiting capabilities.
"""
def __init__(
self, *args,
strategy: limits.strategies.RateLimiter,
limit: limits.RateLimitItem,
gen_identifiers: Callable[[KytosEvent], Iterable[Hashable]],
**kwargs
):
super().__init__(*args, **kwargs)
self.strategy = strategy
self.limit = limit
self.gen_identifiers = gen_identifiers

def get(self):
val = super().get()
identifiers = self.limit, *self.gen_identifiers(val)
while not self.strategy.hit(*identifiers):
window_reset, _ = self.strategy.get_window_stats(*identifiers)
time.sleep(window_reset - time.time())
return val

async def aget(self):
val = await super().aget()
identifiers = self.limit, *self.gen_identifiers(val)
while not self.strategy.hit(*identifiers):
window_reset, _ = self.strategy.get_window_stats(*identifiers)
await asyncio.sleep(window_reset - time.time())
viniarck marked this conversation as resolved.
Show resolved Hide resolved
return val


class KytosBuffers:
"""Set of KytosEventBuffer used in Kytos."""

Expand Down Expand Up @@ -150,9 +187,18 @@ def __init__(self):
self.msg_in = KytosEventBuffer("msg_in",
maxsize=self._get_maxsize("sb"),
queue_cls=PriorityQueue)
self.msg_out = KytosEventBuffer("msg_out",
maxsize=self._get_maxsize("sb"),
queue_cls=PriorityQueue)
strategy = limits.strategies.MovingWindowRateLimiter(
limits.storage.MemoryStorage()
)
self.msg_out = RateLimitedBuffer(
"msg_out",
maxsize=self._get_maxsize("sb"),
queue_cls=PriorityQueue,
strategy=strategy,
limit=limits.RateLimitItemPerSecond(100, 1),
gen_identifiers=lambda event:
getattr(event.destination, 'id', ('unknown',)),
viniarck marked this conversation as resolved.
Show resolved Hide resolved
)
self.app = KytosEventBuffer("app", maxsize=self._get_maxsize("app"))

def get_all_buffers(self):
Expand Down
37 changes: 26 additions & 11 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=requirements/dev.txt --resolver=backtracking requirements/dev.in requirements/run.txt
# pip-compile --output-file=requirements/dev.txt requirements/dev.in requirements/run.txt
#
-e .
# via -r requirements/dev.in
Expand Down Expand Up @@ -76,6 +76,11 @@ decorator==4.4.2
# -r requirements/run.txt
# ipython
# kytos
deprecated==1.2.14
# via
# -r requirements/run.txt
# kytos
# limits
dill==0.3.4
# via pylint
distlib==0.3.6
Expand Down Expand Up @@ -143,8 +148,11 @@ idna==3.3
# requests
imagesize==1.4.1
# via sphinx
importlib-metadata==4.12.0
# via sphinx
importlib-resources==6.0.1
# via
# -r requirements/run.txt
# kytos
# limits
iniconfig==2.0.0
# via pytest
ipython==8.1.1
Expand Down Expand Up @@ -200,6 +208,10 @@ lazy-object-proxy==1.7.1
# astroid
# kytos
# openapi-spec-validator
limits==3.6.0
# via
# -r requirements/run.txt
# kytos
livereload==2.6.3
# via sphinx-autobuild
lockfile==0.12.2
Expand Down Expand Up @@ -241,10 +253,13 @@ openapi-spec-validator==0.5.6
# -r requirements/run.txt
# kytos
# openapi-core
packaging==23.0
packaging==23.1
# via
# -r requirements/run.txt
# black
# build
# kytos
# limits
# pytest
# sphinx
# tox
Expand Down Expand Up @@ -450,14 +465,12 @@ typing-extensions==4.5.0
# via
# -r requirements/run.txt
# astroid
# black
# janus
# jsonschema-spec
# kytos
# limits
# openapi-core
# pydantic
# pylint
# starlette
urllib3==1.26.7
# via
# -r requirements/run.txt
Expand Down Expand Up @@ -504,11 +517,13 @@ werkzeug==2.0.3
wheel==0.40.0
# via pip-tools
wrapt==1.15.0
# via astroid
# via
# -r requirements/run.txt
# astroid
# deprecated
# kytos
yala==3.2.0
# via kytos
zipp==3.8.1
# via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# pip
Expand Down
1 change: 1 addition & 0 deletions requirements/run.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ httpx==0.24.0
starlette[full]==0.26.0
uvicorn[standard]==0.21.1
asgiref==3.6.0
limits==3.6.0
16 changes: 13 additions & 3 deletions requirements/run.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# This file is autogenerated by pip-compile with Python 3.10
viniarck marked this conversation as resolved.
Show resolved Hide resolved
# by the following command:
#
# pip-compile --output-file=requirements/run.txt --resolver=backtracking requirements/run.in
# pip-compile --output-file=requirements/run.txt requirements/run.in
#
anyio==3.6.2
# via
Expand All @@ -26,6 +26,8 @@ click==8.1.3
# via uvicorn
decorator==4.4.2
# via ipython
deprecated==1.2.14
# via limits
dnspython==2.2.1
# via
# -r requirements/run.in
Expand Down Expand Up @@ -55,6 +57,8 @@ idna==3.3
# anyio
# email-validator
# httpx
importlib-resources==6.0.1
# via limits
ipython==8.1.1
# via -r requirements/run.in
isodate==0.6.1
Expand All @@ -80,6 +84,8 @@ jsonschema-spec==0.1.4
# openapi-spec-validator
lazy-object-proxy==1.7.1
# via openapi-spec-validator
limits==3.6.0
# via -r requirements/run.in
lockfile==0.12.2
# via
# -r requirements/run.in
Expand All @@ -98,6 +104,8 @@ openapi-schema-validator==0.4.4
# openapi-spec-validator
openapi-spec-validator==0.5.6
# via openapi-core
packaging==23.1
# via limits
parse==1.19.0
# via openapi-core
parso==0.6.2
Expand Down Expand Up @@ -165,9 +173,9 @@ typing-extensions==4.5.0
# via
# janus
# jsonschema-spec
# limits
# openapi-core
# pydantic
# starlette
urllib3==1.26.7
# via elastic-apm
uvicorn[standard]==0.21.1
Expand All @@ -184,6 +192,8 @@ websockets==11.0
# via uvicorn
werkzeug==2.0.3
# via openapi-core
wrapt==1.15.0
# via deprecated

# The following packages are considered to be unsafe in a requirements file:
# setuptools