Skip to content

Commit

Permalink
Ensure http.disconnect event in ASGI protocol (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
gi0baro authored Jan 17, 2024
1 parent da6be30 commit 571d443
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions src/asgi/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub(crate) struct ASGIHTTPProtocol {
response_status: Option<i16>,
response_headers: Option<HeaderMap>,
body_tx: Option<mpsc::Sender<Result<body::Bytes, anyhow::Error>>>,
flow_rx_exhausted: Arc<std::sync::RwLock<bool>>,
flow_tx_waiter: Arc<tokio::sync::Notify>,
}

impl ASGIHTTPProtocol {
Expand All @@ -57,6 +59,8 @@ impl ASGIHTTPProtocol {
response_status: None,
response_headers: None,
body_tx: None,
flow_rx_exhausted: Arc::new(std::sync::RwLock::new(false)),
flow_tx_waiter: Arc::new(tokio::sync::Notify::new()),
}
}

Expand Down Expand Up @@ -96,7 +100,20 @@ impl ASGIHTTPProtocol {
#[pymethods]
impl ASGIHTTPProtocol {
fn receive<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
if *self.flow_rx_exhausted.read().unwrap() {
let holder = self.flow_tx_waiter.clone();
return future_into_py_futlike(self.rt.clone(), py, async move {
let () = holder.notified().await;
Python::with_gil(|py| {
let dict = PyDict::new(py);
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.disconnect"))?;
Ok(dict.to_object(py))
})
});
}

let body_ref = self.request_body.clone();
let flow_ref = self.flow_rx_exhausted.clone();
future_into_py_iter(self.rt.clone(), py, async move {
let mut bodym = body_ref.lock().await;
let body = &mut *bodym;
Expand All @@ -110,6 +127,11 @@ impl ASGIHTTPProtocol {
}
_ => body::Bytes::new(),
};
if !more_body {
let mut flow = flow_ref.write().unwrap();
*flow = true;
}

Python::with_gil(|py| {
let dict = PyDict::new(py);
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.request"))?;
Expand Down Expand Up @@ -143,6 +165,7 @@ impl ASGIHTTPProtocol {
.map_err(|e| match e {})
.boxed(),
);
self.flow_tx_waiter.notify_one();
empty_future_into_py(py)
}
(true, true, false) => {
Expand All @@ -164,10 +187,13 @@ impl ASGIHTTPProtocol {
_ => error_flow!(),
},
(true, false, true) => match self.body_tx.take() {
Some(tx) => match body.is_empty() {
false => self.send_body(py, tx, body),
true => empty_future_into_py(py),
},
Some(tx) => {
self.flow_tx_waiter.notify_one();
match body.is_empty() {
false => self.send_body(py, tx, body),
true => empty_future_into_py(py),
}
}
_ => error_flow!(),
},
_ => error_flow!(),
Expand Down

0 comments on commit 571d443

Please sign in to comment.