Skip to content

Commit

Permalink
Merge pull request #146 from kureuil/error-categorization
Browse files Browse the repository at this point in the history
Properly categorize errors internally
  • Loading branch information
Keruspe authored Nov 20, 2018
2 parents b213113 + dcb59fc commit 59f2f18
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 37 deletions.
23 changes: 12 additions & 11 deletions futures/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use lapin_async;
use lapin_async::api::{ChannelState, RequestId};
use lapin_async::connection::Connection;

use error::Error;
use error::{Error, ErrorKind};
use transport::*;
use message::BasicGetMessage;
use types::FieldTable;
Expand Down Expand Up @@ -270,7 +270,7 @@ impl<T: AsyncRead+AsyncWrite+Send+Sync+'static> Channel<T> {
transport: channel_transport.clone(),
}))
} else {
return Err(Error::new("The maximum number of channels for this connection has been reached"));
return Err(ErrorKind::ChannelLimitReached.into());
}
}).and_then(|channel| {
let channel_id = channel.id;
Expand All @@ -281,12 +281,13 @@ impl<T: AsyncRead+AsyncWrite+Send+Sync+'static> Channel<T> {
let transport = lock_transport!(transport);

match transport.conn.get_state(channel_id) {
Some(ChannelState::Connected) => return Ok(Async::Ready(())),
Some(ChannelState::Error) => return Err(Error::new("Failed to open channel")),
Some(ChannelState::Closed) => return Err(Error::new("Failed to open channel")),
_ => {
Some(ChannelState::Connected) => Ok(Async::Ready(())),
Some(ChannelState::Error) | Some(ChannelState::Closed) => {
Err(ErrorKind::ChannelOpenFailed.into())
},
_ => {
task::current().notify();
return Ok(Async::NotReady);
Ok(Async::NotReady)
}
}
})
Expand Down Expand Up @@ -560,7 +561,7 @@ impl<T: AsyncRead+AsyncWrite+Send+Sync+'static> Channel<T> {
Some(answer) => if answer {
Ok(Async::Ready(Some(request_id)))
} else {
Err(Error::new("basic get returned empty"))
Err(ErrorKind::EmptyBasicGet.into())
},
None => {
task::current().notify();
Expand Down Expand Up @@ -637,7 +638,7 @@ impl<T: AsyncRead+AsyncWrite+Send+Sync+'static> Channel<T> {
}).map(|_| ())
}

fn run_on_locked_transport_full<Action, Finished>(&self, method: &str, error: &str, action: Action, finished: Finished, payload: Option<(Vec<u8>, BasicProperties)>) -> impl Future<Item = Option<RequestId>, Error = Error> + Send + 'static
fn run_on_locked_transport_full<Action, Finished>(&self, method: &str, error_msg: &str, action: Action, finished: Finished, payload: Option<(Vec<u8>, BasicProperties)>) -> impl Future<Item = Option<RequestId>, Error = Error> + Send + 'static
where Action: 'static + Send + FnOnce(&mut AMQPTransport<T>) -> Result<Option<RequestId>, lapin_async::error::Error>,
Finished: 'static + Send + Fn(&mut Connection, RequestId) -> Poll<Option<RequestId>, Error> {
trace!("run on locked transport; method={:?}", method);
Expand All @@ -646,7 +647,7 @@ impl<T: AsyncRead+AsyncWrite+Send+Sync+'static> Channel<T> {
let _transport = self.transport.clone();
let _method = method.to_string();
let method = method.to_string();
let error = error.to_string();
let error_msg = error_msg.to_string();
// Tweak to make the borrow checker happy, see below for more explaination
let mut action = Some(action);
let mut payload = Some(payload);
Expand All @@ -661,7 +662,7 @@ impl<T: AsyncRead+AsyncWrite+Send+Sync+'static> Channel<T> {
// FnMut can be called several time and action which is an FnOnce can only be called
// once (which is implemented as a ownership transfer).
match action.take().unwrap()(&mut transport) {
Err(e) => Err(Error::new(format!("{}: {:?}", error, e))),
Err(e) => Err(ErrorKind::ProtocolError(error_msg.clone(), e).into()),
Ok(request_id) => {
trace!("run on locked transport; method={:?} request_id={:?}", _method, request_id);

Expand Down
6 changes: 3 additions & 3 deletions futures/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::{Duration,Instant};

use transport::*;
use channel::{Channel, ConfirmSelectOptions};
use error::Error;
use error::{Error, ErrorKind};

/// the Client structures connects to a server and creates channels
//#[derive(Clone)]
Expand Down Expand Up @@ -66,7 +66,7 @@ impl FromStr for ConnectionOptions {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let uri = AMQPUri::from_str(s).map_err(|e| Error::new(e.to_string()))?;
let uri = AMQPUri::from_str(s).map_err(|e| ErrorKind::InvalidUri(e))?;
Ok(ConnectionOptions::from_uri(uri))
}
}
Expand All @@ -78,7 +78,7 @@ fn heartbeat_pulse<T: AsyncRead+AsyncWrite+Send+'static>(transport: Arc<Mutex<AM
Err(())
} else {
Ok(Interval::new(Instant::now(), Duration::from_secs(heartbeat.into()))
.map_err(|err| Error::new(err.to_string())))
.map_err(|e| ErrorKind::HeartbeatTimer(e).into()))
};

