From 9282a3e8951441e029368014b16adc0e02282161 Mon Sep 17 00:00:00 2001 From: Sanskar Jethi Date: Mon, 20 Dec 2021 12:22:08 +0530 Subject: [PATCH] Add async support in WS (#134) * WS async support * Async function execution implemented Add more support for create, close and message * fix tests * Update docs --- README.md | 2 +- docs/api.md | 38 ++++++++- integration_tests/base_routes.py | 6 +- integration_tests/conftest.py | 4 +- integration_tests/test_web_sockets.py | 1 + src/router.rs | 44 +++++----- src/server.rs | 22 ++--- src/web_socket_connection.rs | 116 +++++++++++++++----------- 8 files changed, 143 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index cf3f16d45..b089f8dee 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ app.start(port=5000) ## Features - Under active development! -- Written in Russt, btw xD +- Written in Rust, btw xD - A multithreaded Runtime - Extensible - A simple API diff --git a/docs/api.md b/docs/api.md index 5153bbbed..c4ec8c475 100644 --- a/docs/api.md +++ b/docs/api.md @@ -164,7 +164,7 @@ app = Robyn(__file__) websocket = WS(app, "/web_socket") ``` -Now, you can define 3 methods for every web_socket for their lifecycle, they are as follows: +Now, you can define 3 methods for every web_socket for their life cycle, they are as follows: ```python3 @websocket.on("message") @@ -198,6 +198,42 @@ The three methods: To see a complete service in action, you can go to the folder [../integration_tests/base_routes.py](../integration_tests/base_routes.py) +### Update(20/12/21) + +Async functions are supported in Web Sockets now! + +Async functions are executed out of order for web sockets. i.e. the order of response is not guaranteed. This is done to achieve a non blocking concurrent effect. + +A blocking async web socket is in plans for the future. + +### Usage + +```python3 +@websocket.on("message") +async def connect(): + global i + i+=1 + if i==0: + return "Whaaat??" + elif i==1: + return "Whooo??" + elif i==2: + return "*chika* *chika* Slim Shady." + elif i==3: + i= -1 + return "" + +@websocket.on("close") +async def close(): + return "Goodbye world, from ws" + +@websocket.on("connect") +async def message(): + return "Hello world, from ws" + +``` + + ## MutliCore Scaling To run Robyn across multiple cores, you can use the following command: diff --git a/integration_tests/base_routes.py b/integration_tests/base_routes.py index 5cf09d685..dbb6836cd 100644 --- a/integration_tests/base_routes.py +++ b/integration_tests/base_routes.py @@ -8,7 +8,7 @@ i = -1 @websocket.on("message") -def connect(): +async def connect(): global i i+=1 if i==0: @@ -21,12 +21,10 @@ def connect(): @websocket.on("close") def close(): - print("Hello world") - return "Hello world, from ws" + return "GoodBye world, from ws" @websocket.on("connect") def message(): - print("Hello world") return "Hello world, from ws" diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py index ce256c7ad..3e56f0279 100644 --- a/integration_tests/conftest.py +++ b/integration_tests/conftest.py @@ -7,12 +7,12 @@ @pytest.fixture def session(): - subprocess.call(["freeport", "5000"]) + subprocess.call(["yes | freeport 5000"], shell=True) os.environ["ROBYN_URL"] = "127.0.0.1" current_file_path = pathlib.Path(__file__).parent.resolve() base_routes = os.path.join(current_file_path, "./base_routes.py") process = subprocess.Popen(["python3", base_routes]) - time.sleep(1) + time.sleep(5) yield process.terminate() del os.environ["ROBYN_URL"] diff --git a/integration_tests/test_web_sockets.py b/integration_tests/test_web_sockets.py index 69f71c72a..951f239b5 100644 --- a/integration_tests/test_web_sockets.py +++ b/integration_tests/test_web_sockets.py @@ -6,6 +6,7 @@ def test_web_socket(session): async def start_ws(uri): async with connect(uri) as websocket: + assert( await websocket.recv() == "Hello world, from ws") await websocket.send("My name is?") assert( await websocket.recv() == "Whaaat??") await websocket.send("My name is?") diff --git a/src/router.rs b/src/router.rs index 3b6c08d6c..c29e4e0ec 100644 --- a/src/router.rs +++ b/src/router.rs @@ -6,7 +6,6 @@ use pyo3::prelude::*; use pyo3::types::PyAny; use actix_web::http::Method; -use dashmap::DashMap; use matchit::Node; /// Contains the thread safe hashmaps of different routes @@ -21,7 +20,7 @@ pub struct Router { options_routes: Arc>>, connect_routes: Arc>>, trace_routes: Arc>>, - web_socket_routes: DashMap>, + web_socket_routes: Arc>>>, } impl Router { @@ -36,7 +35,7 @@ impl Router { options_routes: Arc::new(RwLock::new(Node::new())), connect_routes: Arc::new(RwLock::new(Node::new())), trace_routes: Arc::new(RwLock::new(Node::new())), - web_socket_routes: DashMap::new(), + web_socket_routes: Arc::new(RwLock::new(HashMap::new())), } } @@ -57,7 +56,9 @@ impl Router { } #[inline] - pub fn get_web_socket_map(&self) -> &DashMap> { + pub fn get_web_socket_map( + &self, + ) -> &Arc>>> { &self.web_socket_routes } @@ -117,26 +118,25 @@ impl Router { let (close_route_function, close_route_is_async, close_route_params) = close_route; let (message_route_function, message_route_is_async, message_route_params) = message_route; - let insert_in_router = |table: &DashMap>, - handler: Py, - is_async: bool, - number_of_params: u8, - socket_type: &str| { - let function = if is_async { - PyFunction::CoRoutine(handler) - } else { - PyFunction::SyncFunction(handler) + let insert_in_router = + |handler: Py, is_async: bool, number_of_params: u8, socket_type: &str| { + let function = if is_async { + PyFunction::CoRoutine(handler) + } else { + PyFunction::SyncFunction(handler) + }; + + println!("socket type is {:?} {:?}", table, route); + + table + .write() + .unwrap() + .entry(route.to_string()) + .or_default() + .insert(socket_type.to_string(), (function, number_of_params)) }; - let mut route_map = HashMap::new(); - route_map.insert(socket_type.to_string(), (function, number_of_params)); - - println!("{:?}", table); - table.insert(route.to_string(), route_map); - }; - insert_in_router( - table, connect_route_function, connect_route_is_async, connect_route_params, @@ -144,7 +144,6 @@ impl Router { ); insert_in_router( - table, close_route_function, close_route_is_async, close_route_params, @@ -152,7 +151,6 @@ impl Router { ); insert_in_router( - table, message_route_function, message_route_is_async, message_route_params, diff --git a/src/server.rs b/src/server.rs index 2149ff51d..a7c70c4bc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -63,13 +63,10 @@ impl Server { return Ok(()); } - println!("{}", name); - let borrow = socket.try_borrow_mut()?; let held_socket: &SocketHeld = &*borrow; let raw_socket = held_socket.get_socket(); - println!("Got our socket {:?}", raw_socket); let router = self.router.clone(); let headers = self.headers.clone(); @@ -123,17 +120,23 @@ impl Server { .app_data(web::Data::new(headers.clone())); let web_socket_map = router_copy.get_web_socket_map(); - for elem in (web_socket_map).iter() { - let route = elem.key().clone(); - let params = elem.value().clone(); + for (elem, value) in (web_socket_map.read().unwrap()).iter() { + let route = elem.clone(); + let params = value.clone(); + let event_loop_hdl = event_loop_hdl.clone(); app = app.route( - &route, + &route.clone(), web::get().to( move |_router: web::Data>, _headers: web::Data>, stream: web::Payload, req: HttpRequest| { - start_web_socket(req, stream, Arc::new(params.clone())) + start_web_socket( + req, + stream, + Arc::new(params.clone()), + event_loop_hdl.clone(), + ) }, ), ); @@ -205,14 +208,13 @@ impl Server { /// Add a new web socket route to the routing tables /// can be called after the server has been started pub fn add_web_socket_route( - &self, + &mut self, route: &str, // handler, is_async, number of params connect_route: (Py, bool, u8), close_route: (Py, bool, u8), message_route: (Py, bool, u8), ) { - println!("WS Route added for {} ", route); self.router .add_websocket_route(route, connect_route, close_route, message_route); } diff --git a/src/web_socket_connection.rs b/src/web_socket_connection.rs index 33d1e3381..3a77e2f89 100644 --- a/src/web_socket_connection.rs +++ b/src/web_socket_connection.rs @@ -1,6 +1,7 @@ use crate::types::PyFunction; -use actix::{Actor, StreamHandler}; +use actix::prelude::*; +use actix::{Actor, AsyncContext, StreamHandler}; use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; use actix_web_actors::ws::WebsocketContext; @@ -10,8 +11,45 @@ use std::collections::HashMap; use std::sync::Arc; /// Define HTTP actor +#[derive(Clone)] struct MyWs { router: Arc>, + event_loop: PyObject, +} + +fn execute_ws_functionn( + handler_function: &PyFunction, + event_loop: PyObject, + ctx: &mut ws::WebsocketContext, + ws: &MyWs, +) { + match handler_function { + PyFunction::SyncFunction(handler) => Python::with_gil(|py| { + let handler = handler.as_ref(py); + // call execute function + let op = handler.call0().unwrap(); + let op: &str = op.extract().unwrap(); + ctx.text(op); + }), + PyFunction::CoRoutine(handler) => { + let fut = Python::with_gil(|py| { + let handler = handler.as_ref(py); + let coro = handler.call0().unwrap(); + pyo3_asyncio::into_future_with_loop(event_loop.as_ref(py), coro).unwrap() + }); + let f = async move { + let output = fut.await.unwrap(); + Python::with_gil(|py| { + let output: &str = output.extract(py).unwrap(); + output.to_string() + }) + }; + let f = f.into_actor(ws).map(|res, _, ctx| { + ctx.text(res); + }); + ctx.spawn(f); + } + } } // By default mailbox capacity is 16 messages. @@ -19,14 +57,26 @@ impl Actor for MyWs { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut WebsocketContext) { + let handler_function = &self.router.get("connect").unwrap().0; + let _number_of_params = &self.router.get("connect").unwrap().1; + execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self); + println!("Actor is alive"); } - fn stopped(&mut self, _ctx: &mut WebsocketContext) { - println!("Actor is alive"); + fn stopped(&mut self, ctx: &mut WebsocketContext) { + let handler_function = &self.router.get("close").expect("No close function").0; + let _number_of_params = &self.router.get("close").unwrap().1; + execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self); + + println!("Actor is dead"); } } +#[derive(Message)] +#[rtype(result = "Result<(), ()>")] +struct CommandRunner(String); + /// Handler for ws::Message message impl StreamHandler> for MyWs { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { @@ -36,19 +86,7 @@ impl StreamHandler> for MyWs { let handler_function = &self.router.get("connect").unwrap().0; let _number_of_params = &self.router.get("connect").unwrap().1; println!("{:?}", handler_function); - match handler_function { - PyFunction::SyncFunction(handler) => Python::with_gil(|py| { - let handler = handler.as_ref(py); - // call execute function - let op = handler.call0().unwrap(); - let op: &str = op.extract().unwrap(); - - println!("{}", op); - }), - PyFunction::CoRoutine(handler) => { - println!("Async functions are not supported in WS right now."); - } - } + execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self); ctx.pong(&msg) } @@ -61,43 +99,15 @@ impl StreamHandler> for MyWs { // need to also passs this text as a param let handler_function = &self.router.get("message").unwrap().0; let _number_of_params = &self.router.get("message").unwrap().1; - println!("{:?}", handler_function); - match handler_function { - PyFunction::SyncFunction(handler) => Python::with_gil(|py| { - let handler = handler.as_ref(py); - // call execute function - let op = handler.call0().unwrap(); - let op: &str = op.extract().unwrap(); - - return ctx.text(op); - }), - PyFunction::CoRoutine(_handler) => { - println!("Async functions are not supported in WS right now."); - return ctx.text("Async Functions are not supported in WS right now."); - } - } + execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self); } Ok(ws::Message::Binary(bin)) => ctx.binary(bin), Ok(ws::Message::Close(_close_reason)) => { println!("Socket was closed"); - let router = &self.router; - let handler_function = &self.router.get("close").unwrap().0; - let number_of_params = &self.router.get("close").unwrap().1; - println!("{:?}", handler_function); - match handler_function { - PyFunction::SyncFunction(handler) => Python::with_gil(|py| { - let handler = handler.as_ref(py); - // call execute function - let op = handler.call0().unwrap(); - let op: &str = op.extract().unwrap(); - - println!("{:?}", op); - }), - PyFunction::CoRoutine(handler) => { - println!("Async functions are not supported in WS right now."); - } - } + let handler_function = &self.router.get("close").expect("No close function").0; + let _number_of_params = &self.router.get("close").unwrap().1; + execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self); } _ => (), } @@ -108,9 +118,17 @@ pub async fn start_web_socket( req: HttpRequest, stream: web::Payload, router: Arc>, + event_loop: PyObject, ) -> Result { // execute the async function here - let resp = ws::start(MyWs { router }, &req, stream); + let resp = ws::start( + MyWs { + router, + event_loop: event_loop.clone(), + }, + &req, + stream, + ); println!("{:?}", resp); resp }