Skip to content

Commit

Permalink
Gracefull handling of exceptions that occurs when DevHostAgent closes…
Browse files Browse the repository at this point in the history
… sockets (#265)

* Fix for IOException

* Fixed IO Exception

* Reverted change in AssemblyVersion

* Added OperationAborted

* Fixed build error
  • Loading branch information
Eneuman authored Jul 5, 2023
1 parent 41569c1 commit f55a535
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 7 deletions.
21 changes: 19 additions & 2 deletions src/common/DevHostAgent/DevHostAgentExecutorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,18 @@ public async Task ReversePortForwardStartAsync(PortForwardStartInfo port, Func<i
var channelReader = await connection.StreamAsChannelAsync<PortForwardStreamBlock>("RunReversePortForward", port, cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
PortForwardStreamBlock b = await channelReader.ReadAsync(cancellationToken);
PortForwardStreamBlock b = null;

try
{
b = await channelReader.ReadAsync(cancellationToken);
}
catch (Exception ex) when (ex is OperationCanceledException)
{
// Cancellation requested
break;
}

switch (b.Flag)
{
case PortForwardStreamFlag.Connected:
Expand All @@ -98,6 +109,10 @@ public async Task ReversePortForwardStartAsync(PortForwardStartInfo port, Func<i
{
await connection.InvokeAsync("StopReversePortForward", port.Port, b.StreamId);
}
catch (TaskCanceledException)
{
// Task Canceled
}
catch (Exception ex)
{
// This is needed because this run inside a task that is forgotten, so any exception that is not processed inside the task is going to get leaked outside the normal processing scope.
Expand All @@ -112,7 +127,7 @@ public async Task ReversePortForwardStartAsync(PortForwardStartInfo port, Func<i
{
await dataHandler(b.StreamId, b.Content);
}
catch (Exception)
catch (Exception dex)
{
// closes this connection
try
Expand All @@ -124,6 +139,8 @@ public async Task ReversePortForwardStartAsync(PortForwardStartInfo port, Func<i
_log.Exception(ex);
}

_log.Exception(dex);

closedHandler(b.StreamId);
}
break;
Expand Down
34 changes: 31 additions & 3 deletions src/common/PortForward/ReversePortForwardManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
Expand Down Expand Up @@ -127,8 +128,12 @@ private async Task OnDataReceived(int streamId, byte[] data)
Task.Run(() => this.StartReceivingAsync(t, streamId, _cancellationToken)).Forget();
return t;
});
await tcpClient.GetStream().WriteAsync(data, 0, data.Length);
_log.Verbose($"Sent {data.Length} bytes to workload on id {streamId}.");

if (tcpClient.Connected)
{
await tcpClient.GetStream().WriteAsync(data, 0, data.Length);
_log.Verbose($"Sent {data.Length} bytes to workload on id {streamId}.");
}
}

private void OnClosed(int streamId)
Expand All @@ -149,15 +154,38 @@ private async Task StartReceivingAsync(TcpClient tcpClient, int streamId, Cancel
{
while (!cancellation.IsCancellationRequested)
{
int cRead = await tcpClient.GetStream().ReadAsync(buffer, 0, buffer.Length, cancellation);
int cRead = 0;
try
{
cRead = await tcpClient.GetStream().ReadAsync(buffer, cancellation);
}
catch (IOException ex)
{
if (ex.InnerException is OperationCanceledException)
{
// Cancellation requested
break;
}
if (ex.InnerException is SocketException se && (se.SocketErrorCode == SocketError.ConnectionReset || se.SocketErrorCode == SocketError.OperationAborted))
{
_log.Verbose($"Connection is already closed by DevHostAgent on socket {streamId} (StartReceivingAsync)");
break;
}

throw;
}

_log.Verbose($"ReversePortForwarder receive {cRead} bytes from port {_port.Port} on id {streamId}");

if (cRead == 0)
{
await _agentClient.ReversePortForwardStopAsync(_port.Port, streamId, cancellation);
break;
}

byte[] content = new byte[cRead];
Array.Copy(buffer, content, cRead);

try
{
await _agentClient.ReversePortForwardSendAsync(_port.Port, streamId, content, cancellation);
Expand Down
23 changes: 22 additions & 1 deletion src/common/PortForward/ServicePortForwardManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,28 @@ private async Task RunLoopAsync(TcpClient connection, CancellationToken cancella

while (!requestProcessingCancellationTokenSource.Token.IsCancellationRequested)
{
int cRead = await stream.ReadAsync(buffer, 0, buffer.Length, requestProcessingCancellationTokenSource.Token);
int cRead = 0;
try
{
cRead = await stream.ReadAsync(buffer, requestProcessingCancellationTokenSource.Token);
}
catch (IOException ex)
{
if (ex.InnerException is OperationCanceledException)
{
// Cancellation requested
break;
}
if (ex.InnerException is SocketException se && (se.SocketErrorCode == SocketError.ConnectionReset || se.SocketErrorCode == SocketError.OperationAborted))
{
_log.Verbose($"Connection is already closed by DevHostAgent on socket {streamId} (RunLoopAsync)");
requestProcessingCancellationTokenSource.Cancel();
break;
}

throw;
}

if (cRead == 0)
{
_log.Verbose($"ServicePortForward: stream {streamId}: StopLocal");
Expand Down
3 changes: 2 additions & 1 deletion src/devhostagent/Services/AgentExecutorHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public ChannelReader<PortForwardStreamBlock> RunReversePortForward(PortForwardSt
Task.Run(async () =>
{
connector = _reversePortForwardConnectors.GetOrAdd(startInfo.Port, (p) => new ReversePortForwardConnector(p, _log, _platform));
await connector.ConnectAsync(async (id) =>
await connector.ConnectAsync(
async (id) =>
{
_log.Verbose($"AgentHub connnected for {startInfo.Port}, id {id}");
await channel.Writer.WriteAsync(PortForwardStreamBlock.Connected(id));
Expand Down

0 comments on commit f55a535

Please sign in to comment.