Skip to content

Commit

Permalink
Add async support in WS (#134)
Browse files Browse the repository at this point in the history
* WS async support

* Async function execution implemented
Add more support for create, close and message

* fix tests

* Update docs
  • Loading branch information
sansyrox authored Dec 20, 2021
1 parent 907adf4 commit 9282a3e
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 90 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ app.start(port=5000)

## Features
- Under active development!
- Written in Russt, btw xD
- Written in Rust, btw xD
- A multithreaded Runtime
- Extensible
- A simple API
Expand Down
38 changes: 37 additions & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ app = Robyn(__file__)
websocket = WS(app, "/web_socket")
```

Now, you can define 3 methods for every web_socket for their lifecycle, they are as follows:
Now, you can define 3 methods for every web_socket for their life cycle, they are as follows:

```python3
@websocket.on("message")
Expand Down Expand Up @@ -198,6 +198,42 @@ The three methods:

To see a complete service in action, you can go to the folder [../integration_tests/base_routes.py](../integration_tests/base_routes.py)

### Update(20/12/21)

Async functions are supported in Web Sockets now!

Async functions are executed out of order for web sockets. i.e. the order of response is not guaranteed. This is done to achieve a non blocking concurrent effect.

A blocking async web socket is in plans for the future.

### Usage

```python3
@websocket.on("message")
async def connect():
global i
i+=1
if i==0:
return "Whaaat??"
elif i==1:
return "Whooo??"
elif i==2:
return "*chika* *chika* Slim Shady."
elif i==3:
i= -1
return ""

@websocket.on("close")
async def close():
return "Goodbye world, from ws"

@websocket.on("connect")
async def message():
return "Hello world, from ws"

```


## MutliCore Scaling

To run Robyn across multiple cores, you can use the following command:
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/base_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
i = -1

@websocket.on("message")
def connect():
async def connect():
global i
i+=1
if i==0:
Expand All @@ -21,12 +21,10 @@ def connect():

@websocket.on("close")
def close():
print("Hello world")
return "Hello world, from ws"
return "GoodBye world, from ws"

@websocket.on("connect")
def message():
print("Hello world")
return "Hello world, from ws"


Expand Down
4 changes: 2 additions & 2 deletions integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

@pytest.fixture
def session():
subprocess.call(["freeport", "5000"])
subprocess.call(["yes | freeport 5000"], shell=True)
os.environ["ROBYN_URL"] = "127.0.0.1"
current_file_path = pathlib.Path(__file__).parent.resolve()
base_routes = os.path.join(current_file_path, "./base_routes.py")
process = subprocess.Popen(["python3", base_routes])
time.sleep(1)
time.sleep(5)
yield
process.terminate()
del os.environ["ROBYN_URL"]
Expand Down
1 change: 1 addition & 0 deletions integration_tests/test_web_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
def test_web_socket(session):
async def start_ws(uri):
async with connect(uri) as websocket:
assert( await websocket.recv() == "Hello world, from ws")
await websocket.send("My name is?")
assert( await websocket.recv() == "Whaaat??")
await websocket.send("My name is?")
Expand Down
44 changes: 21 additions & 23 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use pyo3::prelude::*;
use pyo3::types::PyAny;

use actix_web::http::Method;
use dashmap::DashMap;
use matchit::Node;

/// Contains the thread safe hashmaps of different routes
Expand All @@ -21,7 +20,7 @@ pub struct Router {
options_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
connect_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
trace_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
web_socket_routes: DashMap<String, HashMap<String, (PyFunction, u8)>>,
web_socket_routes: Arc<RwLock<HashMap<String, HashMap<String, (PyFunction, u8)>>>>,
}

impl Router {
Expand All @@ -36,7 +35,7 @@ impl Router {
options_routes: Arc::new(RwLock::new(Node::new())),
connect_routes: Arc::new(RwLock::new(Node::new())),
trace_routes: Arc::new(RwLock::new(Node::new())),
web_socket_routes: DashMap::new(),
web_socket_routes: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand All @@ -57,7 +56,9 @@ impl Router {
}

#[inline]
pub fn get_web_socket_map(&self) -> &DashMap<String, HashMap<String, (PyFunction, u8)>> {
pub fn get_web_socket_map(
&self,
) -> &Arc<RwLock<HashMap<String, HashMap<String, (PyFunction, u8)>>>> {
&self.web_socket_routes
}

Expand Down Expand Up @@ -117,42 +118,39 @@ impl Router {
let (close_route_function, close_route_is_async, close_route_params) = close_route;
let (message_route_function, message_route_is_async, message_route_params) = message_route;

let insert_in_router = |table: &DashMap<String, HashMap<String, (PyFunction, u8)>>,
handler: Py<PyAny>,
is_async: bool,
number_of_params: u8,
socket_type: &str| {
let function = if is_async {
PyFunction::CoRoutine(handler)
} else {
PyFunction::SyncFunction(handler)
let insert_in_router =
|handler: Py<PyAny>, is_async: bool, number_of_params: u8, socket_type: &str| {
let function = if is_async {
PyFunction::CoRoutine(handler)
} else {
PyFunction::SyncFunction(handler)
};

println!("socket type is {:?} {:?}", table, route);

table
.write()
.unwrap()
.entry(route.to_string())
.or_default()
.insert(socket_type.to_string(), (function, number_of_params))
};

let mut route_map = HashMap::new();
route_map.insert(socket_type.to_string(), (function, number_of_params));

println!("{:?}", table);
table.insert(route.to_string(), route_map);
};

insert_in_router(
table,
connect_route_function,
connect_route_is_async,
connect_route_params,
"connect",
);

insert_in_router(
table,
close_route_function,
close_route_is_async,
close_route_params,
"close",
);

insert_in_router(
table,
message_route_function,
message_route_is_async,
message_route_params,
Expand Down
22 changes: 12 additions & 10 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,10 @@ impl Server {
return Ok(());
}

println!("{}", name);

let borrow = socket.try_borrow_mut()?;
let held_socket: &SocketHeld = &*borrow;

let raw_socket = held_socket.get_socket();
println!("Got our socket {:?}", raw_socket);

let router = self.router.clone();
let headers = self.headers.clone();
Expand Down Expand Up @@ -123,17 +120,23 @@ impl Server {
.app_data(web::Data::new(headers.clone()));

let web_socket_map = router_copy.get_web_socket_map();
for elem in (web_socket_map).iter() {
let route = elem.key().clone();
let params = elem.value().clone();
for (elem, value) in (web_socket_map.read().unwrap()).iter() {
let route = elem.clone();
let params = value.clone();
let event_loop_hdl = event_loop_hdl.clone();
app = app.route(
&route,
&route.clone(),
web::get().to(
move |_router: web::Data<Arc<Router>>,
_headers: web::Data<Arc<Headers>>,
stream: web::Payload,
req: HttpRequest| {
start_web_socket(req, stream, Arc::new(params.clone()))
start_web_socket(
req,
stream,
Arc::new(params.clone()),
event_loop_hdl.clone(),
)
},
),
);
Expand Down Expand Up @@ -205,14 +208,13 @@ impl Server {
/// Add a new web socket route to the routing tables
/// can be called after the server has been started
pub fn add_web_socket_route(
&self,
&mut self,
route: &str,
// handler, is_async, number of params
connect_route: (Py<PyAny>, bool, u8),
close_route: (Py<PyAny>, bool, u8),
message_route: (Py<PyAny>, bool, u8),
) {
println!("WS Route added for {} ", route);
self.router
.add_websocket_route(route, connect_route, close_route, message_route);
}
Expand Down
Loading

0 comments on commit 9282a3e

Please sign in to comment.