Added in rate limited KytosEventBuffers #412

Closed · wants to merge 13 commits
6 changes: 6 additions & 0 deletions CHANGELOG.rst
@@ -6,6 +6,12 @@ All notable changes to the kytos project will be documented in this file.
UNRELEASED - Under development
******************************

Added
=====

- Added ``RateLimitedBuffer``, an alternative to ``KytosEventBuffer``
- New configuration parameters for ``KytosEventBuffers``, accessible through the ``event_buffer_conf`` parameter.

[2023.1.0] - 2023-06-05
***********************

5 changes: 5 additions & 0 deletions kytos/core/buffers/__init__.py
@@ -0,0 +1,5 @@
"""Module for buffers."""
from .buffers import KytosEventBuffer
from .manager import KytosBuffers

__all__ = ('KytosEventBuffer', 'KytosBuffers')
90 changes: 38 additions & 52 deletions kytos/core/buffers.py → kytos/core/buffers/buffers.py
@@ -1,12 +1,13 @@
"""Kytos Buffer Classes, based on Python Queue."""
import asyncio
import logging
import time
from typing import Callable, Hashable, Iterable

from janus import PriorityQueue, Queue
import limits
from janus import Queue

from kytos.core.events import KytosEvent
from kytos.core.helpers import get_thread_pool_max_workers

__all__ = ('KytosBuffers', )

LOG = logging.getLogger(__name__)

@@ -123,53 +124,38 @@ def full(self):
        return self._queue.sync_q.full()


(The ``KytosBuffers`` class previously defined in this file was removed here; it now lives in ``kytos/core/buffers/manager.py``, shown further below.)

class RateLimitedBuffer(KytosEventBuffer):
    """Extension of KytosEventBuffer with rate limiting capabilities."""

    def __init__(
        self, *args,
        strategy: limits.strategies.RateLimiter,
        limit: limits.RateLimitItem,
        gen_identifiers: Callable[[KytosEvent], Iterable[Hashable]],
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.strategy = strategy
        self.limit = limit
        self.gen_identifiers = gen_identifiers

    def get(self):
        val = super().get()
        # Block until the rate limiter grants a slot for this event's identifiers.
        identifiers = self.limit, *self.gen_identifiers(val)
        while not self.strategy.hit(*identifiers):
            window_reset, _ = self.strategy.get_window_stats(*identifiers)
            sleep_time = window_reset - time.time()
            if sleep_time > 0:
                time.sleep(sleep_time)
        return val

    async def aget(self):
        val = await super().aget()
        # Same rate limiting loop as get(), but sleeping cooperatively.
        identifiers = self.limit, *self.gen_identifiers(val)
        while not self.strategy.hit(*identifiers):
            window_reset, _ = self.strategy.get_window_stats(*identifiers)
            sleep_time = window_reset - time.time()
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        return val
Member:

David, another requirement to consider:

  • Also, rate limit at the put and aput level, before actually putting into the queue, essentially rate limiting as close to the source as possible as far as the queue is concerned. This also implies supporting both sync and async for this end of the queue.

Let me elaborate on the use case in the next comments.

Member:

This is partly related to a prior chaining discussion, but here it would only chain on the put|aput side, where any publisher could register a particular rate limiter that it'll use. Alternatively, provide a context manager (or something similar) so publishers who need to be rate limited before putting into the queue can use that context or async context, although I believe the former would be less boilerplate and more ergonomic to use.

Considering that we've been using bounded queues, this will also contribute a bit more fairness towards other concurrent producers who might be sending other types of messages, regardless of whether a priority queue is used (and even more so when it is), when rate limiters are also being used on the put|aput side. It's a subtle difference, but if the queue is properly sized for the current number of events going on, then it can make a significant difference since the queue won't be constantly getting full.

                                                          producer 1 (put)
core event_handler aget <---  Queue X   <----  put|aput   producer 2 (aput)
                                                          ....
                                                          producer n (put|aput)
    |
    |
    \/
________________________________
NApps event handlers run by
thread pool and/or asyncio task
________________________________


aget would work as a global rate limiter with a given identifier, as it's currently implemented, and then put|aput would also be structured in a way where, in the future, it's possible to throttle with a given unique id context, essentially replacing the rate limiting we've been implementing at the producer level for mef_eline and telemetry_int (even though the global aget rate limit will already solve a major part of the problem).
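For illustration, a minimal sketch of the publisher-side throttling proposed here, mirroring the existing RateLimitedBuffer.get loop; PutRateLimitedBuffer and its put override are hypothetical and not part of this PR:

import time

from kytos.core.buffers import KytosEventBuffer


class PutRateLimitedBuffer(KytosEventBuffer):
    """Hypothetical buffer that throttles producers on put() instead of get()."""

    def __init__(self, *args, strategy, limit, gen_identifiers, **kwargs):
        super().__init__(*args, **kwargs)
        self.strategy = strategy
        self.limit = limit
        self.gen_identifiers = gen_identifiers

    def put(self, event, *args, **kwargs):
        # Block this producer until its own identifiers have budget left, so a
        # single bursty publisher can't fill the bounded queue on its own.
        identifiers = self.limit, *self.gen_identifiers(event)
        while not self.strategy.hit(*identifiers):
            window_reset, _ = self.strategy.get_window_stats(*identifiers)
            sleep_time = window_reset - time.time()
            if sleep_time > 0:
                time.sleep(sleep_time)
        super().put(event, *args, **kwargs)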

Member:

Concrete use case example with our existing Kytos-ng NApps:

  • globally rate limit any messages per 'destination.id' on msg_out at 100/second AND individually rate limit any flows from mef_eline at '30/sec' and any from telemetry_int at '30/sec'.

On msg_out, flow_manager is responsible for pushing FlowMods, so it's centralizing the events; on flow_manager, it could rate limit on behalf of its clients based on the flow owner value:

                                                     mef_eline (via requests and KytosEvent)
aget  <---  Queue msg_out <---  put   flow_manager

                                                    telemetry_int (via KytosEvent)

Do you see what I mean?

Can you also support this or propose another approach to also solve the publisher side rate limit?

86 changes: 86 additions & 0 deletions kytos/core/buffers/factory.py
@@ -0,0 +1,86 @@
"""Utilities for composing KytosEventBuffers"""
from functools import reduce

import limits
import limits.storage as lstorage
import limits.strategies as lstrategies
Member:

@Ktmi, I was looking into the limits source code and docs, and there are dedicated async/aio strategies, check it out; the storage also has a corresponding async abstraction that's asyncio.Task based instead of using threading.Timer, so let's make sure we're also using asyncio all the way down where possible. As a result, later on when calling hit it'd be a bool coroutine: await moving_window.hit(one_per_minute, "test_namespace", "foo").
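For reference, a minimal standalone sketch (not code from this PR) of those aio strategies, assuming the limits.aio module layout from its docs; the 20/second limit and the dpid identifier are illustrative only:

import asyncio
import time

import limits
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter


async def main():
    # Async counterparts of limits.storage / limits.strategies.
    moving_window = MovingWindowRateLimiter(MemoryStorage())
    twenty_per_second = limits.parse("20/second")
    dpid = "00:00:00:00:00:00:00:01"  # illustrative switch id

    # hit() is a coroutine here: True means the event fits in the current window.
    if not await moving_window.hit(twenty_per_second, "msg_out", dpid):
        reset_time, remaining = await moving_window.get_window_stats(
            twenty_per_second, "msg_out", dpid
        )
        # reset_time is a unix timestamp, so sleep only until the window resets.
        await asyncio.sleep(max(reset_time - time.time(), 0))


asyncio.run(main())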

Locally, I was exploring a 20/second limit, but then I realized it was not reaching 20/second for a given dpid. Check out the graph below: in red it's filtering for one dpid, and in blue it's all packet-out messages in a ring topology that I was using. I also included some logs to see how it was sleeping, and even though in most cases it was only sleeping after fetching (or hitting) roughly 20 items or so, in some cases it was only getting around 10, so it looks like it's sleeping more than it should. I'm going to recommend that you research and double check this part again, and check whether we can potentially use the remaining value from WindowStats when calling get_window_stats; if it actually returns the remaining time, it might also simplify the current subtraction that's being computed.

2023-10-17 16:10:19,006 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9936294555664062 secs, q len: 905, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:20,017 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9823873043060303 secs, q len: 877, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:21,011 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9889931678771973 secs, q len: 868, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:22,018 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9818587303161621 secs, q len: 856, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:23,018 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9812984466552734 secs, q len: 836, ids: (20 per 1 second, ('127.0.0.1', 57754))

[screenshot: 20231017_161154]

Author:

Yeah, I've seen the async version, but the reason I didn't use it is that I would then have to have two separate rate limiters for the async and non-async versions. However, it seems that we don't use the non-async get, so I can change this to the pure async version if we go under the assumption that get is deprecated and that we only use aget.

Member:

Right. Let's go with pure async then and leave the sync part unsupported. All queue consumers on core are async; they're either using msg_out_event_handler or the generic event_handler.

Author:

Switched the code to async. Not sure what's causing the rate to be halved like that.

Member:

@Ktmi, I'm suspecting it might be the sleep interval; if in some iterations it sleeps more than expected, then it makes sense that the rate halved. Maybe the remaining value from WindowStats when using get_window_stats might be helpful, if you could look into it and then later on confirm with a local stress test too. I generated this graph with Wireshark > IO Graph while sending a bunch of FlowMods. If you need help let me know.

Author:

I'm not seeing the same behaviour you are. For the following graph, I multiplied the packets sent by of_lldp so it would send 192 packets to a single switch every 3 seconds, with a limit of 100/second, and here are the results.
[graph screenshot]

Same conditions as above, but I have of_lldp try to send 400 packets every 3 seconds.

[graph screenshot]

In both graphs above, there are periods where packets aren't being sent at the same throughput, but the max throughput isn't being limited in the same way you are showing.

As for the remaining value from window_stats, that's the amount of room left in the window; I'll add a guard to try again if the value is not 0.
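A rough sketch of that guard in aget, assuming the async limits strategies (so hit and get_window_stats are awaited) and the asyncio/time imports already in the module; this is only the idea under discussion, not final code:

    async def aget(self):
        val = await super().aget()
        identifiers = self.limit, *self.gen_identifiers(val)
        while not await self.strategy.hit(*identifiers):
            reset_time, remaining = await self.strategy.get_window_stats(*identifiers)
            if remaining > 0:
                # The window still reports room, so retry hit() right away
                # instead of sleeping until the window resets.
                continue
            sleep_time = reset_time - time.time()
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        return val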

Member (@viniarck, Oct 26, 2023):

@Ktmi,

there are periods where packets aren't being sent at the same throughput, but the max throughput isn't being limited in the same way you are showing.

That's certainly a positive point, knowing the max throughput didn't get limited, but with a rate of every 3 secs it might not be hitting the issue that I'm seeing. To make the issue easier to reproduce, I've tried to reduce it to the minimal scenario possible, and I've managed to reproduce it using a stress test tool called vegeta, by sending requests to flow_manager. Arguably the minimal code path would be without sending requests and the DB, but these aren't bottlenecks for the rate I'm using, so it's OK.

Here are my findings and the stress test command. For all stress tests I'm always sending 100 requests per second with a single flow over 30 seconds, to sustain the throughput and get close to 100 packets per second:

❯ jq -ncM '{method: "POST", url:"http://localhost:8181/api/kytos/flow_manager/v2/flows/00:00:00:00:00:00:00:01", body: { "force": false, "flows": [ { "priority": 10, "match": { "in_port": 1, "dl_vlan": 100 }, "actions": [ { "action_type": "output", "port": 1 } ] } ] } | @base64, header: {"Content-Type": ["application/json"]}}' | vegeta attack -format=json -rate 100/1s -duration=30s -timeout=60s | tee results.bin | vegeta report
  • Case 0 -> To prove that 100 pps (FlowMods per second) can be reached and that the DB isn't a bottleneck for this rate:

[screenshot: 20231026_151841]

  • Case 1 -> I've set the msg_out limit as 20/second; notice that the rate of FlowMods isn't relatively constant and it's almost as if it were halved:

[screenshot: 20231026_152948]

  • Rerunning case 1 -> Sometimes it even goes over 10 pps, so it's not a hard half rate limit, but on average it looks like it is; it's actually a bit lower, since the rate actually sent on the wire isn't constant:

[screenshot: 20231026_161021]

  • Case 2 -> I've set the msg_out limit as 100/second (the default one); again, it behaved the same as the prior example, except resulting in 50/second:

[screenshot: 20231026_153246]

  • Case 3 -> I've set the msg_out limit as 300/second; as expected it didn't get rate limited, and FlowMods were sent at a relatively constant rate of 100/sec matching the client rate, proving that when it isn't sleeping it doesn't interfere with a constant rate either:

[screenshot: 20231026_153549]

Conclusion: I still suspect it might be the value it's sleeping for, and/or maybe we should also try out other strategy types such as fixed window. I haven't further debugged this specifically this time, but it's behaving as I saw the other day. Let me know if you can try to reproduce it on your env with vegeta; otherwise we can try to pair on it. Thanks, David.

from janus import PriorityQueue, Queue

from .buffers import KytosEventBuffer, RateLimitedBuffer


def process_default(config):
    """
    Create a default KytosEventBuffer from a given config dict
    """
    queue_classes = {
        'default': Queue,
        'queue': Queue,
        'priority': PriorityQueue,
    }
    return {
        'buffer_cls': KytosEventBuffer,
        'buffer_args': {
            'queue_cls': queue_classes[config.get('queue', 'default')]
        },
    }


def process_storage(config):
    """
    Create a rate limit storage from a given config dict
    """
    storages = {
        'memory': lstorage.MemoryStorage,
    }
    return storages[config.get('type', 'memory')]()


def process_strategy(config):
    """
    Create a rate limit strategy from a given config dict
    """
    strategies = {
        'moving_window': lstrategies.MovingWindowRateLimiter,
        'fixed_window': lstrategies.FixedWindowRateLimiter,
        'elastic_window': lstrategies.FixedWindowElasticExpiryRateLimiter,
    }
    strategy_cls = strategies[config.get('type', 'moving_window')]
    return strategy_cls(
        process_storage(
            config.get('storage', {})
        )
    )


def process_rate_limited(config):
"""
Create a rate limited KytosEventBuffer from a given config dict
"""
processed = process_default(config)
processed['buffer_cls'] = RateLimitedBuffer
args = processed['buffer_args']
args['strategy'] = process_strategy(config.get('strategy', {}))
args['limit'] = limits.parse(config.get('limit', '100/second'))
args['gen_identifiers'] = lambda event: reduce(
lambda ev, attr: getattr(ev, attr, ('unknown',)),
config.get('identifier', 'connection.id').split('.'),
event
)
return processed


def buffer_from_config(name, config: dict) -> KytosEventBuffer:
    """
    Create a KytosEventBuffer from a given config dict
    """
    buffer_conf_processors = {
        'default': process_default,
        'rate_limited': process_rate_limited,
    }
    buffer_type = config.get('type', 'default')
    processed_conf = buffer_conf_processors[buffer_type](config)
    return processed_conf['buffer_cls'](
        name,
        **(processed_conf['buffer_args'])
    )
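For illustration, a hypothetical event_buffer_conf entry wired through the factory above; the msg_out values below are example settings, not defaults shipped by this PR:

# Keys mirror what process_default/process_strategy/process_rate_limited read.
msg_out_conf = {
    "type": "rate_limited",
    "queue": "priority",
    "strategy": {"type": "moving_window", "storage": {"type": "memory"}},
    "limit": "100/second",
    "identifier": "destination.id",
}

msg_out_buffer = buffer_from_config("msg_out", msg_out_conf)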
79 changes: 79 additions & 0 deletions kytos/core/buffers/manager.py
@@ -0,0 +1,79 @@
"""Kytos Buffer Classes, based on Python Queue."""
import logging

from janus import PriorityQueue

from kytos.core.config import KytosConfig
from kytos.core.events import KytosEvent
from kytos.core.helpers import get_thread_pool_max_workers

from .buffers import KytosEventBuffer
from .factory import buffer_from_config

LOG = logging.getLogger(__name__)


class KytosBuffers:
    """Set of KytosEventBuffer used in Kytos."""

    def __init__(self):
        """Build the default set of KytosEventBuffers.

        :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        received from connection events.

        :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        received from network.

        :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be received.

        :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be sent.

        :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        sent to NApps.
        """
        self._pool_max_workers = get_thread_pool_max_workers()

        self.conn = KytosEventBuffer("conn")
        self.raw = KytosEventBuffer("raw", maxsize=self._get_maxsize("sb"))
        self.msg_in = KytosEventBuffer(
            "msg_in",
            maxsize=self._get_maxsize("sb"),
            queue_cls=PriorityQueue,
        )
        self.msg_out = KytosEventBuffer(
            "msg_out",
            maxsize=self._get_maxsize("sb"),
            queue_cls=PriorityQueue,
        )
        self.app = KytosEventBuffer(
            "app",
            maxsize=self._get_maxsize("app")
        )

        buffer_conf = KytosConfig().options['daemon'].event_buffer_conf

        # Replace or extend the default buffers based on event_buffer_conf.
        for name, config in buffer_conf.items():
            setattr(self, name, buffer_from_config(name, config))

    def get_all_buffers(self):
        """Get all KytosEventBuffer instances."""
        return [
            event_buffer for event_buffer in self.__dict__.values()
            if isinstance(event_buffer, KytosEventBuffer)
        ]

    def _get_maxsize(self, queue_name):
        """Get queue maxsize if it's been set."""
        return self._pool_max_workers.get(queue_name, 0)

    def send_stop_signal(self):
        """Send a ``kytos/core.shutdown`` event to each buffer."""
        LOG.info('Stop signal received by Kytos buffers.')
        LOG.info('Sending KytosShutdownEvent to all apps.')
        event = KytosEvent(name='kytos/core.shutdown')
        for buffer in self.get_all_buffers():
            buffer.put(event)