From de309f72f238e25abe2b58c56d35639365557f1e Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 21 Apr 2024 12:19:26 +1200 Subject: [PATCH] Expose synchronize flush (#16) --- lib/protocol/http2/connection.rb | 15 +++++++++++++-- lib/protocol/http2/framer.rb | 16 ++++++++++------ test/protocol/http2/framer.rb | 7 +++++++ test/protocol/http2/server.rb | 12 ++++++++++++ 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/lib/protocol/http2/connection.rb b/lib/protocol/http2/connection.rb index 69bdaea..88b42f4 100644 --- a/lib/protocol/http2/connection.rb +++ b/lib/protocol/http2/connection.rb @@ -143,9 +143,14 @@ def ignore_frame?(frame) end end + def synchronize + yield + end + # Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed. def read_frame frame = @framer.read_frame(@local_settings.maximum_frame_size) + # puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})" # puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}" @@ -207,12 +212,18 @@ def receive_goaway(frame) end def write_frame(frame) - @framer.write_frame(frame) + synchronize do + @framer.write_frame(frame) + @framer.flush + end end def write_frames if @framer - yield @framer + synchronize do + yield @framer + @framer.flush + end else raise EOFError, "Connection closed!" end diff --git a/lib/protocol/http2/framer.rb b/lib/protocol/http2/framer.rb index 6c5bcc7..a0032fd 100644 --- a/lib/protocol/http2/framer.rb +++ b/lib/protocol/http2/framer.rb @@ -41,6 +41,10 @@ def initialize(stream, frames = FRAMES) @frames = frames end + def flush + @stream.flush + end + def close @stream.close end @@ -69,7 +73,7 @@ def read_frame(maximum_frame_size = MAXIMUM_ALLOWED_FRAME_SIZE) # Read the header: length, type, flags, stream_id = read_header - # Async.logger.debug(self) {"read_frame: length=#{length} type=#{type} flags=#{flags} stream_id=#{stream_id} -> klass=#{@frames[type].inspect}"} + # Console.debug(self) {"read_frame: length=#{length} type=#{type} flags=#{flags} stream_id=#{stream_id} -> klass=#{@frames[type].inspect}"} # Allocate the frame: klass = @frames[type] || Frame @@ -78,19 +82,19 @@ def read_frame(maximum_frame_size = MAXIMUM_ALLOWED_FRAME_SIZE) # Read the payload: frame.read(@stream, maximum_frame_size) - # Async.logger.debug(self, name: "read") {frame.inspect} + # Console.debug(self, name: "read") {frame.inspect} return frame end + # Write a frame to the underlying IO. + # After writing one or more frames, you should call flush to ensure the frames are sent to the remote peer. + # @parameter frame [Frame] the frame to write. def write_frame(frame) - # Async.logger.debug(self, name: "write") {frame.inspect} + # Console.debug(self, name: "write") {frame.inspect} frame.write(@stream) - # Don't call @stream.flush here because it can cause significant contention if there is a semaphore around this method. - # @stream.flush - return frame end diff --git a/test/protocol/http2/framer.rb b/test/protocol/http2/framer.rb index 67c2217..e391759 100644 --- a/test/protocol/http2/framer.rb +++ b/test/protocol/http2/framer.rb @@ -9,6 +9,13 @@ let(:stream) {StringIO.new} let(:framer) {subject.new(stream)} + with "#flush" do + it "flushes the underlying stream" do + expect(stream).to receive(:flush) + framer.flush + end + end + with "#closed?" do it "reports the status of the underlying stream" do expect(stream).to receive(:closed?).and_return(true) diff --git a/test/protocol/http2/server.rb b/test/protocol/http2/server.rb index 98ec9b1..72628ab 100644 --- a/test/protocol/http2/server.rb +++ b/test/protocol/http2/server.rb @@ -42,6 +42,18 @@ expect(server.state).to be == :new end + it "fails with protocol error if first frame is not settings frame" do + framer.write_connection_preface + + data_frame = Protocol::HTTP2::DataFrame.new + data_frame.pack("Hello, World!") + framer.write_frame(data_frame) + + expect do + server.read_connection_preface + end.to raise_exception(Protocol::HTTP2::ProtocolError, message: be =~ /First frame must be #{Protocol::HTTP2::SettingsFrame}/) + end + it "cannot read connection preface in open state" do server.open! expect do