Skip to content

Commit

Permalink
Merge pull request #46 from EspressoSystems/rm/delegated-sending
Browse files Browse the repository at this point in the history
better errors
  • Loading branch information
rob-maron authored Jun 10, 2024
2 parents 737889a + 3768290 commit 92ea926
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions cdn-proto/src/connection/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::{
task::AbortHandle,
time::timeout,
};
use tracing::warn;

use super::{middleware::Middleware, Bytes};
use crate::{
Expand Down Expand Up @@ -142,14 +143,16 @@ impl Connection {
match message {
BytesOrSoftClose::Bytes(message) => {
// Write the message to the stream
if write_length_delimited(&mut writer, message).await.is_err() {
if let Err(err) = write_length_delimited(&mut writer, message).await {
warn!("failed to write message to stream: {:?}", err);
receive_from_caller.close();
return;
};

// Flush the writer
// Is a no-op for everything but TCP+TLS
if writer.flush().await.is_err() {
if let Err(err) = writer.flush().await {
warn!("failed to flush writer: {:?}", err);
receive_from_caller.close();
return;
};
Expand All @@ -169,11 +172,22 @@ impl Connection {
// Spawn the task that receives from the stream and sends to the caller
let receiver_task = tokio::spawn(async move {
// While we can successfully read messages from the stream,
while let Ok(message) = read_length_delimited::<R>(&mut reader, &middleware).await {
if send_to_caller.send(message).await.is_err() {
send_to_caller.close();
return;
};
loop {
// Read the message from the stream
match read_length_delimited::<R>(&mut reader, &middleware).await {
Ok(message) => {
// If successful, send the message to the caller
if send_to_caller.send(message).await.is_err() {
send_to_caller.close();
return;
};
}
Err(err) => {
// If we fail to read the message, log the error and break
warn!("failed to read message from stream: {:?}", err);
break;
}
}
}
})
.abort_handle();
Expand Down

0 comments on commit 92ea926

Please sign in to comment.