Added in rate limited KytosEventBuffers #412
Changes from 7 commits
@@ -0,0 +1,5 @@
"""Module for buffers."""
from .buffers import KytosEventBuffer
from .manager import KytosBuffers

__all__ = ('KytosEventBuffer', 'KytosBuffers')
@@ -0,0 +1,86 @@
"""Utilities for composing KytosEventBuffers"""
from functools import reduce

import limits
import limits.storage as lstorage
import limits.strategies as lstrategies
@Ktmi, I was looking into this locally, exploring a 20/second limit, but then I realized it was not reaching 20/second for a given dpid. Check out the graph below: in red it's filtering for one dpid, and in blue it's all packet-out messages in a ring topology that I was using. I also included some logs to see how it was sleeping, and even though in most cases it was only sleeping after fetching roughly 20 items or so (or hitting the limit of 20), in some cases it was only getting about 10. It looks like it's sleeping more than it should, so I'd recommend you research and double check this part again, and check whether we can use the remaining value from get_window_stats.

Yeah, I've seen the async version, but the reason I didn't use it is that I would then have to keep two separate rate limiters, one async and one non-async. However, it seems we don't use the non-async side anyway.

Right. Let's go with pure async then and leave the sync part unsupported. All queue consumers on core are async; they're either using msg_out_event_handler or the generic event_handler.

Switched the code to async. Not sure what's causing the rate to be halved like that.

@Ktmi, I'm suspecting it might be the sleep interval; if in some iterations it sleeps more than expected, then it makes sense that the rate is halved. Maybe the remaining value from WindowStats, when using get_window_stats, would be helpful; it'd be great if you could look into it and then confirm with a local stress test too. I generated this graph with Wireshark > IO Graph while sending a bunch of FlowMods. If you need help let me know.

I'm not seeing the same behaviour you are. For the following graph, I multiplied the packets sent by of_lldp so it would send 192 to a single switch every 3 seconds with a limit of 100/second, and here are the results; the second graph is under the same conditions. In both graphs there are periods where packets aren't being sent at the same throughput, but the max throughput isn't being capped in the way you are showing. As for the remaining value from window_stats, that's the amount of room left in the window; I'll add a guard to try again if the value is not 0.

That's certainly a positive point, knowing the max throughput didn't get limited, but with a burst only every 3 seconds it might not be hitting the issue that I'm seeing. To make the issue easier to reproduce, I've tried to reduce it to the minimal scenario possible, and I managed to reproduce it with a stress test tool. Here are my findings and the stress test command. For all stress tests I'm always sending 100 requests per second with a single flow over 30 seconds, to sustain the throughput and get close to 100 packets per second.

Conclusion: I still suspect it's the value it's sleeping for, and/or it's worth trying out other strategy types such as fixed window. I haven't debugged this further specifically this time, but it's behaving as I saw the other day. Let me know if you can try to reproduce it on your env with the same stress test.
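For reference, here is a minimal sketch, not the PR's actual RateLimitedBuffer code, of how the async limits API (assuming the limits.aio variants of the strategy and storage) can drive a rate limited consumer while only sleeping until the current window resets, using the remaining value from get_window_stats as the guard discussed above; the queue and identifier arguments are illustrative:

import asyncio
import time

import limits
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter

limiter = MovingWindowRateLimiter(MemoryStorage())
limit = limits.parse("20/second")


async def rate_limited_aget(async_queue, gen_identifier):
    """Get one event, pausing while its identifier is over the limit."""
    event = await async_queue.get()  # e.g. an asyncio.Queue or a janus async_q
    ident = gen_identifier(event)    # e.g. the dpid or connection id
    while not await limiter.hit(limit, ident):  # hit() consumes a slot when allowed
        reset_time, remaining = await limiter.get_window_stats(limit, ident)
        if remaining > 0:
            # Room opened up in the window, retry hit() without sleeping.
            await asyncio.sleep(0)
            continue
        # Sleep only until the window resets, never a fixed interval, so the
        # effective rate is not halved by oversleeping.
        await asyncio.sleep(max(reset_time - time.time(), 0))
    return event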
from janus import PriorityQueue, Queue

from .buffers import KytosEventBuffer, RateLimitedBuffer


def process_default(config):
    """
    Create a default KytosEventBuffer from a given config dict
    """
    queue_classes = {
        'default': Queue,
        'queue': Queue,
        'priority': PriorityQueue,
    }
    return {
        'buffer_cls': KytosEventBuffer,
        'buffer_args': {
            'queue_cls': queue_classes[config.get('queue', 'default')]
        },
    }


def process_storage(config):
    """
    Create a rate limit storage from a given config dict
    """
    storages = {
        'memory': lstorage.MemoryStorage,
    }
    return storages[config.get('type', 'memory')]()


def process_strategy(config):
    """
    Create a rate limit strategy from a given config dict
    """
    strategies = {
        'moving_window': lstrategies.MovingWindowRateLimiter,
        'fixed_window': lstrategies.FixedWindowRateLimiter,
        'elastic_window': lstrategies.FixedWindowElasticExpiryRateLimiter,
    }
    strategy_cls = strategies[config.get('type', 'moving_window')]
    return strategy_cls(
        process_storage(
            config.get('storage', {})
        )
    )


def process_rate_limited(config):
""" | ||
Create a rate limited KytosEventBuffer from a given config dict | ||
""" | ||
processed = process_default(config) | ||
processed['buffer_cls'] = RateLimitedBuffer | ||
args = processed['buffer_args'] | ||
args['strategy'] = process_strategy(config.get('strategy', {})) | ||
args['limit'] = limits.parse(config.get('limit', '100/second')) | ||
args['gen_identifiers'] = lambda event: reduce( | ||
lambda ev, attr: getattr(ev, attr, ('unknown',)), | ||
config.get('identifier', 'connection.id').split('.'), | ||
event | ||
) | ||
return processed | ||
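A note on the gen_identifiers lambda above: it walks the dotted path from the config ('connection.id' by default) over the event's attributes, and any missing attribute collapses to the ('unknown',) fallback. A small illustration, using SimpleNamespace as a stand-in for a KytosEvent (the id value is made up):

from functools import reduce
from types import SimpleNamespace

gen_identifiers = lambda event: reduce(
    lambda ev, attr: getattr(ev, attr, ('unknown',)),
    'connection.id'.split('.'),
    event,
)

event = SimpleNamespace(connection=SimpleNamespace(id='some-connection-id'))
print(gen_identifiers(event))              # some-connection-id
print(gen_identifiers(SimpleNamespace()))  # ('unknown',)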


def buffer_from_config(name, config: dict) -> KytosEventBuffer:
    """
    Create a KytosEventBuffer from a given config dict
    """
    buffer_conf_processors = {
        'default': process_default,
        'rate_limited': process_rate_limited,
    }
    buffer_type = config.get('type', 'default')
    processed_conf = buffer_conf_processors[buffer_type](config)
    return processed_conf['buffer_cls'](
        name,
        **(processed_conf['buffer_args'])
    )
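Putting the factory together, here is a hedged example of the kind of config dict buffer_from_config accepts, based only on the keys and defaults visible above; the specific values are illustrative, not taken from kytos.conf, and it assumes the factory module above is importable:

msg_out_config = {
    'type': 'rate_limited',          # selects process_rate_limited
    'queue': 'priority',             # use a janus PriorityQueue
    'limit': '100/second',           # parsed with limits.parse
    'identifier': 'destination.id',  # dotted attribute path on the event
    'strategy': {
        'type': 'moving_window',
        'storage': {'type': 'memory'},
    },
}

msg_out = buffer_from_config('msg_out', msg_out_config)  # RateLimitedBuffer
app = buffer_from_config('app', {})                      # default KytosEventBuffer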
@@ -0,0 +1,79 @@
"""Kytos Buffer Classes, based on Python Queue.""" | ||
import logging | ||
|
||
from janus import PriorityQueue | ||
|
||
from kytos.core.config import KytosConfig | ||
from kytos.core.events import KytosEvent | ||
from kytos.core.helpers import get_thread_pool_max_workers | ||
|
||
from .buffers import KytosEventBuffer | ||
from .factory import buffer_from_config | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
||
class KytosBuffers: | ||
"""Set of KytosEventBuffer used in Kytos.""" | ||
|
||
def __init__(self): | ||
"""Build four KytosEventBuffers. | ||
|
||
:attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events | ||
received from connection events. | ||
|
||
:attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events | ||
received from network. | ||
|
||
:attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with | ||
events to be received. | ||
|
||
:attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with | ||
events to be sent. | ||
|
||
:attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events | ||
sent to NApps. | ||
""" | ||
|
||
self._pool_max_workers = get_thread_pool_max_workers() | ||
|
||
self.conn = KytosEventBuffer("conn") | ||
self.raw = KytosEventBuffer("raw", maxsize=self._get_maxsize("sb")) | ||
self.msg_in = KytosEventBuffer( | ||
"msg_in", | ||
maxsize=self._get_maxsize("sb"), | ||
queue_cls=PriorityQueue, | ||
) | ||
self.msg_out = KytosEventBuffer( | ||
"msg_out", | ||
maxsize=self._get_maxsize("sb"), | ||
queue_cls=PriorityQueue, | ||
) | ||
self.app = KytosEventBuffer( | ||
"app", | ||
maxsize=self._get_maxsize("app") | ||
) | ||
|
||
buffer_conf = KytosConfig().options['daemon'].event_buffer_conf | ||
|
||
for name, config in buffer_conf.items(): | ||
setattr(self, name, buffer_from_config(name, config)) | ||
|
||
def get_all_buffers(self): | ||
"""Get all KytosEventBuffer instances.""" | ||
return [ | ||
event_buffer for event_buffer in self.__dict__.values() | ||
if isinstance(event_buffer, KytosEventBuffer) | ||
] | ||
|
||
def _get_maxsize(self, queue_name): | ||
"""Get queue maxsize if it's been set.""" | ||
return self._pool_max_workers.get(queue_name, 0) | ||
|
||
def send_stop_signal(self): | ||
"""Send a ``kytos/core.shutdown`` event to each buffer.""" | ||
LOG.info('Stop signal received by Kytos buffers.') | ||
LOG.info('Sending KytosShutdownEvent to all apps.') | ||
event = KytosEvent(name='kytos/core.shutdown') | ||
for buffer in self.get_all_buffers(): | ||
buffer.put(event) |
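Given the override loop above, event_buffer_conf only needs entries for the buffers being customized; assuming it reaches KytosBuffers as a dict of buffer name to config (how it is expressed in kytos.conf is not shown in this diff), it could look like:

event_buffer_conf = {
    'msg_out': {
        'type': 'rate_limited',
        'queue': 'priority',
        'limit': '100/second',
        'identifier': 'destination.id',
        'strategy': {'type': 'moving_window'},
    },
    # Buffers not listed here (conn, raw, msg_in, app) keep their defaults.
}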
David, another requirement to consider: rate limiting at the put and aput level too, so that events can be rate limited before actually being put in the queue, essentially rate limiting as close to the source as possible as far as the queue is concerned. This also implies supporting both sync and async on this end of the queue. Let me elaborate on the use case in the next comments.
This is partly related to a prior chaining discussion, but here it would only chain on the put|aput side, where any publisher could register a particular rate limiter that it'll use. Alternatively, provide a context manager or something similar that publishers who need to be rate limited before putting in the queue can use (a sync or async context), although I believe the former would be less boilerplate and more ergonomic to use.

Considering that we've been using bounded queues, this will also contribute to a bit more fairness with other concurrent producers who might be sending other types of messages, regardless of whether a priority queue is used (and even more when one is), when rate limiters are also applied on the put|aput side. It's a subtle difference, but if the queue is properly sized for the current number of events, it can make a significant difference, since the queue won't be constantly getting full.

The idea is to keep aget working as a global rate limiter with a given identifier, as it's currently implemented, and also structure put|aput in a way where in the future it's possible to throttle with a given unique id context, essentially replacing the rate limiting we've been implementing at the producer level for mef_eline and telemetry_int (even though the global aget rate limit will already solve a major part of the problem).
Concrete use case example with our existing Kytos-ng NApps: rate limit by 'destination.id' on msg_out at 100/second, AND individually rate limit any flows from mef_eline at '30/sec' and any from telemetry_int at '30/sec'.

On msg_out, flow_manager is responsible for pushing FlowMods, so it's centralizing the events; on flow_manager it could, on behalf of its clients, rate limit based on the flow owner value.

Do you see what I mean? Can you also support this, or propose another approach to also solve the publisher-side rate limiting?