Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add managed packet recycling #389

Merged
merged 3 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIMarsQueuedPacket.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNINpHandle.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIPacket.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIPacketPool.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIPhysicalHandle.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIProxy.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNITcpHandle.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SslOverTdsStream.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ internal abstract class SNIHandle
/// Send a packet asynchronously
/// </summary>
/// <param name="packet">SNI packet</param>
/// <param name="disposePacketAfterSendAsync"></param>
/// <param name="callback">Completion callback</param>
/// <returns>SNI error code</returns>
public abstract uint SendAsync(SNIPacket packet, bool disposePacketAfterSendAsync, SNIAsyncCallback callback = null);
public abstract uint SendAsync(SNIPacket packet, SNIAsyncCallback callback = null);

/// <summary>
/// Receive a packet synchronously
Expand Down Expand Up @@ -87,6 +86,11 @@ internal abstract class SNIHandle
public abstract Guid ConnectionId { get; }

public virtual int ReserveHeaderSize => 0;

public abstract SNIPacket RentPacket(int headerSize, int dataSize);

public abstract void ReturnPacket(SNIPacket packet);

#if DEBUG
/// <summary>
/// Test handle for killing underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public uint SendAsync(SNIPacket packet, SNIAsyncCallback callback)
{
lock (this)
{
return _lowerHandle.SendAsync(packet, false, callback);
return _lowerHandle.SendAsync(packet, callback);
}
}
finally
Expand All @@ -136,7 +136,7 @@ public uint ReceiveAsync(ref SNIPacket packet)
{
if (packet != null)
{
packet.Release();
ReturnPacket(packet);
packet = null;
}

Expand Down Expand Up @@ -188,7 +188,8 @@ public void HandleReceiveError(SNIPacket packet)
handle.HandleReceiveError(packet);
}
}
packet?.Release();
Debug.Assert(!packet.IsInvalid, "packet was returned by MarsConnection child, child sessions should not release the packet");
ReturnPacket(packet);
}

/// <summary>
Expand Down Expand Up @@ -258,7 +259,7 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
_currentHeader.Read(_headerBytes);

_dataBytesLeft = (int)_currentHeader.length;
_currentPacket = new SNIPacket(headerSize: 0, dataSize: (int)_currentHeader.length);
_currentPacket = _lowerHandle.RentPacket(headerSize: 0, dataSize: (int)_currentHeader.length);
}

currentHeader = _currentHeader;
Expand Down Expand Up @@ -322,6 +323,11 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
{
SNICommon.ReportSNIError(SNIProviders.SMUX_PROV, SNICommon.InternalExceptionError, e);
}

Debug.Assert(_currentPacket == currentPacket, "current and _current are not the same");
ReturnPacket(currentPacket);
currentPacket = null;
_currentPacket = null;
}

lock (this)
Expand Down Expand Up @@ -379,6 +385,16 @@ public void DisableSsl()
}
}

public SNIPacket RentPacket(int headerSize, int dataSize)
{
return _lowerHandle.RentPacket(headerSize, dataSize);
}

public void ReturnPacket(SNIPacket packet)
{
_lowerHandle.ReturnPacket(packet);
}

#if DEBUG
/// <summary>
/// Test handle for killing underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal sealed class SNIMarsHandle : SNIHandle
private readonly ManualResetEventSlim _packetEvent = new ManualResetEventSlim(false);
private readonly ManualResetEventSlim _ackEvent = new ManualResetEventSlim(false);
private readonly SNISMUXHeader _currentHeader = new SNISMUXHeader();
private readonly SNIAsyncCallback _handleSendCompleteCallback;

