Skip to content

Commit

Permalink
[streams] split into QuicStreamReceiver / QuicStreamSender
Browse files Browse the repository at this point in the history
  • Loading branch information
jlaine committed Jul 17, 2021
1 parent 7e4ffff commit 67d3fdd
Show file tree
Hide file tree
Showing 3 changed files with 439 additions and 399 deletions.
54 changes: 30 additions & 24 deletions src/aioquic/quic/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ def reset_stream(self, stream_id: int, error_code: int) -> None:
:param error_code: An error code indicating why the stream is being reset.
"""
stream = self._get_or_create_stream_for_send(stream_id)
stream.reset(error_code)
stream.sender.reset(error_code)

def send_ping(self, uid: int) -> None:
"""
Expand Down Expand Up @@ -1053,7 +1053,7 @@ def send_stream_data(
:param end_stream: If set to `True`, the FIN bit will be set.
"""
stream = self._get_or_create_stream_for_send(stream_id)
stream.write(data, end_stream=end_stream)
stream.sender.write(data, end_stream=end_stream)

# Private

Expand Down Expand Up @@ -1448,7 +1448,7 @@ def _handle_crypto_frame(
)

stream = self._crypto_streams[context.epoch]
event = stream.add_frame(frame)
event = stream.receiver.handle_frame(frame)
if event is not None:
# pass data to TLS layer
try:
Expand Down Expand Up @@ -1888,7 +1888,7 @@ def _handle_reset_stream_frame(
frame_type=frame_type,
reason_phrase="Over stream data limit",
)
newly_received = max(0, final_size - stream._recv_highest)
newly_received = max(0, final_size - stream.receiver.highest_offset)
if self._local_max_data.used + newly_received > self._local_max_data.value:
raise QuicConnectionError(
error_code=QuicErrorCode.FLOW_CONTROL_ERROR,
Expand All @@ -1904,7 +1904,9 @@ def _handle_reset_stream_frame(
final_size,
)
try:
event = stream.handle_reset(error_code=error_code, final_size=final_size)
event = stream.receiver.handle_reset(
error_code=error_code, final_size=final_size
)
except FinalSizeError as exc:
raise QuicConnectionError(
error_code=QuicErrorCode.FINAL_SIZE_ERROR,
Expand Down Expand Up @@ -2023,7 +2025,7 @@ def _handle_stream_frame(
frame_type=frame_type,
reason_phrase="Over stream data limit",
)
newly_received = max(0, offset + length - stream._recv_highest)
newly_received = max(0, offset + length - stream.receiver.highest_offset)
if self._local_max_data.used + newly_received > self._local_max_data.value:
raise QuicConnectionError(
error_code=QuicErrorCode.FLOW_CONTROL_ERROR,
Expand All @@ -2033,7 +2035,7 @@ def _handle_stream_frame(

# process data
try:
event = stream.add_frame(frame)
event = stream.receiver.handle_frame(frame)
except FinalSizeError as exc:
raise QuicConnectionError(
error_code=QuicErrorCode.FINAL_SIZE_ERROR,
Expand Down Expand Up @@ -2265,7 +2267,7 @@ def _retire_peer_cid(self, connection_id: QuicConnectionId) -> None:

def _push_crypto_data(self) -> None:
for epoch, buf in self._crypto_buffers.items():
self._crypto_streams[epoch].write(buf.data)
self._crypto_streams[epoch].sender.write(buf.data)
buf.seek(0)

def _send_probe(self) -> None:
Expand Down Expand Up @@ -2624,7 +2626,7 @@ def _write_application(
self._probe_pending = False

# CRYPTO
if crypto_stream is not None and not crypto_stream.send_buffer_is_empty:
if crypto_stream is not None and not crypto_stream.sender.buffer_is_empty:
self._write_crypto_frame(
builder=builder, space=space, stream=crypto_stream
)
Expand All @@ -2643,19 +2645,19 @@ def _write_application(

# STREAM and RESET_STREAM
for stream in self._streams.values():
if stream.reset_pending:
if stream.sender.reset_pending:
self._write_reset_stream_frame(
builder=builder,
frame_type=QuicFrameType.RESET_STREAM,
stream=stream,
)
elif not stream.is_blocked and not stream.send_buffer_is_empty:
elif not stream.is_blocked and not stream.sender.buffer_is_empty:
self._remote_max_data_used += self._write_stream_frame(
builder=builder,
space=space,
stream=stream,
max_offset=min(
stream._send_highest
stream.sender.highest_offset
+ self._remote_max_data
- self._remote_max_data_used,
stream.max_stream_data_remote,
Expand Down Expand Up @@ -2689,7 +2691,7 @@ def _write_handshake(
self._write_ack_frame(builder=builder, space=space, now=now)

# CRYPTO
if not crypto_stream.send_buffer_is_empty:
if not crypto_stream.sender.buffer_is_empty:
if self._write_crypto_frame(
builder=builder, space=space, stream=crypto_stream
):
Expand Down Expand Up @@ -2819,13 +2821,13 @@ def _write_connection_limits(
def _write_crypto_frame(
self, builder: QuicPacketBuilder, space: QuicPacketSpace, stream: QuicStream
) -> bool:
frame_overhead = 3 + size_uint_var(stream.next_send_offset)
frame = stream.get_frame(builder.remaining_flight_space - frame_overhead)
frame_overhead = 3 + size_uint_var(stream.sender.next_offset)
frame = stream.sender.get_frame(builder.remaining_flight_space - frame_overhead)
if frame is not None:
buf = builder.start_frame(
QuicFrameType.CRYPTO,
capacity=frame_overhead,
handler=stream.on_data_delivery,
handler=stream.sender.on_data_delivery,
handler_args=(frame.offset, frame.offset + len(frame.data)),
)
buf.push_uint_var(frame.offset)
Expand Down Expand Up @@ -2965,9 +2967,9 @@ def _write_reset_stream_frame(
buf = builder.start_frame(
frame_type=frame_type,
capacity=RESET_STREAM_CAPACITY,
handler=stream.on_reset_delivery,
handler=stream.sender.on_reset_delivery,
)
reset = stream.get_reset_frame()
reset = stream.sender.get_reset_frame()
buf.push_uint_var(stream.stream_id)
buf.push_uint_var(reset.error_code)
buf.push_uint_var(reset.final_size)
Expand Down Expand Up @@ -3011,10 +3013,14 @@ def _write_stream_frame(
frame_overhead = (
3
+ size_uint_var(stream.stream_id)
+ (size_uint_var(stream.next_send_offset) if stream.next_send_offset else 0)
+ (
size_uint_var(stream.sender.next_offset)
if stream.sender.next_offset
else 0
)
)
previous_send_highest = stream._send_highest
frame = stream.get_frame(
previous_send_highest = stream.sender.highest_offset
frame = stream.sender.get_frame(
builder.remaining_flight_space - frame_overhead, max_offset
)

Expand All @@ -3027,7 +3033,7 @@ def _write_stream_frame(
buf = builder.start_frame(
frame_type,
capacity=frame_overhead,
handler=stream.on_data_delivery,
handler=stream.sender.on_data_delivery,
handler_args=(frame.offset, frame.offset + len(frame.data)),
)
buf.push_uint_var(stream.stream_id)
Expand All @@ -3044,7 +3050,7 @@ def _write_stream_frame(
)
)

return stream._send_highest - previous_send_highest
return stream.sender.highest_offset - previous_send_highest
else:
return 0

Expand All @@ -3060,7 +3066,7 @@ def _write_stream_limits(
"""
if (
stream.max_stream_data_local
and stream._recv_highest * 2 > stream.max_stream_data_local
and stream.receiver.highest_offset * 2 > stream.max_stream_data_local
):
stream.max_stream_data_local *= 2
self._logger.debug(
Expand Down
Loading

0 comments on commit 67d3fdd

Please sign in to comment.