Skip to content

Commit

Permalink
Changed CloudEndpoint to fail subsequent batches after a transient er…
Browse files Browse the repository at this point in the history
…ror (#1845) (#2275)

* Changed CloudEndpoint to fail subsequent batches after a transient error
* Changed ModuleEndpoint so that, after a failed message, it fails all subsequent messages in order to preserve message order on retry

Co-authored-by: vipeller <51135538+vipeller@users.noreply.github.com>
  • Loading branch information
and-rewsmith and vipeller authored Jan 9, 2020
1 parent ada2042 commit 25ebf58
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Task<ISinkResult> ProcessAsync(ICollection<IRoutingMessage> routingMessag
public Task CloseAsync(CancellationToken token) => Task.CompletedTask;

internal static int GetBatchSize(int batchSize, long messageSize) =>
Math.Min((int)(Constants.MaxMessageSize / messageSize), batchSize);
Math.Min((int)(Constants.MaxMessageSize / Math.Max(1, messageSize)), batchSize);

static bool IsRetryable(Exception ex) => ex != null && RetryableExceptions.Any(re => re.IsInstanceOfType(ex));

Expand Down Expand Up @@ -136,103 +136,56 @@ static ISinkResult GetSyncResultForInvalidMessages(Exception ex, List<IRoutingMe

async Task<ISinkResult> ProcessByClients(ICollection<IRoutingMessage> routingMessages, CancellationToken token)
{
var result = new MergingSinkResult<IRoutingMessage>();

var routingMessageGroups = (from r in routingMessages
group r by this.GetIdentity(r)
into g
select new { Id = g.Key, RoutingMessages = g.ToList() })
.ToList();

var succeeded = new List<IRoutingMessage>();
var failed = new List<IRoutingMessage>();
var invalid = new List<InvalidDetails<IRoutingMessage>>();
Option<SendFailureDetails> sendFailureDetails =
Option.None<SendFailureDetails>();

Events.ProcessingMessageGroups(routingMessages, routingMessageGroups.Count, this.cloudEndpoint.FanOutFactor);

foreach (var groupBatch in routingMessageGroups.Batch(this.cloudEndpoint.FanOutFactor))
{
IEnumerable<Task<ISinkResult<IRoutingMessage>>> sendTasks = groupBatch
.Select(item => this.ProcessClientMessages(item.Id, item.RoutingMessages, token));
ISinkResult<IRoutingMessage>[] sinkResults = await Task.WhenAll(sendTasks);
foreach (ISinkResult<IRoutingMessage> res in sinkResults)

foreach (var res in sinkResults)
{
succeeded.AddRange(res.Succeeded);
failed.AddRange(res.Failed);
invalid.AddRange(res.InvalidDetailsList);
// Different branches could have different results, but only the most significant will be reported
if (IsMoreSignificant(sendFailureDetails, res.SendFailureDetails))
{
sendFailureDetails = res.SendFailureDetails;
}
result.Merge(res);
}
}

return new SinkResult<IRoutingMessage>(
succeeded,
failed,
invalid,
sendFailureDetails.GetOrElse(default(SendFailureDetails)));
return result;
}

// Process all messages for a particular client
async Task<ISinkResult<IRoutingMessage>> ProcessClientMessages(string id, List<IRoutingMessage> routingMessages, CancellationToken token)
{
var succeeded = new List<IRoutingMessage>();
var failed = new List<IRoutingMessage>();
var invalid = new List<InvalidDetails<IRoutingMessage>>();
Option<SendFailureDetails> sendFailureDetails =
Option.None<SendFailureDetails>();
var result = new MergingSinkResult<IRoutingMessage>();

// Find the maximum message size, and divide messages into largest batches
// not exceeding max allowed IoTHub message size.
long maxMessageSize = routingMessages.Select(r => r.Size()).Max();
int batchSize = GetBatchSize(Math.Min(this.cloudEndpoint.maxBatchSize, routingMessages.Count), maxMessageSize);
foreach (IEnumerable<IRoutingMessage> batch in routingMessages.Batch(batchSize))
{
ISinkResult res = await this.ProcessClientMessagesBatch(id, batch.ToList(), token);
succeeded.AddRange(res.Succeeded);
failed.AddRange(res.Failed);
invalid.AddRange(res.InvalidDetailsList);

if (IsMoreSignificant(sendFailureDetails, res.SendFailureDetails))
{
sendFailureDetails = res.SendFailureDetails;
}
var iterator = routingMessages.Batch(batchSize).GetEnumerator();
while (iterator.MoveNext())
{
result.Merge(await this.ProcessClientMessagesBatch(id, iterator.Current.ToList(), token));
if (!result.IsSuccessful)
break;
}

return new SinkResult<IRoutingMessage>(
succeeded,
failed,
invalid,
sendFailureDetails.GetOrElse(default(SendFailureDetails)));
}

static bool IsMoreSignificant(Option<SendFailureDetails> baseDetails, Option<SendFailureDetails> currentDetails)
{
// whatever happened before, if there are no details now, that cannot be more significant
if (currentDetails == Option.None<SendFailureDetails>())
return false;

// if something wrong happened now, but nothing before, then that is more significant
if (baseDetails == Option.None<SendFailureDetails>())
return true;

// at this point something has happened before, as well as now. Pick the more significant
var baseUnwrapped = baseDetails.Expect(ThrowBadProgramLogic);
var currentUnwrapped = currentDetails.Expect(ThrowBadProgramLogic);

// in theory this case is represented by Option.None and handled earlier, but let's check it just to be sure
if (currentUnwrapped.FailureKind == FailureKind.None)
return false;

// Transient beats non-transient
if (baseUnwrapped.FailureKind != FailureKind.Transient && currentUnwrapped.FailureKind == FailureKind.Transient)
return true;

return false;
// if failed earlier, fast-fail the rest
while (iterator.MoveNext())
{
result.AddFailed(iterator.Current);
}

InvalidOperationException ThrowBadProgramLogic() => new InvalidOperationException("Error in program logic, uwrapped Option<T> should have had value");
return result;
}

async Task<ISinkResult<IRoutingMessage>> ProcessClientMessagesBatch(string id, List<IRoutingMessage> routingMessages, CancellationToken token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,16 @@ async Task<ISinkResult> ProcessAsync(ICollection<IRoutingMessage> routingMessage
IMessage message = this.moduleEndpoint.messageConverter.ToMessage(routingMessage);
try
{
await dp.SendMessageAsync(message, this.moduleEndpoint.Input);
succeeded.Add(routingMessage);
if (failed.Count == 0)
{
await dp.SendMessageAsync(message, this.moduleEndpoint.Input);
succeeded.Add(routingMessage);
}
else
{
// if one failed, fail the rest, so retry will keep message order
failed.Add(routingMessage);
}
}
catch (Exception ex)
{
Expand Down
59 changes: 58 additions & 1 deletion edge-hub/src/Microsoft.Azure.Devices.Routing.Core/SinkResult.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Routing.Core
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
Expand Down Expand Up @@ -44,8 +45,64 @@ public SinkResult(ICollection<T> succeeded, ICollection<T> failed, ICollection<I

public ICollection<InvalidDetails<T>> InvalidDetailsList { get; }

public Option<SendFailureDetails> SendFailureDetails { get; }
public Option<SendFailureDetails> SendFailureDetails { get; protected set; }

public bool IsSuccessful => !this.Failed.Any();
}

/// <summary>
/// A <see cref="SinkResult{T}"/> that starts out empty and accumulates the outcome of
/// several partial results through <c>Merge</c> and <c>AddFailed</c>.
/// </summary>
/// <typeparam name="T">Type of the items tracked by the result.</typeparam>
public class MergingSinkResult<T> : SinkResult<T>
{
    /// <summary>
    /// Creates an empty result backed by mutable lists; no failure details (null) are
    /// passed to the base constructor.
    /// </summary>
    public MergingSinkResult()
        : base(new List<T>(), new List<T>(), new List<InvalidDetails<T>>(), null)
    {
    }

    // The base class exposes its collections as ICollection<T>. This class always hands
    // List<T> instances to the base constructor, so the typed views below cast back to
    // List<T> to get AddRange.
    // NOTE(review): assumes the base constructor stores the instances it is given - confirm.
    // A direct cast is used instead of 'as' so that a broken invariant fails right here
    // with an InvalidCastException rather than later with a NullReferenceException.
    private new List<T> Succeeded => (List<T>)base.Succeeded;

    private new List<T> Failed => (List<T>)base.Failed;

    private new List<InvalidDetails<T>> InvalidDetailsList => (List<InvalidDetails<T>>)base.InvalidDetailsList;

    /// <summary>
    /// Appends the succeeded, failed and invalid items of <paramref name="other"/> to this
    /// result and keeps whichever failure details are more significant.
    /// </summary>
    /// <param name="other">The partial result to fold into this one.</param>
    /// <exception cref="ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void Merge(ISinkResult<T> other)
    {
        if (other == null)
        {
            throw new ArgumentNullException(nameof(other));
        }

        this.Succeeded.AddRange(other.Succeeded);
        this.Failed.AddRange(other.Failed);
        this.InvalidDetailsList.AddRange(other.InvalidDetailsList);

        // Different partial results may carry different failure details,
        // but only the most significant one is kept.
        if (IsMoreSignificant(this.SendFailureDetails, other.SendFailureDetails))
        {
            this.SendFailureDetails = other.SendFailureDetails;
        }
    }

    /// <summary>
    /// Appends <paramref name="failed"/> to the failed items without touching the
    /// succeeded/invalid lists or the failure details.
    /// </summary>
    /// <param name="failed">Items to record as failed.</param>
    public void AddFailed(IEnumerable<T> failed)
    {
        this.Failed.AddRange(failed);
    }

    /// <summary>
    /// Decides whether <paramref name="currentDetails"/> should replace
    /// <paramref name="baseDetails"/> as the reported failure.
    /// </summary>
    /// <param name="baseDetails">Failure details recorded so far.</param>
    /// <param name="currentDetails">Failure details of the result being merged.</param>
    /// <returns>True when the current details outrank the recorded ones.</returns>
    private static bool IsMoreSignificant(Option<SendFailureDetails> baseDetails, Option<SendFailureDetails> currentDetails)
    {
        // whatever happened before, if there are no details now, that cannot be more significant
        if (currentDetails == Option.None<SendFailureDetails>())
        {
            return false;
        }

        // if something wrong happened now, but nothing before, then that is more significant
        if (baseDetails == Option.None<SendFailureDetails>())
        {
            return true;
        }

        // at this point something has happened before, as well as now. Pick the more significant
        var baseUnwrapped = baseDetails.Expect(ThrowBadProgramLogic);
        var currentUnwrapped = currentDetails.Expect(ThrowBadProgramLogic);

        // in theory this case is represented by Option.None, but let's check it just to be sure
        if (currentUnwrapped.FailureKind == FailureKind.None)
        {
            return false;
        }

        // symmetric safety net: a stored "no failure" placeholder must not mask a real failure
        if (baseUnwrapped.FailureKind == FailureKind.None)
        {
            return true;
        }

        // Transient beats non-transient; in every other case the earlier details are kept
        return baseUnwrapped.FailureKind != FailureKind.Transient && currentUnwrapped.FailureKind == FailureKind.Transient;

        InvalidOperationException ThrowBadProgramLogic() => new InvalidOperationException("Error in program logic, uwrapped Option<T> should have had value");
    }
}
}
Loading

0 comments on commit 25ebf58

Please sign in to comment.