Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert "Invalid operation while connection is closing" to retriable ServiceBusException. #17023

Merged
merged 2 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,5 +263,18 @@ public static Exception GetInnerException(this AmqpObject amqpObject)

return innerException == null ? null : GetClientException(innerException, null, null, connectionError);
}

public static bool TryTranslateToRetriableException(Exception exception, out ServiceBusException retriableException)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize we don't have tests for this class and not sure if I should add a test for this method. I took this class from Serkant's PR fix.

{
retriableException = null;

// InvalidOperationException with 'connection is closing' message from AMQP layer is retriable.
if (exception is InvalidOperationException && exception.Message.IndexOf("connection is closing", StringComparison.OrdinalIgnoreCase) != -1)
{
retriableException = new ServiceBusException(true, exception);
}

return retriableException != null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ public async Task OnAddRuleAsync(RuleDescription description)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand All @@ -153,7 +160,14 @@ public async Task OnRemoveRuleAsync(string ruleName)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand Down Expand Up @@ -194,7 +208,14 @@ public async Task<IList<RuleDescription>> OnGetRulesAsync(int top, int skip)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,14 @@ protected virtual async Task<IList<Message>> OnPeekAsync(long fromSequenceNumber
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand Down Expand Up @@ -1225,7 +1232,14 @@ protected virtual async Task<IList<Message>> OnReceiveDeferredMessageAsync(long[
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}

return messages;
Expand Down Expand Up @@ -1310,7 +1324,14 @@ protected virtual async Task<DateTime> OnRenewLockAsync(string lockToken)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}

return lockedUntilUtc;
Expand Down Expand Up @@ -1487,7 +1508,14 @@ async Task DisposeMessagesAsync(IEnumerable<Guid> lockTokens, Outcome outcome)
throw new MessageLockLostException(Resources.MessageLockLost);
}

throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand Down Expand Up @@ -1550,7 +1578,14 @@ async Task DisposeMessageRequestResponseAsync(Guid[] lockTokens, DispositionStat
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,14 @@ async Task OnSendAsync(IList<Message> messageList)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neeraj once made a fix for this Github issue but not sure why it didn't resolve it and similar cases kept coming up: https://github.com/Azure/azure-sdk-for-net/pull/6940/files/1b6a6da7e95ecb0868691ae94948acaa27851b03
We have made sure their SDK had Neeraj's fix. Add the retriable exception to every place where this error might occur to attempt to resolve this incident.

}
}
}
}
Expand Down Expand Up @@ -642,7 +649,14 @@ async Task<long> OnScheduleMessageAsync(Message message)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.Session.IsClosing() ?? false);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.Session.IsClosing() ?? false);
}
}
}
}
Expand Down Expand Up @@ -675,7 +689,14 @@ async Task OnCancelScheduledMessageAsync(long sequenceNumber)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.Session.IsClosing() ?? false);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.Session.IsClosing() ?? false);
}
}
}

Expand Down
27 changes: 24 additions & 3 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,14 @@ protected async Task<byte[]> OnGetStateAsync()
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
DorothySun216 marked this conversation as resolved.
Show resolved Hide resolved
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand Down Expand Up @@ -140,7 +147,14 @@ protected async Task OnSetStateAsync(byte[] sessionState)
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand Down Expand Up @@ -170,7 +184,14 @@ protected async Task OnRenewSessionLockAsync()
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception);
if (AmqpExceptionHelper.TryTranslateToRetriableException(exception, out var retriableEx))
{
throw retriableEx;
}
else
{
throw AmqpExceptionHelper.GetClientException(exception);
}
}
}

Expand Down