Skip to content

Commit

Permalink
Merge pull request #623 from Nukesor/fix-dos
Browse files Browse the repository at this point in the history
Fix dos
  • Loading branch information
Nukesor authored Mar 5, 2025
2 parents b74a573 + 57e1a80 commit cc841c8
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ body:
Logs helps me to debug a problem, especially if the bug is something that's not clearly visible.
You can get detailed log output by launching `pueue` or `pueued` with the `-vvv` flag directly after the binary name.
In case of a panic/crash, please run the program with the `RUST_BACKTRACE=1` environment variable set.
That way we get a proper stack trace.
placeholder: |
```
Some log output here
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Upon updating Pueue and restarting the daemon, the previous state will be wiped,
- Callback templating arguments were html escaped by accident. [#564](https://github.com/Nukesor/pueue/pull/564)
- Print incompatible version warning info as a log message instead of plain stdout input, which broke json outputs [#562](https://github.com/Nukesor/pueue/issues/562).
- Fixed `-d` daemon mode on Windows. [#344](https://github.com/Nukesor/pueue/issues/344)
- Fixed a pueued crash when malformed secret exchange messages are sent by a connecting client [#619](https://github.com/Nukesor/pueue/issues/619).

### Remove

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ snap = "1.1"
strum = { version = "0.27", features = ["derive"] }
tokio = { version = "1.43", features = ["io-std", "rt-multi-thread", "time"] }
tracing = "0.1.41"
tracing-error = "0.2.1"
tracing-subscriber = { version = "0.3.19", features = [
"chrono",
"env-filter",
"fmt",
"local-time",
] }


[profile.release]
codegen-units = 1
Expand Down
9 changes: 2 additions & 7 deletions pueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,8 @@ tempfile = "3"
tokio.workspace = true
toml = "0.8"
tracing.workspace = true
tracing-error = "0.2.1"
tracing-subscriber = { version = "0.3.19", features = [
"chrono",
"env-filter",
"fmt",
"local-time",
] }
tracing-error.workspace = true
tracing-subscriber.workspace = true

[dev-dependencies]
assert_cmd = "2"
Expand Down
6 changes: 5 additions & 1 deletion pueue/src/daemon/network/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ async fn handle_incoming(
secret: Vec<u8>,
) -> Result<()> {
// Receive the secret once and check, whether the client is allowed to connect
let payload_bytes = receive_bytes(&mut stream).await?;
// We only allow max payload sizes of 4MB for this one.
// Daemon's might be exposed publicly and get random traffic, potentially announcing huge
// payloads that would result in an OOM.
let payload_bytes =
receive_bytes_with_max_size(&mut stream, Some(4 * (2usize.pow(20)))).await?;

// Didn't receive any bytes. The client disconnected.
if payload_bytes.is_empty() {
Expand Down
2 changes: 2 additions & 0 deletions pueue_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ portpicker = "0.1"
pretty_assertions.workspace = true
tempfile = "3"
tokio.workspace = true
tracing-error.workspace = true
tracing-subscriber.workspace = true

# --- Platform specific dependencies ---
# Unix
Expand Down
3 changes: 3 additions & 0 deletions pueue_lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub enum Error {
#[error("Couldn't serialize message:\n{}", .0)]
MessageSerialization(String),

#[error("Requested message size of {} with only {} being allowed.", .0, .1)]
MessageTooBig(usize, usize),

#[error("Error while reading configuration:\n{}", .0)]
ConfigDeserialization(String),

Expand Down
67 changes: 66 additions & 1 deletion pueue_lib/src/network/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,20 @@ pub async fn send_bytes(payload: &[u8], stream: &mut GenericStream) -> Result<()
Ok(())
}

pub async fn receive_bytes(stream: &mut GenericStream) -> Result<Vec<u8>, Error> {
receive_bytes_with_max_size(stream, None).await
}

/// Receive a byte stream. \
/// This is part of the basic protocol beneath all communication. \
///
/// 1. First of, the client sends a u64 as a 4byte vector in BigEndian mode, which specifies the
/// length of the payload we're going to receive.
/// 2. Receive chunks of [PACKET_SIZE] bytes until we finished all expected bytes.
pub async fn receive_bytes(stream: &mut GenericStream) -> Result<Vec<u8>, Error> {
pub async fn receive_bytes_with_max_size(
stream: &mut GenericStream,
max_size: Option<usize>,
) -> Result<Vec<u8>, Error> {
// Receive the header with the overall message size
let mut header = vec![0; 8];
stream
Expand All @@ -104,6 +111,21 @@ pub async fn receive_bytes(stream: &mut GenericStream) -> Result<Vec<u8>, Error>
let mut header = Cursor::new(header);
let message_size = ReadBytesExt::read_u64::<BigEndian>(&mut header)? as usize;

if let Some(max_size) = max_size {
if message_size > max_size {
error!(
"Client requested message size of {message_size}, but only {max_size} is allowed."
);
return Err(Error::MessageTooBig(message_size, max_size));
}
}

// Show a warning if we see unusually large payloads. In this case payloads that're bigger than
// 20MB, which is pretty large considering pueue is usually only sending a bit of text.
if message_size > (20 * (2usize.pow(20))) {
warn!("Client is sending a large payload: {message_size} bytes.");
}

// Buffer for the whole payload
let mut payload_bytes = Vec::with_capacity(message_size);

Expand Down Expand Up @@ -281,4 +303,47 @@ mod test {

Ok(())
}

/// Ensure there's no OOM if a huge payload during the handshake phase is being requested.
///
/// We limit the receiving buffer to ~4MB for the incoming secret to prevent (potentially
/// unintended) DoS attacks when something connect to Pueue and sends a malformed secret
/// payload.
#[tokio::test]
async fn test_restricted_payload_size() -> Result<(), Error> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;

let listener: GenericListener = Box::new(listener);

// Spawn a sub thread that:
// 1. Accepts a new connection.
// 2. Sends a malformed payload.
task::spawn(async move {
let mut stream = listener.accept().await.unwrap();

// Send a payload of 9 bytes to the daemon receiver.
// The first 8 bytes determine the payload size in BigEndian.
// This payload requests 2^64 bytes of memory for the secret.
stream
.write_all(&[128, 0, 0, 0, 0, 0, 0, 0, 0])
.await
.unwrap();
});

// Create a receiver stream
let mut client: GenericStream = Box::new(TcpStream::connect(&addr).await?);
// Wait for a short time to allow the sender to send the message
tokio::time::sleep(Duration::from_millis(500)).await;

// Get the message while restricting the payload size to 4MB
let result = receive_bytes_with_max_size(&mut client, Some(4 * 2usize.pow(20))).await;

assert!(
result.is_err(),
"The payload should be rejected due to large size"
);

Ok(())
}
}
55 changes: 55 additions & 0 deletions pueue_lib/tests/helper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use portpicker::pick_unused_port;
use pueue_lib::settings::*;
use tempfile::{Builder, TempDir};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{
EnvFilter, Layer, Registry, field::MakeExt, filter::FromEnvError, fmt::time::ChronoLocal,
layer::SubscriberExt, util::SubscriberInitExt,
};

pub fn get_shared_settings(
#[cfg_attr(target_os = "windows", allow(unused_variables))] use_unix_socket: bool,
Expand Down Expand Up @@ -33,3 +38,53 @@ pub fn get_shared_settings(

(shared_settings, tempdir)
}

#[allow(dead_code)]
pub fn install_tracing(verbosity: u8) -> Result<(), FromEnvError> {
let mut pretty = false;
let level = match verbosity {
0 => LevelFilter::WARN,
1 => LevelFilter::INFO,
2 => LevelFilter::DEBUG,
3 => LevelFilter::TRACE,
_ => {
pretty = true;
LevelFilter::TRACE
}
};

// tries to find local offset internally
let timer = ChronoLocal::new("%H:%M:%S".into());

type GenericLayer<S> = Box<dyn tracing_subscriber::Layer<S> + Send + Sync>;
let fmt_layer: GenericLayer<_> = match pretty {
false => Box::new(
tracing_subscriber::fmt::layer()
.map_fmt_fields(|f| f.debug_alt())
.with_timer(timer)
.with_writer(std::io::stderr),
),
true => Box::new(
tracing_subscriber::fmt::layer()
.pretty()
.with_timer(timer)
.with_target(true)
.with_thread_ids(false)
.with_thread_names(true)
.with_level(true)
.with_ansi(true)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE)
.with_writer(std::io::stderr),
),
};
let filter_layer = EnvFilter::builder()
.with_default_directive(level.into())
.from_env()?;

Registry::default()
.with(fmt_layer.with_filter(filter_layer))
.with(tracing_error::ErrorLayer::default())
.init();

Ok(())
}

0 comments on commit cc841c8

Please sign in to comment.