Skip to content

Commit

Permalink
enhancement(sources): add config option to limit source tcp connectio…
Browse files Browse the repository at this point in the history
…ns (#10491)
  • Loading branch information
fuchsnj authored Dec 22, 2021
1 parent 41e5598 commit 633a27c
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 24 deletions.
1 change: 1 addition & 0 deletions benches/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ fn benchmark_real_world_1(c: &mut Criterion) {
keepalive: None,
tls: None,
receive_buffer_bytes: None,
connection_limit: None
}),
);

Expand Down
25 changes: 12 additions & 13 deletions src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct FluentConfig {
receive_buffer_bytes: Option<usize>,
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
connection_limit: Option<u32>,
}

inventory::submit! {
Expand All @@ -48,6 +49,7 @@ impl GenerateConfig for FluentConfig {
tls: None,
receive_buffer_bytes: None,
acknowledgements: Default::default(),
connection_limit: Some(2),
})
.unwrap()
}
Expand All @@ -68,6 +70,7 @@ impl SourceConfig for FluentConfig {
self.receive_buffer_bytes,
cx,
self.acknowledgements,
self.connection_limit,
)
}

Expand Down Expand Up @@ -306,20 +309,14 @@ impl Decoder for FluentDecoder {
let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);

// check for unexpected EOF to indicate that we need more data
match res {
// can use or-patterns in 1.53
// https://github.com/rust-lang/rust/pull/79278
Err(DecodeError::Decode(decode::Error::InvalidDataRead(ref custom))) => {
if custom.kind() == io::ErrorKind::UnexpectedEof {
return Ok(None);
}
}
Err(DecodeError::Decode(decode::Error::InvalidMarkerRead(ref custom))) => {
if custom.kind() == io::ErrorKind::UnexpectedEof {
return Ok(None);
}
if let Err(DecodeError::Decode(
decode::Error::InvalidDataRead(ref custom)
| decode::Error::InvalidMarkerRead(ref custom),
)) = res
{
if custom.kind() == io::ErrorKind::UnexpectedEof {
return Ok(None);
}
_ => {}
}

(des.position() as usize, res)
Expand Down Expand Up @@ -809,6 +806,7 @@ mod tests {
keepalive: None,
receive_buffer_bytes: None,
acknowledgements: true.into(),
connection_limit: None,
}
.build(SourceContext::new_test(sender))
.await
Expand Down Expand Up @@ -1053,6 +1051,7 @@ mod integration_tests {
keepalive: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
}
.build(SourceContext::new_test(sender))
.await
Expand Down
5 changes: 5 additions & 0 deletions src/sources/logstash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct LogstashConfig {
receive_buffer_bytes: Option<usize>,
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
connection_limit: Option<u32>,
}

inventory::submit! {
Expand All @@ -46,6 +47,7 @@ impl GenerateConfig for LogstashConfig {
tls: None,
receive_buffer_bytes: None,
acknowledgements: Default::default(),
connection_limit: None,
})
.unwrap()
}
Expand All @@ -68,6 +70,7 @@ impl SourceConfig for LogstashConfig {
self.receive_buffer_bytes,
cx,
self.acknowledgements,
self.connection_limit,
)
}

