Skip to content

Commit

Permalink
Feature/result (#35)
Browse files Browse the repository at this point in the history
* Numerous improvements to Result and error-handling

- Rename NngResult to more canonical `Result` and get rid of NngReturn
- Rename NngFail to more canonical runng::Error
	- TryFrom conversion for NngFail
	- impl std::error::Error (fix #24)
	- Err and Unknown enums now Errno and UnknownErrno, respectively.
- Rename `NngFail::succeed_then()` to `zero_map()`
	- Zero is `Ok` and then it's the same as `Result::map`
- Tests use fewer glob (*) imports
- impl AsRef/AsMut<[u8]> for Alloc and NngMsg (#25)
	- https://rust-lang-nursery.github.io/api-guidelines/interoperability.html#conversions-use-the-standard-traits-from-asref-asmut-c-conv-traits
  • Loading branch information
jeikabu authored Feb 12, 2019
1 parent 26a04e8 commit cc67d5e
Show file tree
Hide file tree
Showing 44 changed files with 540 additions and 494 deletions.
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,3 @@ members = [
"runng",
"runng_derive",
]

[patch.crates-io]
cty = { git = 'https://github.com/jeikabu/cty' }
8 changes: 4 additions & 4 deletions runng/src/aio.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Asynchronous I/O
use super::*;
use crate::*;
use runng_sys::*;
use std::ptr;

Expand Down Expand Up @@ -36,17 +36,17 @@ impl NngAio {
}

/// Finish initialization of `nng_aio`. See [nng_aio_alloc](https://nanomsg.github.io/nng/man/v1.1.0/nng_aio_alloc.3).
pub fn init(&mut self, callback: AioCallback, arg: AioCallbackArg) -> NngReturn {
pub fn init(&mut self, callback: AioCallback, arg: AioCallbackArg) -> Result<()> {
unsafe {
let mut aio: *mut nng_aio = ptr::null_mut();
//https://doc.rust-lang.org/stable/book/first-edition/ffi.html#callbacks-from-c-code-to-rust-functions
let res = nng_aio_alloc(&mut aio, Some(callback), arg);
self.aio = aio;
NngFail::from_i32(res)
Error::from_i32(res)
}
}

pub(crate) fn register_aio<T>(arg: T, callback: AioCallback) -> NngResult<Box<T>>
pub(crate) fn register_aio<T>(arg: T, callback: AioCallback) -> Result<Box<T>>
where
T: Aio,
{
Expand Down
30 changes: 14 additions & 16 deletions runng/src/asyncio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ use std::collections::VecDeque;
/// Context for asynchrounous I/O.
pub trait AsyncContext: Sized {
/// Create a new asynchronous context using specified socket.
fn create(socket: NngSocket) -> NngResult<Self>;
fn create(socket: NngSocket) -> Result<Self>;
}

pub trait AsyncStreamContext: Sized {
/// Create a new asynchronous context using specified socket.
fn create(socket: NngSocket, buffer: usize) -> NngResult<Self>;
//fn create_unbounded(socket: NngSocket) -> NngResult<Self>;
fn create(socket: NngSocket, buffer: usize) -> Result<Self>;
//fn create_unbounded(socket: NngSocket) -> Result<Self>;
}

/// A `Socket` that can be turned into a context for asynchronous I/O.
Expand All @@ -43,9 +43,10 @@ pub trait AsyncStreamContext: Sized {
/// use runng::{
/// *,
/// asyncio::*,
/// factory::latest::ProtocolFactory,
/// };
/// fn test() -> Result<(), NngFail> {
/// let factory = Latest::default();
/// fn test() -> runng::Result<()> {
/// let factory = ProtocolFactory::default();
/// let pusher = factory.pusher_open()?.listen("inproc://test")?;
/// let mut push_ctx = pusher.create_async()?;
/// Ok(())
Expand All @@ -56,7 +57,7 @@ pub trait AsyncSocket: Socket {
type ContextType: AsyncContext;

/// Turns the `Socket` into an asynchronous context
fn create_async(&self) -> NngResult<Self::ContextType> {
fn create_async(&self) -> Result<Self::ContextType> {
let socket = self.socket().clone();
let ctx = Self::ContextType::create(socket)?;
Ok(ctx)
Expand All @@ -68,14 +69,14 @@ pub trait AsyncStream: Socket {
type ContextType: AsyncStreamContext;

/// Turns the `Socket` into an asynchronous context
fn create_async_stream(&self, buffer: usize) -> NngResult<Self::ContextType> {
fn create_async_stream(&self, buffer: usize) -> Result<Self::ContextType> {
let socket = self.socket().clone();
let ctx = Self::ContextType::create(socket, buffer)?;
Ok(ctx)
}
}

fn try_signal_complete(sender: &mut mpsc::Sender<NngResult<NngMsg>>, message: NngResult<NngMsg>) {
fn try_signal_complete(sender: &mut mpsc::Sender<Result<NngMsg>>, message: Result<NngMsg>) {
let res = sender.try_send(message);
if let Err(err) = res {
if err.is_disconnected() {
Expand All @@ -90,12 +91,12 @@ fn try_signal_complete(sender: &mut mpsc::Sender<NngResult<NngMsg>>, message: Nn

#[derive(Debug, Default)]
struct WorkQueue {
waiting: VecDeque<oneshot::Sender<NngResult<NngMsg>>>,
ready: VecDeque<NngResult<NngMsg>>,
waiting: VecDeque<oneshot::Sender<Result<NngMsg>>>,
ready: VecDeque<Result<NngMsg>>,
}

impl WorkQueue {
fn push_back(&mut self, message: NngResult<NngMsg>) {
fn push_back(&mut self, message: Result<NngMsg>) {
if let Some(sender) = self.waiting.pop_front() {
sender.send(message).unwrap();
} else {
Expand All @@ -104,11 +105,8 @@ impl WorkQueue {
}
}

trait NngSink:
Sink<SinkItem = NngResult<NngMsg>, SinkError = mpsc::SendError<NngResult<NngMsg>>>
{
}
impl<T: Sink<SinkItem = NngResult<NngMsg>, SinkError = mpsc::SendError<NngResult<NngMsg>>>> NngSink
trait NngSink: Sink<SinkItem = Result<NngMsg>, SinkError = mpsc::SendError<Result<NngMsg>>> {}
impl<T: Sink<SinkItem = Result<NngMsg>, SinkError = mpsc::SendError<Result<NngMsg>>>> NngSink
for T
{
}
6 changes: 3 additions & 3 deletions runng/src/asyncio/pair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct PairAsyncHandle {
}

impl AsyncContext for PairAsyncHandle {
fn create(socket: NngSocket) -> NngResult<Self> {
fn create(socket: NngSocket) -> Result<Self> {
let push = PushAsyncHandle::create(socket.clone())?;
let pull = PullAsyncHandle::create(socket)?;
let ctx = Self { push, pull };
Expand All @@ -20,13 +20,13 @@ impl AsyncContext for PairAsyncHandle {
}

impl AsyncPush for PairAsyncHandle {
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<NngReturn> {
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>> {
self.push.send(msg)
}
}

impl ReadAsync for PairAsyncHandle {
fn receive(&mut self) -> Box<dyn Future<Item = NngResult<NngMsg>, Error = oneshot::Canceled>> {
fn receive(&mut self) -> Box<dyn Future<Item = Result<NngMsg>, Error = oneshot::Canceled>> {
self.pull.receive()
}
}
6 changes: 3 additions & 3 deletions runng/src/asyncio/pair_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct PairStreamHandle {
}

impl AsyncStreamContext for PairStreamHandle {
fn create(socket: NngSocket, buffer: usize) -> NngResult<Self> {
fn create(socket: NngSocket, buffer: usize) -> Result<Self> {
let push = PushAsyncHandle::create(socket.clone())?;
let pull = PullAsyncStream::create(socket, buffer)?;
let ctx = Self { push, pull };
Expand All @@ -20,13 +20,13 @@ impl AsyncStreamContext for PairStreamHandle {
}

impl AsyncPush for PairStreamHandle {
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<NngReturn> {
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>> {
self.push.send(msg)
}
}

impl AsyncPull for PairStreamHandle {
fn receive(&mut self) -> Option<mpsc::Receiver<NngResult<NngMsg>>> {
fn receive(&mut self) -> Option<mpsc::Receiver<Result<NngMsg>>> {
self.pull.receive()
}
}
14 changes: 7 additions & 7 deletions runng/src/asyncio/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ struct PullAioArg {
}

impl PullAioArg {
pub fn create(socket: NngSocket) -> NngResult<Box<Self>> {
pub fn create(socket: NngSocket) -> Result<Box<Self>> {
let aio = NngAio::new(socket);
let queue = Mutex::new(WorkQueue::default());
let arg = Self { aio, queue };
Expand Down Expand Up @@ -50,19 +50,19 @@ pub struct PullAsyncHandle {
}

impl AsyncContext for PullAsyncHandle {
fn create(socket: NngSocket) -> NngResult<Self> {
fn create(socket: NngSocket) -> Result<Self> {
let aio_arg = PullAioArg::create(socket)?;
Ok(Self { aio_arg })
}
}

pub trait ReadAsync {
// FIXME: Can change this to -> impl Future later?
fn receive(&mut self) -> Box<dyn Future<Item = NngResult<NngMsg>, Error = oneshot::Canceled>>;
fn receive(&mut self) -> Box<dyn Future<Item = Result<NngMsg>, Error = oneshot::Canceled>>;
}

impl ReadAsync for PullAsyncHandle {
fn receive(&mut self) -> Box<dyn Future<Item = NngResult<NngMsg>, Error = oneshot::Canceled>> {
fn receive(&mut self) -> Box<dyn Future<Item = Result<NngMsg>, Error = oneshot::Canceled>> {
let mut queue = self.aio_arg.queue.lock().unwrap();
// If a value is ready return it immediately. Otherwise
if let Some(item) = queue.ready.pop_front() {
Expand All @@ -79,15 +79,15 @@ unsafe extern "C" fn read_callback(arg: AioCallbackArg) {
let ctx = &mut *(arg as *mut PullAioArg);
let aio = ctx.aio.nng_aio();
let aio_res = nng_aio_result(aio);
let res = NngFail::from_i32(aio_res);
let res = Error::from_i32(aio_res);
trace!("read_callback::{:?}", res);
match res {
Err(res) => {
match res {
// nng_aio_close() calls nng_aio_stop which nng_aio_abort(NNG_ECANCELED) and waits.
// If we call start_receive() it will fail with ECANCELED and we infinite loop...
NngFail::Err(nng_errno_enum::NNG_ECLOSED)
| NngFail::Err(nng_errno_enum::NNG_ECANCELED) => {
Error::Errno(nng_errno_enum::NNG_ECLOSED)
| Error::Errno(nng_errno_enum::NNG_ECANCELED) => {
debug!("read_callback {:?}", res);
}
_ => {
Expand Down
29 changes: 13 additions & 16 deletions runng/src/asyncio/pull_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ enum PullState {
struct PullContextAioArg {
aio: NngAio,
state: PullState,
sender: mpsc::Sender<NngResult<NngMsg>>,
sender: mpsc::Sender<Result<NngMsg>>,
}

impl PullContextAioArg {
pub fn create(
socket: NngSocket,
sender: mpsc::Sender<NngResult<NngMsg>>,
) -> NngResult<Box<Self>> {
pub fn create(socket: NngSocket, sender: mpsc::Sender<Result<NngMsg>>) -> Result<Box<Self>> {
let aio = NngAio::new(socket);
let arg = Self {
aio,
Expand Down Expand Up @@ -58,12 +55,12 @@ impl Aio for PullContextAioArg {
#[derive(Debug)]
pub struct PullAsyncStream {
aio_arg: Box<PullContextAioArg>,
receiver: Option<mpsc::Receiver<NngResult<NngMsg>>>,
receiver: Option<mpsc::Receiver<Result<NngMsg>>>,
}

impl AsyncStreamContext for PullAsyncStream {
fn create(socket: NngSocket, buffer: usize) -> NngResult<Self> {
let (sender, receiver) = mpsc::channel::<NngResult<NngMsg>>(buffer);
fn create(socket: NngSocket, buffer: usize) -> Result<Self> {
let (sender, receiver) = mpsc::channel::<Result<NngMsg>>(buffer);
let aio_arg = PullContextAioArg::create(socket, sender)?;
let receiver = Some(receiver);
Ok(Self { aio_arg, receiver })
Expand All @@ -73,11 +70,11 @@ impl AsyncStreamContext for PullAsyncStream {
/// Trait for asynchronous contexts that can receive a stream of messages.
pub trait AsyncPull {
/// Asynchronously receive a stream of messages.
fn receive(&mut self) -> Option<mpsc::Receiver<NngResult<NngMsg>>>;
fn receive(&mut self) -> Option<mpsc::Receiver<Result<NngMsg>>>;
}

impl AsyncPull for PullAsyncStream {
fn receive(&mut self) -> Option<mpsc::Receiver<NngResult<NngMsg>>> {
fn receive(&mut self) -> Option<mpsc::Receiver<Result<NngMsg>>> {
let receiver = self.receiver.take();
if receiver.is_some() {
self.aio_arg.start_receive();
Expand All @@ -94,14 +91,14 @@ unsafe extern "C" fn pull_callback(arg: AioCallbackArg) {
PullState::Receiving => {
let aio = ctx.aio.nng_aio();
let aio_res = nng_aio_result(aio);
let res = NngFail::from_i32(aio_res);
let res = Error::from_i32(aio_res);
match res {
Err(res) => {
match res {
// nng_aio_close() calls nng_aio_stop which nng_aio_abort(NNG_ECANCELED) and waits.
// If we call start_receive() it will fail with ECANCELED and we infinite loop...
NngFail::Err(nng_errno_enum::NNG_ECLOSED)
| NngFail::Err(nng_errno_enum::NNG_ECANCELED) => {
Error::Errno(nng_errno_enum::NNG_ECLOSED)
| Error::Errno(nng_errno_enum::NNG_ECANCELED) => {
debug!("pull_callback {:?}", res);
}
_ => {
Expand Down Expand Up @@ -130,14 +127,14 @@ pub struct SubscribeAsyncHandle {
}

impl AsyncPull for SubscribeAsyncHandle {
fn receive(&mut self) -> Option<mpsc::Receiver<NngResult<NngMsg>>> {
fn receive(&mut self) -> Option<mpsc::Receiver<Result<NngMsg>>> {
self.ctx.receive()
}
}

impl AsyncStreamContext for SubscribeAsyncHandle {
/// Create an asynchronous context using the specified socket.
fn create(socket: NngSocket, buffer: usize) -> NngResult<Self> {
fn create(socket: NngSocket, buffer: usize) -> Result<Self> {
let ctx = PullAsyncStream::create(socket, buffer)?;
let ctx = Self { ctx };
Ok(ctx)
Expand All @@ -151,7 +148,7 @@ impl InternalSocket for SubscribeAsyncHandle {
}

impl Subscribe for SubscribeAsyncHandle {
fn subscribe(&self, topic: &[u8]) -> NngReturn {
fn subscribe(&self, topic: &[u8]) -> Result<()> {
unsafe { subscribe(self.socket().nng_socket(), topic) }
}
}
16 changes: 8 additions & 8 deletions runng/src/asyncio/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ enum PushState {
struct PushContextAioArg {
aio: NngAio,
state: PushState,
sender: Option<oneshot::Sender<NngReturn>>,
sender: Option<oneshot::Sender<Result<()>>>,
}

impl PushContextAioArg {
pub fn create(socket: NngSocket) -> NngResult<Box<Self>> {
pub fn create(socket: NngSocket) -> Result<Box<Self>> {
let aio = NngAio::new(socket);
let arg = Self {
aio,
Expand All @@ -34,7 +34,7 @@ impl PushContextAioArg {
NngAio::register_aio(arg, publish_callback)
}

pub fn send(&mut self, msg: NngMsg, sender: oneshot::Sender<NngReturn>) {
pub fn send(&mut self, msg: NngMsg, sender: oneshot::Sender<Result<()>>) {
if self.state != PushState::Ready {
panic!();
}
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct PushAsyncHandle {

impl AsyncContext for PushAsyncHandle {
/// Create an asynchronous context using the specified socket.
fn create(socket: NngSocket) -> NngResult<Self> {
fn create(socket: NngSocket) -> Result<Self> {
let aio_arg = PushContextAioArg::create(socket)?;
Ok(Self { aio_arg })
}
Expand All @@ -80,12 +80,12 @@ impl AsyncContext for PushAsyncHandle {
/// Trait for asynchronous contexts that can send a message.
pub trait AsyncPush {
/// Asynchronously send a message.
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<NngReturn>;
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>>;
}

impl AsyncPush for PushAsyncHandle {
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<NngReturn> {
let (sender, receiver) = oneshot::channel::<NngReturn>();
fn send(&mut self, msg: NngMsg) -> oneshot::Receiver<Result<()>> {
let (sender, receiver) = oneshot::channel::<Result<()>>();
self.aio_arg.send(msg, sender);

receiver
Expand All @@ -100,7 +100,7 @@ unsafe extern "C" fn publish_callback(arg: AioCallbackArg) {
PushState::Ready => panic!(),
PushState::Sending => {
let nng_aio = ctx.aio.nng_aio();
let res = NngFail::from_i32(nng_aio_result(nng_aio));
let res = Error::from_i32(nng_aio_result(nng_aio));
if let Err(ref err) = res {
debug!("Push failed: {:?}", err);
// Nng requires that we retrieve the message and free it
Expand Down
Loading

0 comments on commit cc67d5e

Please sign in to comment.