enhancement(vector source): implement client connection limits for grpc server #21072
Conversation
Thanks for opening this to start a conversation @fpytloun ! It's an impressive attempt given you mentioned you have little Rust experience.
Unfortunately, I'm not sure if this is how we'd want to go about solving this because it applies the limits globally rather than per client.
I'm also not sure that the client is guaranteed to close the connection if it sees a resource exhausted error.
I think what we really want is for hyperium/tonic#1428 to be implemented (there is a PR as of last week: hyperium/tonic#1865). If that were added to tonic, we could expose it in Vector.
Granted, that only covers connection age and not number of requests, but I imagine that might suffice for forcing clients to rebalance. What do you think? Should we just wait for that to be added to tonic?
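For context, a rough sketch of what exposing that could look like if hyperium/tonic#1865 lands; the `max_connection_age` builder method is assumed from that PR and is not part of tonic at the time of this discussion:

```rust
use std::time::Duration;
use tonic::transport::Server;

// Sketch only: `max_connection_age` is assumed from hyperium/tonic#1865;
// Vector could map a source config option onto this builder setting.
fn builder_with_age_limit() -> Server {
    Server::builder().max_connection_age(Duration::from_secs(300))
}
```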
```rust
#[derive(Clone)]
pub struct ConnectionLimit<S> {
    inner: S,
    request_count: Arc<Mutex<usize>>,
```
I think an atomic could be used here instead: https://doc.rust-lang.org/std/sync/atomic/struct.AtomicUsize.html
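A minimal sketch of that suggestion, assuming the counter only ever increments (the `check_and_count` helper is hypothetical, not from this PR):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Hypothetical sketch: a lock-free counter in place of Arc<Mutex<usize>>.
struct ConnectionLimit {
    request_count: Arc<AtomicUsize>,
    max_requests: usize,
}

impl ConnectionLimit {
    // Counts this request and reports whether it is still within the limit.
    fn check_and_count(&self) -> bool {
        // fetch_add returns the previous value; Relaxed ordering suffices for
        // a plain counter with no ordering dependency on other data.
        self.request_count.fetch_add(1, Ordering::Relaxed) < self.max_requests
    }
}

fn main() {
    let limit = ConnectionLimit {
        request_count: Arc::new(AtomicUsize::new(0)),
        max_requests: 2,
    };
    assert!(limit.check_and_count()); // 1st request: allowed
    assert!(limit.check_and_count()); // 2nd request: allowed
    assert!(!limit.check_and_count()); // 3rd request: over the limit
}
```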
```rust
request_count: Arc::new(Mutex::new(0)),
max_requests: max_requests,
max_duration: max_duration,
start_time: Instant::now(),
```
Nit: I think it might be better to initialize the `start_time` in the `call` method since this is closer to when it would be "running".
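A minimal sketch of that nit, assuming `start_time` is changed to an `Option<Instant>` filled in on the first request (a hypothetical restructuring of the PR's field):

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

// Hypothetical sketch: the clock starts on the first call, not at construction.
struct ConnectionLimit {
    start_time: Arc<Mutex<Option<Instant>>>,
}

impl ConnectionLimit {
    // Would be invoked from `call`; sets start_time on the first request.
    fn elapsed(&self) -> Duration {
        let start = *self
            .start_time
            .lock()
            .unwrap()
            .get_or_insert_with(Instant::now);
        start.elapsed()
    }
}
```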
```rust
    }
}

impl<S> Service<Request<Body>> for ConnectionLimit<S>
```
I think this would end up applying one duration and request limit across all clients. In an ideal world, I think we would apply the limits per-client (that is per socket).
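For illustration, a rough sketch of one way to get per-socket limits with hyper 0.14's `make_service_fn`, constructing a fresh limiter for each accepted connection (this reuses the PR's type names, but the wiring is hypothetical):

```rust
// Hypothetical sketch: each accepted connection gets its own ConnectionLimit,
// so the request count and start time are tracked per client socket.
let make_service = hyper::service::make_service_fn(move |_conn| {
    let inner = inner_service.clone();
    async move {
        Ok::<_, std::convert::Infallible>(
            ConnectionLimitLayer::new(max_requests, max_duration).layer(inner),
        )
    }
});
```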
```rust
// Conditionally apply the ConnectionLimitLayer if any limits are set
let service = ConnectionLimitLayer::new(max_requests, max_duration).layer(service);
```
I think this could be added to the list above as another layer, after (or before) the `DecompressionAndMetricsLayer`.
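Something like the following placement, sketched with tower's `ServiceBuilder` (the exact position relative to the existing layers is the open question above, and this assumes `DecompressionAndMetricsLayer` is a unit-struct layer):

```rust
// Hypothetical sketch: slot the new layer next to the existing one.
let service = tower::ServiceBuilder::new()
    .layer(DecompressionAndMetricsLayer)
    .layer(ConnectionLimitLayer::new(max_requests, max_duration))
    .service(service);
```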
```diff
@@ -22,14 +23,19 @@ use tracing::Span;
 mod decompression;
 pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer};

+mod connectionlimit;
```
Nit: module names usually use `_` for delimiting multiple words, so this would be `connection_limit`.
```rust
return Err(Status::resource_exhausted(
    "Connection closed after reaching the limit.",
));
```
Will the server close the connection if this error is returned? Or are we depending on the client closing it when it sees this error?
Thank you @jszwedko for reviewing this. It seems this approach is a dead end, so it might be better to close this PR and instead try to implement a keepalive limit on the client side (vector sink), as that might be easier and more versatile, with the added possibility of load balancing between multiple upstream hosts 🤔
Related: #19457
Related: #10728
Adds two new configuration options for the `vector` source to mitigate the linked issues:

- `max_duration` - maximum time for a client connection
- `max_requests` - maximum number of requests per client connection

None, either, or both of these parameters can be set.
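A minimal sketch of how these options might look in a Vector TOML config (the source name, address, and values are illustrative, and the exact units and formats are assumptions not confirmed by this PR):

```toml
[sources.upstream_vector]
type = "vector"
address = "0.0.0.0:6000"
# Close each client connection once it has been open this long (assumed seconds)
max_duration = 300
# Close each client connection after it has served this many requests
max_requests = 10000
```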