Skip to content

Commit

Permalink
Add client websockets helper
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 3, 2020
1 parent b142aed commit fff6073
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 15 deletions.
4 changes: 3 additions & 1 deletion ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Changes

## [0.1.15] - 2020-04-30
## [0.1.15] - 2020-05-03

* ntex::util: Refactor stream dispatcher

* ntex::http: Drop camel case headers support

* ntex::http: Fix upgrade service readiness check

* ntex::http: Add client websockets helper

* ntex::ws: Add stream and sink wrappers for ws protocol

* ntex::web: Add websockets helper
Expand Down
3 changes: 1 addition & 2 deletions ntex/src/http/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod request;
mod response;
mod sender;
mod test;
mod ws;
pub mod ws;

pub use self::builder::ClientBuilder;
pub use self::connect::BoxedSocket;
Expand All @@ -44,7 +44,6 @@ pub use self::request::ClientRequest;
pub use self::response::{ClientResponse, JsonBody, MessageBody};
pub use self::sender::SendClientRequest;
pub use self::test::TestResponse;
pub use self::ws::WebsocketsRequest;

use crate::http::error::HttpError;
use crate::http::{HeaderMap, Method, RequestHead, Uri};
Expand Down
24 changes: 23 additions & 1 deletion ntex/src/http/client/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ use std::{fmt, str};

#[cfg(feature = "cookie")]
use coo_kie::{Cookie, CookieJar};
use futures::Stream;

use crate::codec::Framed;
use crate::codec::{AsyncRead, AsyncWrite, Framed};
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderName, HeaderValue, AUTHORIZATION};
use crate::http::{ConnectionType, Method, StatusCode, Uri, Version};
use crate::http::{Payload, RequestHead};
use crate::rt::time::timeout;
use crate::service::{IntoService, Service};
use crate::util::framed::{Dispatcher, DispatcherError};
use crate::ws;

pub use crate::ws::{CloseCode, CloseReason, Frame, Message};

use super::connect::BoxedSocket;
use super::error::{InvalidUrl, SendRequestError, WsClientError};
use super::response::ClientResponse;
Expand Down Expand Up @@ -402,6 +407,23 @@ impl fmt::Debug for WebsocketsRequest {
}
}

/// Start client websockets service.
pub async fn start<Io, T, F, Rx>(
framed: Framed<Io, ws::Codec>,
rx: Rx,
service: F,
) -> Result<(), DispatcherError<T::Error, ws::Codec>>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
T: Service<Request = ws::Frame, Response = Option<ws::Message>>,
T::Error: 'static,
T::Future: 'static,
F: IntoService<T>,
Rx: Stream<Item = ws::Message> + Unpin + 'static,
{
Dispatcher::with(framed, Some(rx), service.into_service()).await
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
41 changes: 32 additions & 9 deletions ntex/src/web/ws.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::error::Error as StdError;

use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use futures::{Sink, Stream, TryStreamExt};

pub use crate::ws::{CloseCode, CloseReason, Frame, Message};

Expand All @@ -17,9 +17,9 @@ pub type WebSocketsSink =

/// Do websocket handshake and start websockets service.
pub async fn start<T, F, S, Err>(
factory: F,
req: HttpRequest,
payload: S,
factory: F,
) -> Result<HttpResponse, Err>
where
T: ServiceFactory<
Expand All @@ -34,18 +34,41 @@ where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
Err: From<T::InitError>,
Err: From<HandshakeError>,
{
let (tx, rx) = mpsc::channel();

start_with(req, payload, tx, rx, factory).await
}

/// Do websocket handshake and start websockets service.
pub async fn start_with<T, F, S, Err, Tx, Rx>(
req: HttpRequest,
payload: S,
tx: Tx,
rx: Rx,
factory: F,
) -> Result<HttpResponse, Err>
where
T: ServiceFactory<
Config = ws::StreamEncoder<Tx>,
Request = Frame,
Response = Option<Message>,
>,
T::Error: StdError + 'static,
T::InitError: 'static,
T::Service: 'static,
F: IntoServiceFactory<T>,
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
Err: From<T::InitError>,
Err: From<HandshakeError>,
Tx: Sink<Result<Bytes, Box<dyn StdError>>> + Clone + Unpin + 'static,
Tx::Error: StdError,
Rx: Stream<Item = Result<Bytes, Box<dyn StdError>>> + Unpin + 'static,
{
// ws handshake
let mut res = handshake(req.head())?;

let payload = payload.map_err(|e| {
let e: Box<dyn StdError> = Box::new(e);
e
});

// response body stream
let (tx, rx): (_, mpsc::Receiver<Result<Bytes, Box<dyn StdError>>>) =
mpsc::channel();
let sink = ws::StreamEncoder::new(tx);

// create ws service
Expand Down
4 changes: 2 additions & 2 deletions ntex/tests/web_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ async fn web_ws() {
App::new().service(web::resource("/").route(web::to(
|req: HttpRequest, pl: web::types::Payload| async move {
ws::start::<_, _, _, web::Error>(
req,
pl,
fn_factory_with_config(|_| async {
Ok::<_, web::Error>(fn_service(service))
}),
req,
pl,
)
.await
},
Expand Down

0 comments on commit fff6073

Please sign in to comment.