Expand Down Expand Up @@ -591,6 +594,7 @@ mod test {
keepalive: None,
receive_buffer_bytes: None,
acknowledgements: true.into(),
connection_limit: None,
}
.build(SourceContext::new_test(sender))
.await
Expand Down Expand Up @@ -800,6 +804,7 @@ output {
keepalive: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
}
.build(SourceContext::new_test(sender))
.await
Expand Down
1 change: 1 addition & 0 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl SourceConfig for SocketConfig {
config.receive_buffer_bytes(),
cx,
false.into(),
config.connection_limit,
)
}
Mode::Udp(config) => {
Expand Down
4 changes: 4 additions & 0 deletions src/sources/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct TcpConfig {
#[serde(default = "default_decoding")]
#[getset(get = "pub", set = "pub")]
decoding: Box<dyn DeserializerConfig>,
pub connection_limit: Option<u32>,
}

const fn default_shutdown_timeout_secs() -> u64 {
Expand All @@ -57,6 +58,7 @@ impl TcpConfig {
receive_buffer_bytes: Option<usize>,
framing: Option<Box<dyn FramingConfig>>,
decoding: Box<dyn DeserializerConfig>,
connection_limit: Option<u32>,
) -> Self {
Self {
address,
Expand All @@ -68,6 +70,7 @@ impl TcpConfig {
receive_buffer_bytes,
framing,
decoding,
connection_limit,
}
}

Expand All @@ -82,6 +85,7 @@ impl TcpConfig {
receive_buffer_bytes: None,
framing: None,
decoding: default_decoding(),
connection_limit: None,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct TcpConfig {
#[serde(default = "default_shutdown_timeout_secs")]
shutdown_timeout_secs: u64,
receive_buffer_bytes: Option<usize>,
connection_limit: Option<u32>,
}

impl TcpConfig {
Expand All @@ -73,6 +74,7 @@ impl TcpConfig {
tls: None,
shutdown_timeout_secs: default_shutdown_timeout_secs(),
receive_buffer_bytes: None,
connection_limit: None,
}
}
}
Expand Down Expand Up @@ -112,6 +114,7 @@ impl SourceConfig for StatsdConfig {
config.receive_buffer_bytes,
cx,
false.into(),
config.connection_limit,
)
}
#[cfg(unix)]
Expand Down
4 changes: 4 additions & 0 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum Mode {
keepalive: Option<TcpKeepaliveConfig>,
tls: Option<TlsConfig>,
receive_buffer_bytes: Option<usize>,
connection_limit: Option<u32>,
},
Udp {
address: SocketAddr,
Expand Down Expand Up @@ -80,6 +81,7 @@ impl GenerateConfig for SyslogConfig {
keepalive: None,
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
},
host_key: None,
max_length: crate::serde::default_max_length(),
Expand All @@ -103,6 +105,7 @@ impl SourceConfig for SyslogConfig {
keepalive,
tls,
receive_buffer_bytes,
connection_limit,
} => {
let source = SyslogTcpSource {
max_length: self.max_length,
Expand All @@ -118,6 +121,7 @@ impl SourceConfig for SyslogConfig {
receive_buffer_bytes,
cx,
false.into(),
connection_limit,
)
}
Mode::Udp {
Expand Down
21 changes: 10 additions & 11 deletions src/sources/util/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use std::{
fmt, io,
mem::drop,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Duration,
};

use bytes::Bytes;
use futures::{future::BoxFuture, stream, FutureExt, Sink, SinkExt, StreamExt};
use listenfd::ListenFd;
use serde::{de, Deserialize, Deserializer, Serialize};
use smallvec::SmallVec;
use socket2::SockRef;
use std::net::{IpAddr, SocketAddr};
use std::{fmt, io, mem::drop, sync::Arc, time::Duration};
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -118,6 +112,7 @@ where
receive_buffer_bytes: Option<usize>,
cx: SourceContext,
acknowledgements: AcknowledgementsConfig,
max_connections: Option<u32>,
) -> crate::Result<crate::sources::Source> {
let out = cx
.out
Expand Down Expand Up @@ -150,9 +145,9 @@ where
let shutdown_clone = cx.shutdown.clone();

listener
.accept_stream()
.accept_stream_limited(max_connections)
.take_until(shutdown_clone)
.for_each(move |connection| {
.for_each(move |(connection, permit)| {
let shutdown_signal = cx.shutdown.clone();
let tripwire = tripwire.clone();
let source = self.clone();
Expand Down Expand Up @@ -202,7 +197,11 @@ where
);

tokio::spawn(
fut.map(move |()| drop(open_token)).instrument(span.clone()),
fut.map(move |()| {
drop(open_token);
drop(permit);
})
.instrument(span.clone()),
);
});
}
Expand Down
1 change: 1 addition & 0 deletions src/sources/vector/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl VectorConfig {
self.receive_buffer_bytes,
cx,
false.into(),
None,
)
}

Expand Down
41 changes: 41 additions & 0 deletions src/tls/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::{
future::Future,
net::SocketAddr,
Expand All @@ -8,6 +9,7 @@ use std::{
use futures::{future::BoxFuture, stream, FutureExt, Stream};
use openssl::ssl::{Ssl, SslAcceptor, SslMethod};
use snafu::ResultExt;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::{
io::{self, AsyncRead, AsyncWrite, ReadBuf},
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -72,6 +74,7 @@ impl MaybeTlsListener {
(self.accept().await, self)
}

#[allow(unused)]
pub(crate) fn accept_stream(
self,
) -> impl Stream<Item = crate::tls::Result<MaybeTlsIncomingStream<TcpStream>>> {
Expand All @@ -85,6 +88,44 @@ impl MaybeTlsListener {
})
}

#[allow(unused)]
pub(crate) fn accept_stream_limited(
self,
max_connections: Option<u32>,
) -> impl Stream<
Item = (
crate::tls::Result<MaybeTlsIncomingStream<TcpStream>>,
Option<OwnedSemaphorePermit>,
),
> {
let connection_semaphore =
max_connections.map(|max| Arc::new(Semaphore::new(max as usize)));

let mut semaphore_future = connection_semaphore
.clone()
.map(|x| Box::pin(x.acquire_owned()));
let mut accept = Box::pin(self.into_accept());
stream::poll_fn(move |context| {
let permit = match semaphore_future.as_mut() {
Some(semaphore) => match semaphore.as_mut().poll(context) {
Poll::Ready(permit) => {
semaphore.set(connection_semaphore.clone().unwrap().acquire_owned());
permit.ok()
}
Poll::Pending => return Poll::Pending,
},
None => None,
};
match accept.as_mut().poll(context) {
Poll::Ready((item, this)) => {
accept.set(this.into_accept());
Poll::Ready(Some((item, permit)))
}
Poll::Pending => Poll::Pending,
}
})
}

#[cfg(feature = "listenfd")]
pub(crate) fn local_addr(&self) -> Result<SocketAddr, std::io::Error> {
self.listener.local_addr()
Expand Down
2 changes: 2 additions & 0 deletions tests/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async fn test_tcp_syslog() {
keepalive: None,
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
}),
);
config.add_sink("out", &["in"], tcp_json_sink(out_addr.to_string()));
Expand Down Expand Up @@ -155,6 +156,7 @@ async fn test_octet_counting_syslog() {
keepalive: None,
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
}),
);
config.add_sink("out", &["in"], tcp_json_sink(out_addr.to_string()));
Expand Down
10 changes: 10 additions & 0 deletions website/cue/reference/components/sources/fluent.cue
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ components: sources: fluent: {
examples: ["0.0.0.0:\(_port)"]
}
}
connection_limit: {
common: false
description: "The max number of TCP connections that will be processed."
relevant_when: "mode = `tcp`"
required: false
type: uint: {
default: null
unit: "concurrency"
}
}
}

output: logs: line: {
Expand Down
10 changes: 10 additions & 0 deletions website/cue/reference/components/sources/logstash.cue
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ components: sources: logstash: {
examples: ["0.0.0.0:\(_port)"]
}
}
connection_limit: {
common: false
description: "The max number of TCP connections that will be processed."
relevant_when: "mode = `tcp`"
required: false
type: uint: {
default: null
unit: "concurrency"
}
}
}

output: logs: line: {
Expand Down
Loading

0 comments on commit 633a27c

Please sign in to comment.