future::select_all(vec![
Expand Down
4 changes: 2 additions & 2 deletions futures/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio_io::{AsyncRead,AsyncWrite};
use std::collections::VecDeque;
use std::sync::{Arc,Mutex};

use error::Error;
use error::{Error, ErrorKind};
use message::Delivery;
use transport::*;

Expand Down Expand Up @@ -85,7 +85,7 @@ impl<T: AsyncRead+AsyncWrite+Sync+Send+'static> Stream for Consumer<T> {
let mut inner = match self.inner.lock() {
Ok(inner) => inner,
Err(_) => if self.inner.is_poisoned() {
return Err(Error::new("Consumer mutex is poisoned"))
return Err(ErrorKind::PoisonedMutex.into())
} else {
task::current().notify();
return Ok(Async::NotReady)
Expand Down
78 changes: 72 additions & 6 deletions futures/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,71 @@
use std::fmt;
use failure::{Backtrace, Context, Fail};
use lapin_async;
use std::fmt;
use std::io;
use tokio_timer;

use transport::CodecError;

/// The type of error that can be returned in this crate.
///
/// Instead of implementing the `Error` trait provided by the standard library,
/// it implemented the `Fail` trait provided by the `failure` crate. Doing so
/// means that this type guaranteed to be both sendable and usable across
/// threads, and that you'll be able to use the downcasting feature of the
/// `failure::Error` type.
#[derive(Debug)]
pub struct Error {
inner: Context<String>,
inner: Context<ErrorKind>,
}

/// The different kinds of errors that can be reported.
///
/// Event though we expose the complete enumeration of possible error variants, it is not
/// considered stable to exhaustively match on this enumeration: do it at your own risk.
#[derive(Debug, Fail)]
pub enum ErrorKind {
#[fail(display = "The maximum number of channels for this connection has been reached")]
ChannelLimitReached,
#[fail(display = "Failed to open channel")]
ChannelOpenFailed,
#[fail(display = "Couldn't decode incoming frame: {}", _0)]
Decode(CodecError),
#[fail(display = "The connection was closed by the remote peer")]
ConnectionClosed,
#[fail(display = "Failed to connect: {}", _0)]
ConnectionFailed(#[fail(cause)] io::Error),
#[fail(display = "Basic get returned empty")]
EmptyBasicGet,
#[fail(display = "Couldn't encode outcoming frame: {}", _0)]
Encode(CodecError),
#[fail(display = "The timer of the heartbeat encountered an error: {}", _0)]
HeartbeatTimer(#[fail(cause)] tokio_timer::Error),
#[fail(display = "Failed to handle incoming frame: {:?}", _0)]
// FIXME: mark lapin_async's Error as cause once it implements Fail
InvalidFrame(lapin_async::error::Error),
#[fail(display = "Couldn't parse URI: {}", _0)]
InvalidUri(String),
#[fail(display = "Transport mutex is poisoned")]
PoisonedMutex,
#[fail(display = "{}: {:?}", _0, _1)]
// FIXME: mark lapin_async's Error as cause once it implements Fail
ProtocolError(String, lapin_async::error::Error),
/// A hack to prevent developers from exhaustively match on the enum's variants
///
/// The purpose of this variant is to let the `ErrorKind` enumeration grow more variants
/// without it being a breaking change for users. It is planned for the language to provide
/// this functionnality out of the box, though it has not been [stabilized] yet.
///
/// [stabilized]: https://github.com/rust-lang/rust/issues/44109
#[doc(hidden)]
#[fail(display = "lapin_futures::error::ErrorKind::__Nonexhaustive: this should not be printed")]
__Nonexhaustive,
}

impl Error {
pub(crate) fn new(message: impl Into<String>) -> Self {
Error {
inner: Context::new(message.into())
}
/// Return the underlying `ErrorKind`
pub fn kind(&self) -> &ErrorKind {
self.inner.get_context()
}
}

Expand All @@ -30,3 +84,15 @@ impl fmt::Display for Error {
fmt::Display::fmt(&self.inner, f)
}
}

impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Error {
Error { inner: Context::new(kind) }
}
}

impl From<Context<ErrorKind>> for Error {
fn from(inner: Context<ErrorKind>) -> Error {
Error { inner: inner }
}
}
27 changes: 12 additions & 15 deletions futures/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_io::{AsyncRead,AsyncWrite};

use channel::BasicProperties;
use client::ConnectionOptions;
use error::Error;
use error::{Error, ErrorKind};

#[derive(Fail, Debug)]
pub enum CodecError {
Expand Down Expand Up @@ -135,7 +135,7 @@ impl<T> AMQPTransport<T>
conn.set_heartbeat(options.heartbeat);

future::result(conn.connect())
.map_err(|e| Error::new(format!("Failed to connect: {:?}", e)))
.map_err(|e| ErrorKind::ConnectionFailed(e).into())
.and_then(|_| {
let codec = AMQPCodec {
frame_max: conn.configuration.frame_max,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<T> AMQPTransport<T>
Ok(Async::Ready(Some(frame))) => {
trace!("transport poll_recv; frame={:?}", frame);
if let Err(e) = self.conn.handle_frame(frame) {
return Err(Error::new(format!("failed to handle frame: {:?}", e)));
return Err(ErrorKind::InvalidFrame(e).into());
}
},
Ok(Async::Ready(None)) => {
Expand All @@ -212,7 +212,7 @@ impl<T> AMQPTransport<T>
},
Err(e) => {
error!("transport poll_recv; status=Err({:?})", e);
return Err(Error::new(e.to_string()));
return Err(ErrorKind::Decode(e).into());
},
};
}
Expand All @@ -223,17 +223,14 @@ impl<T> AMQPTransport<T>
fn poll_send(&mut self) -> Poll<(), Error> {
while let Some(frame) = self.conn.next_frame() {
trace!("transport poll_send; frame={:?}", frame);
match self.start_send(frame) {
Ok(AsyncSink::Ready) => {
match self.start_send(frame)? {
AsyncSink::Ready => {
trace!("transport poll_send; status=Ready");
},
Ok(AsyncSink::NotReady(frame)) => {
}
AsyncSink::NotReady(frame) => {
trace!("transport poll_send; status=NotReady");
self.conn.frame_queue.push_front(frame);
return Ok(Async::NotReady);
},
Err(e) => {
return Err(Error::new(e.to_string()));
}
}
}
Expand All @@ -252,7 +249,7 @@ impl<T> Stream for AMQPTransport<T>
trace!("transport poll");
if let Async::Ready(()) = self.poll_recv()? {
trace!("poll transport; status=Ready");
return Err(Error::new("The connection was closed by the remote peer"));
return Err(ErrorKind::ConnectionClosed.into());
}
self.poll_send().map(|r| r.map(Some))
}
Expand All @@ -268,13 +265,13 @@ impl <T> Sink for AMQPTransport<T>
fn start_send(&mut self, frame: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
trace!("transport start_send; frame={:?}", frame);
self.upstream.start_send(frame)
.map_err(|e| Error::new(e.to_string()))
.map_err(|e| ErrorKind::Encode(e).into())
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
trace!("transport poll_complete");
self.upstream.poll_complete()
.map_err(|e| Error::new(e.to_string()))
.map_err(|e| ErrorKind::Encode(e).into())
}
}

Expand Down Expand Up @@ -316,7 +313,7 @@ macro_rules! lock_transport (
match $t.lock() {
Ok(t) => t,
Err(_) => if $t.is_poisoned() {
return Err(Error::new("Transport mutex is poisoned"))
return Err($crate::error::ErrorKind::PoisonedMutex.into());
} else {
task::current().notify();
return Ok(Async::NotReady)
Expand Down

0 comments on commit 59f2f18

Please sign in to comment.