Added in rate limited KytosEventBuffers #412
Conversation
Excellent contribution, @Ktmi. Once again, I appreciated your help with this. Otherwise, we wouldn't have this in the 2023.2 version.
I look forward to also seeing the FlowMods stress test; if you could also plot a graph, using Wireshark's IO graph filtered for the FlowMod OpenFlow message type, that could be an option. Also, regarding the e2e test failures, they look unrelated, but if you hit them again please file an issue and ping @gretelliz to take a look (on the nightly tests they're passing, though); she's been enhancing the sdntrace_cp test suite too.
Also, regarding generating flows, it looks like you've hit kytos-ng/flow_manager#104; check out the payload again.
Still getting the same error in the SDN trace e2e tests. Not sure why. Will rerun without this patch and see if the issue persists.
The e2e tests on SDN trace fail whether I have this patch applied or not.
@Ktmi the nightly e2e tests have been passing for more than 2 weeks. Maybe it's a sleep that's having an impact when executing on your laptop? Either way, if it's failing it might be worth mapping an issue for it if you haven't yet, just so it can be improved. It's also worth dispatching an execution on GitLab https://gitlab.ampath.net/amlight/kytos-end-to-end-tester/-/pipelines with this branch, just so you have more information when opening the issue.
If you set this env var
@Ktmi, consider this PR approved. A few points though (other than the e2e test we're discussing in other comments):
- Please simulate a queue misconfiguration and see what happens. Ideally, it should exit kytosd if it can't handle the option, the same pattern that we use with other unsupported option values.
- Also, let us know if you've had the chance to stress test the rate limit in practice and see it working.
- Also, I raised a question regarding the currently supported configuration and whether we need more flexibility or not.
Thanks
Leaving it pre-approved. @Ktmi, once you stress test the rate limit in practice and confirm it's working, feel free to merge.
Regarding the future discussion, let's leave that for another issue; maybe we won't even need to prioritize it, we'll see. It's OK to only address that in the future and break compat once we have other clear usages; sometimes it's better to wait for the concrete use cases. We still have a reasonable time frame until 2023.2 is shipped, so if any other case is needed we'll reprioritize, otherwise we'll ship the current one, OK?
PR looks good, but I'm still waiting for the following before this PR can land (I'll review again to make it clear what's left to merge):
- Exercise the rate limits in practice and confirm the expected rate.
- Resolve the changelog conflict.
- Confirm that e2e tests are still passing and map whatever else is needed, if anything.
Thanks, David.
kytos/core/buffers/factory.py
Outdated
import limits
import limits.storage as lstorage
import limits.strategies as lstrategies
@Ktmi, I was looking into the limits source code and docs, and there are exclusive async/aio strategies, check them out; the storage also has a corresponding async abstraction that's asyncio.Task based instead of using threading.Timer, so let's make sure we're also using asyncio all the way down where possible. As a result, later on when calling hit it'd be a bool coroutine: await moving_window.hit(one_per_minute, "test_namespace", "foo").
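For reference, a minimal sketch of that async usage, assuming the limits.aio storage/strategy modules with an in-memory backend (the rate and identifiers below are just examples):

```python
import asyncio

import limits
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter


async def main():
    storage = MemoryStorage()
    moving_window = MovingWindowRateLimiter(storage)
    one_per_minute = limits.parse("1/minute")
    # hit() is a coroutine here; it returns True while the limit hasn't been exceeded.
    print(await moving_window.hit(one_per_minute, "test_namespace", "foo"))  # True
    print(await moving_window.hit(one_per_minute, "test_namespace", "foo"))  # False, limit hit


asyncio.run(main())
```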
Locally, I was exploring a 20/second limit, but then I realized that it was not reaching 20/second for a given dpid; check out the graph below: in red it's filtering for one dpid, and in blue it's all packet-out messages in the ring topology that I was using. I also included some logs to see how it was sleeping, and even though in most cases it was only sleeping after fetching roughly 20 items or so (or hitting them 20 times), in some cases it was only getting 10 or so. It looks like it's sleeping more than it should, so I'd recommend you research and double check this part again, and check whether we can potentially use the remaining value from WindowStats when using get_window_stats; if it actually returns the remaining time, it might also simplify the current subtraction that's being computed.
2023-10-17 16:10:19,006 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9936294555664062 secs, q len: 905, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:20,017 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9823873043060303 secs, q len: 877, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:21,011 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9889931678771973 secs, q len: 868, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:22,018 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9818587303161621 secs, q len: 856, ids: (20 per 1 second, ('127.0.0.1', 57754))
2023-10-17 16:10:23,018 - INFO [kytos.core.buffers.buffers] [buffers.py:162:aget] (MainThread) buffer msg_out will sleep for: 0.9812984466552734 secs, q len: 836, ids: (20 per 1 second, ('127.0.0.1', 57754))
Yeah, I've seen the async version, but the reason I didn't use it is that I would then have to have two separate rate limiters, one for the async and one for the non-async version. However, it seems that we don't use the non-async get, so I can change this to the pure async version if we go under the assumption that get is deprecated and that we only use aget.
Right. Let's go with pure async then and leave the sync part unsupported. All queue consumers in core are async; they're either using msg_out_event_handler or the generic event_handler.
Switched the code to async. Not sure what's causing the rate to be halved like that.
@Ktmi, I suspect it might be the sleep interval; in some iterations, if it sleeps more than expected, then it makes sense that the rate halved. Maybe the remaining value from WindowStats when using get_window_stats might be helpful, if you could look into it and then later confirm with a local stress test too. I generated this graph with Wireshark > IO Graph, sending a bunch of FlowMods. If you need help let me know.
I'm not seeing the same behaviour you are. For the following graph, I multiplied the packets sent by of_lldp so it would send 192 to a single switch every 3 seconds with a limit of 100/second, and here are the results.
Same conditions as above, but I have of_lldp try to send 400 packets every 3 seconds.
In both graphs above, there are periods where packets aren't being sent at the same throughput, but the max throughput isn't being limited in the way you are showing.
As for the remaining value from window_stats, that's the amount of room left in the window; I'll add a guard to try again if the value is not 0.
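A rough sketch of what that guard could look like, assuming the limits.aio interface where get_window_stats returns a WindowStats with remaining (slots left) and reset_time (an absolute timestamp); the helper name here is illustrative:

```python
import asyncio
import time


async def wait_for_slot(limiter, rate_item, *identifiers):
    """Retry while the window still has room; only sleep once it's actually full."""
    while not await limiter.hit(rate_item, *identifiers):
        stats = await limiter.get_window_stats(rate_item, *identifiers)
        if stats.remaining > 0:
            # Room left in the window, so retry right away instead of sleeping.
            continue
        # Window is full: sleep only for the time left until it resets.
        await asyncio.sleep(max(stats.reset_time - time.time(), 0))
```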
> there are periods where packets aren't being sent at the same throughput, but the max throughput isn't being limited in the same way you are showing.
That's certainly a positive point, knowing the max throughput didn't get limited, but with a rate of every 3 secs it might not be hitting the issue that I'm seeing. To make the issue easier to reproduce, I've tried to reduce it to the minimal scenario possible; I managed to reproduce it using the stress test tool vegeta, by sending requests to flow_manager. Arguably the minimal code path would be without sending requests and the DB, but these aren't bottlenecks for the rate I'm using, so it's OK.
Here are my findings and the stress test command. For all stress tests I'm always sending 100 requests per second with a single flow over 30 seconds to sustain the throughput and get close to 100 packets per second:
❯ jq -ncM '{method: "POST", url:"http://localhost:8181/api/kytos/flow_manager/v2/flows/00:00:00:00:00:00:00:01", body: { "force": false, "flows": [ { "priority": 10, "match": { "in_port": 1, "dl_vlan": 100 }, "actions": [ { "action_type": "output", "port": 1 } ] } ] } | @base64, header: {"Content-Type": ["application/json"]}}' | vegeta attack -format=json -rate 100/1s -duration=30s -timeout=60s | tee results.bin | vegeta report
- Case 0 -> To prove that 100 pps (FlowMods per second) can be reached and the DB isn't a bottleneck for this rate:
- Case 1 -> I've set the msg_out limit as 20/second; notice that the rate of FlowMods isn't relatively constant and it's almost as if it were halved:
- Rerunning case 1 again -> Sometimes it even goes over 10 pps, so it's not a hard half rate limit, but on average it looks like it is; it's actually a bit lower since the rate actually sent on the wire isn't constant:
- Case 2 -> I've set the msg_out limit as 100/second (the default one); again, it behaved the same as the prior example, except resulting in 50/second:
- Case 3 -> I've set the msg_out limit as 300/second; it didn't get rate limited, as expected, and FlowMods were sent at a relatively constant rate of 100/sec matching the client rate, proving that, when it doesn't need to sleep, it doesn't interfere with a constant rate either, as expected:
In conclusion, I still suspect it might be the value it's sleeping for; it may also be worth trying out other strategy types such as fixed window. I haven't debugged this further this time specifically, but it's behaving as I saw the other day. Let me know if you can try to reproduce it on your env with vegeta; otherwise we can try to pair on it. Thanks, David.
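If it helps, trying a fixed window should mostly be a strategy swap, assuming the limits.aio API (a sketch, not something tested here):

```python
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import FixedWindowRateLimiter

# Same hit()/get_window_stats() interface as the moving window strategy,
# but counters reset on fixed window boundaries instead of sliding.
limiter = FixedWindowRateLimiter(MemoryStorage())
```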
Other than that, I also simulated a wrong configuration and it exited.
kytos/core/buffers/buffers.py
Outdated
event = KytosEvent(name='kytos/core.shutdown')
for buffer in self.get_all_buffers():
    buffer.put(event)

async def aget(self):
David, another requirement to consider:
- Also, rate limit at the put and aput level, i.e., before actually putting in the queue, essentially rate limiting as close to the source as possible as far as the queue is concerned. This also implies supporting both sync and async for this end of the queue.
Let me elaborate on the use case in the next comments.
This is partly related to a prior chaining discussion, but here it would only chain on the put|aput, where any publisher could register a particular rate limiter that it'll use. Alternatively, provide a context manager or something similar that publishers who need to be rate limited before putting in the queue can use (sync or async), although I believe the former would be less boilerplate and more ergonomic to use.
Considering that we've been using bounded queues, this will also contribute to a bit more fairness with other concurrent producers who might be sending other types of messages, regardless of whether a priority queue is used or not (and even more when it is), when rate limiters are also being used on the put|aput side. It's a subtle difference, but if the queue is properly sized for the current number of events going on, then it can make a significant difference since the queue won't be constantly getting full.
                                                      producer 1 (put)
core event_handler aget <--- Queue X <---- put|aput   producer 2 (aput)
        |                                             ....
        |                                             producer n (put|aput)
        \/
________________________________
NApps event handlers run by
thread pool and/or asyncio task
________________________________
The idea is to keep aget working as a global rate limiter with a given identifier, as it's currently implemented, and then also structure put|aput in a way where, in the future, it's also possible to throttle with a given unique id context, essentially replacing the rate limiting at the producer level that we've been doing for mef_eline and telemetry_int (even though the global aget rate limit will already solve a major part of the problem).
Concrete use case example with our existing Kytos-ng NApps:
- Globally rate limit any messages per 'destination.id' on msg_out at 100/second AND individually rate limit any flows from mef_eline at '30/sec' and any from telemetry_int at '30/sec'.
On msg_out, flow_manager is responsible for pushing FlowMods, so it centralizes the events; on flow_manager it could, on behalf of its clients, rate limit based on the flow owner value:
                                   mef_eline (via requests and KytosEvent)
aget <--- Queue msg_out <--- put   flow_manager
                                   telemetry_int (via KytosEvent)
Do you see what I mean?
Can you also support this or propose another approach to also solve the publisher side rate limit?
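One possible shape for the publisher-side hook, purely as an illustration (the RateLimitedPutBuffer class, its attribute names, and the identifier scheme below are assumptions for the sketch, not the actual kytos API; it assumes a limits.aio strategy instance):

```python
import asyncio
import time


class RateLimitedPutBuffer:
    """Illustrative buffer whose aput() rate limits per identifier before enqueueing."""

    def __init__(self, queue, limiter, rate_item):
        self.queue = queue          # e.g. an asyncio.Queue (or asyncio.PriorityQueue)
        self.limiter = limiter      # e.g. MovingWindowRateLimiter(MemoryStorage())
        self.rate_item = rate_item  # e.g. limits.parse("30/second")

    async def aput(self, event, identifier="default"):
        """Rate limit as close to the producer as possible, then enqueue the event."""
        while not await self.limiter.hit(self.rate_item, "aput", identifier):
            stats = await self.limiter.get_window_stats(self.rate_item, "aput", identifier)
            await asyncio.sleep(max(stats.reset_time - time.time(), 0))
        await self.queue.put(event)
```

With something like that shape, flow_manager could call aput with the identifier set to the flow owner (mef_eline, telemetry_int), while the existing aget limit keyed by destination.id stays in place as the global cap.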
Gonna close this one and reopen a new PR, in order to integrate the changes from #438.
Closes #245.
Summary
This patch implements a rate limited KytosEventBuffer through the usage of the limits package.
Local Tests
I was trying to test writing many flows to stored_flows; however, at the moment, I get a pymongo error on the flow_manager side of things. Otherwise, normal tasks like creating EVCs seem to work well.
End-To-End Tests
The following are the end-to-end test results: