Skip to content

Commit 05bb6cd

Browse files
geieredgarRalith
authored andcommitted
Implement stream prioritization (fix #165)
1 parent 3aca40b commit 05bb6cd

File tree

4 files changed

+159
-10
lines changed

4 files changed

+159
-10
lines changed

quinn-proto/src/connection/mod.rs

+31
Original file line numberDiff line numberDiff line change
@@ -1653,6 +1653,37 @@ where
16531653
Ok(())
16541654
}
16551655

1656+
/// Set the priority of a stream
1657+
///
1658+
/// # Panics
1659+
/// - when applied to a receive stream
1660+
pub fn set_priority(
1661+
&mut self,
1662+
stream_id: StreamId,
1663+
priority: i32,
1664+
) -> Result<(), UnknownStream> {
1665+
assert!(
1666+
stream_id.dir() == Dir::Bi || stream_id.initiator() == self.side,
1667+
"only streams supporting outgoing data may change priority"
1668+
);
1669+
1670+
self.streams.set_priority(stream_id, priority)?;
1671+
Ok(())
1672+
}
1673+
1674+
/// Get the priority of a stream
1675+
///
1676+
/// # Panics
1677+
/// - when applied to a receive stream
1678+
pub fn priority(&mut self, stream_id: StreamId) -> Result<i32, UnknownStream> {
1679+
assert!(
1680+
stream_id.dir() == Dir::Bi || stream_id.initiator() == self.side,
1681+
"only streams supporting outgoing data have a priority"
1682+
);
1683+
1684+
Ok(self.streams.priority(stream_id)?)
1685+
}
1686+
16561687
/// Handle the already-decrypted first packet from the client
16571688
///
16581689
/// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more

quinn-proto/src/connection/streams.rs

+107-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
2-
collections::{hash_map, HashMap, VecDeque},
2+
cell::RefCell,
3+
collections::{binary_heap::PeekMut, hash_map, BinaryHeap, HashMap, VecDeque},
34
convert::TryFrom,
45
mem,
56
};
@@ -50,7 +51,7 @@ pub struct Streams {
5051
/// permitted to open but which have not yet been opened.
5152
send_streams: usize,
5253
/// Streams with outgoing data queued
53-
pending: VecDeque<StreamId>,
54+
pending: BinaryHeap<PendingLevel>,
5455

5556
events: VecDeque<StreamEvent>,
5657
/// Streams blocked on connection-level flow control or stream window space
@@ -105,7 +106,7 @@ impl Streams {
105106
opened: [false, false],
106107
next_reported_remote: [0, 0],
107108
send_streams: 0,
108-
pending: VecDeque::new(),
109+
pending: BinaryHeap::new(),
109110
events: VecDeque::new(),
110111
connection_blocked: Vec::new(),
111112
max_data: 0,
@@ -272,7 +273,7 @@ impl Streams {
272273
self.unacked_data += len as u64;
273274
trace!(stream = %id, "wrote {} bytes", len);
274275
if !was_pending {
275-
self.pending.push_back(id);
276+
push_pending(&mut self.pending, id, stream.priority);
276277
}
277278
Ok(len)
278279
}
@@ -393,7 +394,7 @@ impl Streams {
393394
let was_pending = stream.is_pending();
394395
stream.finish()?;
395396
if !was_pending {
396-
self.pending.push_back(id);
397+
push_pending(&mut self.pending, id, stream.priority);
397398
}
398399
Ok(())
399400
}
@@ -423,6 +424,25 @@ impl Streams {
423424
Ok(())
424425
}
425426

427+
pub fn set_priority(&mut self, id: StreamId, priority: i32) -> Result<(), UnknownStream> {
428+
let stream = match self.send.get_mut(&id) {
429+
Some(ss) => ss,
430+
None => return Err(UnknownStream { _private: () }),
431+
};
432+
433+
stream.priority = priority;
434+
Ok(())
435+
}
436+
437+
pub fn priority(&mut self, id: StreamId) -> Result<i32, UnknownStream> {
438+
let stream = match self.send.get_mut(&id) {
439+
Some(ss) => ss,
440+
None => return Err(UnknownStream { _private: () }),
441+
};
442+
443+
Ok(stream.priority)
444+
}
445+
426446
pub fn reset_acked(&mut self, id: StreamId) {
427447
match self.send.entry(id) {
428448
hash_map::Entry::Vacant(_) => {}
@@ -594,14 +614,21 @@ impl Streams {
594614
Some(x) => x,
595615
None => break,
596616
};
617+
let mut level = match self.pending.peek_mut() {
618+
Some(x) => x,
619+
None => break,
620+
};
597621
// Poppping data from the front of the queue, storing as much data
598622
// as possible in a single frame, and enqueing sending further
599623
// remaining data at the end of the queue helps with fairness.
600624
// Other streams will have a chance to write data before we touch
601625
// this stream again.
602-
let id = match self.pending.pop_front() {
626+
let id = match level.queue.get_mut().pop_front() {
603627
Some(x) => x,
604-
None => break,
628+
None => {
629+
PeekMut::pop(level);
630+
continue;
631+
}
605632
};
606633
let stream = match self.send.get_mut(&id) {
607634
Some(s) => s,
@@ -622,7 +649,12 @@ impl Streams {
622649
stream.fin_pending = false;
623650
}
624651
if stream.is_pending() {
625-
self.pending.push_back(id);
652+
if level.priority == stream.priority {
653+
level.queue.get_mut().push_back(id);
654+
} else {
655+
drop(level);
656+
push_pending(&mut self.pending, id, stream.priority);
657+
}
626658
}
627659

628660
let meta = frame::StreamMeta { id, offsets, fin };
@@ -691,7 +723,7 @@ impl Streams {
691723
Some(x) => x,
692724
};
693725
if !stream.is_pending() {
694-
self.pending.push_back(frame.id);
726+
push_pending(&mut self.pending, frame.id, stream.priority);
695727
}
696728
stream.fin_pending |= frame.fin;
697729
stream.pending.retransmit(frame.offsets);
@@ -708,7 +740,7 @@ impl Streams {
708740
continue;
709741
}
710742
if !stream.is_pending() {
711-
self.pending.push_back(id);
743+
push_pending(&mut self.pending, id, stream.priority);
712744
}
713745
stream.pending.retransmit_all_for_0rtt();
714746
}
@@ -916,6 +948,47 @@ impl Streams {
916948
}
917949
}
918950

951+
fn push_pending(pending: &mut BinaryHeap<PendingLevel>, id: StreamId, priority: i32) {
952+
for level in pending.iter() {
953+
if priority == level.priority {
954+
level.queue.borrow_mut().push_back(id);
955+
return;
956+
}
957+
}
958+
let mut queue = VecDeque::new();
959+
queue.push_back(id);
960+
pending.push(PendingLevel {
961+
queue: RefCell::new(queue),
962+
priority,
963+
});
964+
}
965+
966+
struct PendingLevel {
967+
// RefCell is needed because BinaryHeap doesn't have an iter_mut()
968+
queue: RefCell<VecDeque<StreamId>>,
969+
priority: i32,
970+
}
971+
972+
impl PartialEq for PendingLevel {
973+
fn eq(&self, other: &Self) -> bool {
974+
self.priority.eq(&other.priority)
975+
}
976+
}
977+
978+
impl PartialOrd for PendingLevel {
979+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
980+
Some(self.cmp(other))
981+
}
982+
}
983+
984+
impl Eq for PendingLevel {}
985+
986+
impl Ord for PendingLevel {
987+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
988+
self.priority.cmp(&other.priority)
989+
}
990+
}
991+
919992
/// Application events about streams
920993
#[derive(Debug)]
921994
pub enum StreamEvent {
@@ -1209,4 +1282,28 @@ mod tests {
12091282
TransportErrorCode::FLOW_CONTROL_ERROR
12101283
);
12111284
}
1285+
1286+
#[test]
1287+
fn stream_priority() {
1288+
let mut server = make(Side::Server);
1289+
server.set_params(&TransportParameters {
1290+
initial_max_streams_bidi: 3u32.into(),
1291+
initial_max_data: 10u32.into(),
1292+
initial_max_stream_data_bidi_remote: 10u32.into(),
1293+
..Default::default()
1294+
});
1295+
let id_high = server.open(Dir::Bi).unwrap();
1296+
let id_mid = server.open(Dir::Bi).unwrap();
1297+
let id_low = server.open(Dir::Bi).unwrap();
1298+
server.set_priority(id_low, -1).unwrap();
1299+
server.set_priority(id_high, 1).unwrap();
1300+
server.write(id_mid, b"mid").unwrap();
1301+
server.write(id_low, b"low").unwrap();
1302+
server.write(id_high, b"high").unwrap();
1303+
let mut buf = Vec::with_capacity(40);
1304+
let meta = server.write_stream_frames(&mut buf, 40);
1305+
assert_eq!(meta[0].id, id_high);
1306+
assert_eq!(meta[1].id, id_mid);
1307+
assert_eq!(meta[2].id, id_low);
1308+
}
12121309
}

quinn-proto/src/connection/streams/send.rs

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub(super) struct Send {
99
pub(super) max_data: u64,
1010
pub(super) state: SendState,
1111
pub(super) pending: SendBuffer,
12+
pub(super) priority: i32,
1213
/// Whether a frame containing a FIN bit must be transmitted, even if we don't have any new data
1314
pub(super) fin_pending: bool,
1415
/// Whether this stream is in the `connection_blocked` list of `Streams`
@@ -23,6 +24,7 @@ impl Send {
2324
max_data: max_data.into(),
2425
state: SendState::Ready,
2526
pending: SendBuffer::new(),
27+
priority: 0,
2628
fin_pending: false,
2729
connection_blocked: false,
2830
stop_reason: None,

quinn/src/streams.rs

+19
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,25 @@ where
150150
Ok(())
151151
}
152152

153+
/// Set the priority of the send stream
154+
///
155+
/// Every send stream has an initial priority of 0. Locally buffered data from streams with
156+
/// higher priority will be transmitted before data from streams with lower priority. Changing
157+
/// the priority of a stream with pending data may only take effect after that data has been
158+
/// transmitted. Using many different priority levels per connection may have a negative
159+
/// impact on performance.
160+
pub fn set_priority(&self, priority: i32) -> Result<(), UnknownStream> {
161+
let mut conn = self.conn.lock().unwrap();
162+
conn.inner.set_priority(self.stream, priority)?;
163+
Ok(())
164+
}
165+
166+
/// Get the priority of the send stream
167+
pub fn priority(&self) -> Result<i32, UnknownStream> {
168+
let mut conn = self.conn.lock().unwrap();
169+
Ok(conn.inner.priority(self.stream)?)
170+
}
171+
153172
/// Completes if/when the peer stops the stream, yielding the error code
154173
pub fn stopped(&mut self) -> Stopped<'_, S> {
155174
Stopped { stream: self }

0 commit comments

Comments
 (0)