Skip to content

Commit

Permalink
Reintroduce locking around socket writes
Browse files Browse the repository at this point in the history
when transmitting multiple frames at once.

While changes in #350, #354 are supposed to be safe, there's some
evidence (#681) that sometimes they are not which leads to incorrect
byte stream interleaving on the wire.

Update API approval test expectations

Re-introduce lock for all socket writes

The official `NetworkStream` docs are confusing:

https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.networkstream?view=netframework-4.5.1

> Read and write operations can be performed simultaneously on an
instance of the NetworkStream class without the need for
synchronization. As long as there is one unique thread for the write
operations and one unique thread for the read operations, there will be
no cross-interference between read and write threads and no
synchronization is required.

This is a poorly-written way to say "multiple threads must be synchronized".

Fixes #681

Update API approval test expectations
  • Loading branch information
michaelklishin authored and lukebakken committed Jan 30, 2020
1 parent d0c5123 commit 664c8e9
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryEr
}
}
}

public string ClientProvidedName { get; private set; }

public ushort ChannelMax
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ public class SocketFrameHandler : IFrameHandler
private readonly ITcpClient m_socket;
private readonly NetworkBinaryWriter m_writer;
private readonly object _semaphore = new object();
private readonly object _sslStreamLock = new object();
private readonly object _streamLock = new object();
private bool _closed;
private bool _ssl = false;
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
Expand Down Expand Up @@ -112,7 +111,6 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
try
{
netstream = SslHelper.TcpUpgrade(netstream, endpoint.Ssl);
_ssl = true;
}
catch (Exception)
{
Expand Down Expand Up @@ -249,14 +247,7 @@ public void WriteFrameSet(IList<OutboundFrame> frames)

private void Write(ArraySegment<byte> bufferSegment)
{
if(_ssl)
{
lock (_sslStreamLock)
{
m_writer.Write(bufferSegment.Array, bufferSegment.Offset, bufferSegment.Count);
}
}
else
lock (_streamLock)
{
m_writer.Write(bufferSegment.Array, bufferSegment.Offset, bufferSegment.Count);
}
Expand Down

0 comments on commit 664c8e9

Please sign in to comment.