private uint _sendHighwater = 4;
private int _asyncReceives = 0;
Expand Down Expand Up @@ -79,6 +80,7 @@ public SNIMarsHandle(SNIMarsConnection connection, ushort sessionId, object call
_sessionId = sessionId;
_connection = connection;
_callbackObject = callbackObject;
_handleSendCompleteCallback = HandleSendComplete;
SendControlPacket(SNISMUXFlags.SMUX_SYN);
_status = TdsEnums.SNI_SUCCESS;
}
Expand All @@ -92,7 +94,7 @@ private void SendControlPacket(SNISMUXFlags flags)
long scopeID = SqlClientEventSource.Log.SNIScopeEnterEvent("<sc.SNI.SNIMarsHandle.SendControlPacket |SNI|INFO|SCOPE>");
try
{
SNIPacket packet = new SNIPacket(headerSize: SNISMUXHeader.HEADER_LENGTH, dataSize: 0);
SNIPacket packet = RentPacket(headerSize: SNISMUXHeader.HEADER_LENGTH, dataSize: 0);

lock (this)
{
Expand All @@ -102,6 +104,7 @@ private void SendControlPacket(SNISMUXFlags flags)
}

_connection.Send(packet);
ReturnPacket(packet);
}
finally
{
Expand Down Expand Up @@ -263,17 +266,16 @@ private uint SendPendingPackets()
/// Send a packet asynchronously
/// </summary>
/// <param name="packet">SNI packet</param>
/// <param name="disposePacketAfterSendAsync"></param>
/// <param name="callback">Completion callback</param>
/// <returns>SNI error code</returns>
public override uint SendAsync(SNIPacket packet, bool disposePacketAfterSendAsync, SNIAsyncCallback callback = null)
public override uint SendAsync(SNIPacket packet, SNIAsyncCallback callback = null)
{
long scopeID = SqlClientEventSource.Log.SNIScopeEnterEvent("<sc.SNI.SNIMarsHandle.SendAsync |SNI|INFO|SCOPE>");
try
{
lock (this)
{
_sendPacketQueue.Enqueue(new SNIMarsQueuedPacket(packet, callback != null ? callback : HandleSendComplete));
_sendPacketQueue.Enqueue(new SNIMarsQueuedPacket(packet, callback ?? _handleSendCompleteCallback));
}

SendPendingPackets();
Expand Down Expand Up @@ -340,13 +342,17 @@ public void HandleReceiveError(SNIPacket packet)
long scopeID = SqlClientEventSource.Log.SNIScopeEnterEvent("<sc.SNI.SNIMarsHandle.HandleReceiveError |SNI|INFO|SCOPE>");
try
{
// SNIMarsHandle should only receive calls to this function from the SNIMarsConnection aggregator class
// which should handle ownership of the packet because the individual mars handles are not aware of
// each other and cannot know if they are the last one in the list and that it is safe to return the packet

lock (_receivedPacketQueue)
{
_connectionError = SNILoadHandle.SingletonInstance.LastError;
_packetEvent.Set();
}

((TdsParserStateObject)_callbackObject).ReadAsyncCallback(PacketHandle.FromManagedPacket(packet), 1);
((TdsParserStateObject)_callbackObject).ReadAsyncCallback(PacketHandle.FromManagedPacket(packet), 1);
}
finally
{
Expand All @@ -370,6 +376,7 @@ public void HandleSendComplete(SNIPacket packet, uint sniErrorCode)

((TdsParserStateObject)_callbackObject).WriteAsyncCallback(PacketHandle.FromManagedPacket(packet), sniErrorCode);
}
_connection.ReturnPacket(packet);
}
finally
{
Expand Down Expand Up @@ -432,6 +439,8 @@ public void HandleReceiveComplete(SNIPacket packet, SNISMUXHeader header)

((TdsParserStateObject)_callbackObject).ReadAsyncCallback(PacketHandle.FromManagedPacket(packet), 0);
}

_connection.ReturnPacket(packet);
}

lock (this)
Expand Down Expand Up @@ -556,21 +565,14 @@ public override void SetBufferSize(int bufferSize)
{
}

/// <summary>
/// Enable SSL
/// </summary>
public override uint EnableSsl(uint options)
{
return _connection.EnableSsl(options);
}
public override uint EnableSsl(uint options) => _connection.EnableSsl(options);

public override void DisableSsl() => _connection.DisableSsl();

public override SNIPacket RentPacket(int headerSize, int dataSize) => _connection.RentPacket(headerSize, dataSize);

public override void ReturnPacket(SNIPacket packet) => _connection.ReturnPacket(packet);

/// <summary>
/// Disable SSL
/// </summary>
public override void DisableSsl()
{
_connection.DisableSsl();
}

#if DEBUG
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ namespace Microsoft.Data.SqlClient.SNI
/// <summary>
/// Mars queued packet
/// </summary>
internal class SNIMarsQueuedPacket
internal sealed class SNIMarsQueuedPacket
{
private SNIPacket _packet;
private SNIAsyncCallback _callback;
private readonly SNIPacket _packet;
private readonly SNIAsyncCallback _callback;

/// <summary>
/// Constructor
Expand All @@ -23,36 +23,8 @@ public SNIMarsQueuedPacket(SNIPacket packet, SNIAsyncCallback callback)
_callback = callback;
}

/// <summary>
/// SNI packet
/// </summary>
public SNIPacket Packet
{
get
{
return _packet;
}

set
{
_packet = value;
}
}
public SNIPacket Packet => _packet;

/// <summary>
/// Completion callback
/// </summary>
public SNIAsyncCallback Callback
{
get
{
return _callback;
}

set
{
_callback = value;
}
}
public SNIAsyncCallback Callback => _callback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Data.SqlClient.SNI
/// <summary>
/// Named Pipe connection handle
/// </summary>
internal sealed class SNINpHandle : SNIHandle
internal sealed class SNINpHandle : SNIPhysicalHandle
{
internal const string DefaultPipePath = @"sql\query"; // e.g. \\HOSTNAME\pipe\sql\query
private const int MAX_PIPE_INSTANCES = 255;
Expand Down Expand Up @@ -179,7 +179,7 @@ public override uint Receive(out SNIPacket packet, int timeout)
packet = null;
try
{
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);
packet = RentPacket(headerSize: 0, dataSize: _bufferSize);
packet.ReadFromStream(_stream);

if (packet.Length == 0)
Expand Down Expand Up @@ -220,7 +220,7 @@ public override uint ReceiveAsync(ref SNIPacket packet)
try
{
SNIPacket errorPacket;
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);
packet = RentPacket(headerSize: 0, dataSize: _bufferSize);

try
{
Expand Down Expand Up @@ -307,13 +307,13 @@ public override uint Send(SNIPacket packet)
}
}

public override uint SendAsync(SNIPacket packet, bool disposePacketAfterSendAsync, SNIAsyncCallback callback = null)
public override uint SendAsync(SNIPacket packet, SNIAsyncCallback callback = null)
{
long scopeID = SqlClientEventSource.Log.SNIScopeEnterEvent("<sc.SNI.SNINpHandle.SendAsync |SNI|SCOPE>");
try
{
SNIAsyncCallback cb = callback ?? _sendCallback;
packet.WriteToStreamAsync(_stream, cb, SNIProviders.NP_PROV, disposePacketAfterSendAsync);
packet.WriteToStreamAsync(_stream, cb, SNIProviders.NP_PROV);
return TdsEnums.SNI_SUCCESS_IO_PENDING;
}
finally
Expand Down Expand Up @@ -407,7 +407,7 @@ private uint ReportErrorAndReleasePacket(SNIPacket packet, Exception sniExceptio
{
if (packet != null)
{
packet.Release();
ReturnPacket(packet);
}
return SNICommon.ReportSNIError(SNIProviders.NP_PROV, SNICommon.InternalExceptionError, sniException);
}
Expand All @@ -416,7 +416,7 @@ private uint ReportErrorAndReleasePacket(SNIPacket packet, uint nativeError, uin
{
if (packet != null)
{
packet.Release();
ReturnPacket(packet);
}
return SNICommon.ReportSNIError(SNIProviders.NP_PROV, nativeError, sniError, errorMessage);
}
Expand Down
Loading