From 25ebf58de507720b1560ee5a06b33bd0d279d66d Mon Sep 17 00:00:00 2001 From: Andrew Smith Date: Wed, 8 Jan 2020 19:34:15 -0800 Subject: [PATCH] Changed CloudEndpoint to fail subsequent batches after a transient error (#1845) (#2275) * Changed CloudEndpoint to fail subsequent batches after a transient error * Changed ModuleEndpoint, so after a failed message it fails all subsequent in order to keep message order for retry Co-authored-by: vipeller <51135538+vipeller@users.noreply.github.com> --- .../routing/CloudEndpoint.cs | 87 ++--- .../routing/ModuleEndpoint.cs | 12 +- .../SinkResult.cs | 59 +++- .../routing/CloudMessageProcessorTests.cs | 148 ++++++--- .../routing/ModuleMessageProcessorTest.cs | 304 ++++++++++-------- .../CollectionEx.cs | 2 + 6 files changed, 361 insertions(+), 251 deletions(-) diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs index 3c96b877594..d3bc15308f1 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs @@ -100,7 +100,7 @@ public Task ProcessAsync(ICollection routingMessag public Task CloseAsync(CancellationToken token) => Task.CompletedTask; internal static int GetBatchSize(int batchSize, long messageSize) => - Math.Min((int)(Constants.MaxMessageSize / messageSize), batchSize); + Math.Min((int)(Constants.MaxMessageSize / Math.Max(1, messageSize)), batchSize); static bool IsRetryable(Exception ex) => ex != null && RetryableExceptions.Any(re => re.IsInstanceOfType(ex)); @@ -136,18 +136,14 @@ static ISinkResult GetSyncResultForInvalidMessages(Exception ex, List ProcessByClients(ICollection routingMessages, CancellationToken token) { + var result = new MergingSinkResult(); + var routingMessageGroups = (from r in routingMessages group r by this.GetIdentity(r) into g select new { Id = g.Key, RoutingMessages = g.ToList() }) .ToList(); - var succeeded = new List(); - var failed = new List(); - var invalid = new List>(); - Option sendFailureDetails = - Option.None(); - Events.ProcessingMessageGroups(routingMessages, routingMessageGroups.Count, this.cloudEndpoint.FanOutFactor); foreach (var groupBatch in routingMessageGroups.Batch(this.cloudEndpoint.FanOutFactor)) @@ -155,84 +151,41 @@ into g IEnumerable>> sendTasks = groupBatch .Select(item => this.ProcessClientMessages(item.Id, item.RoutingMessages, token)); ISinkResult[] sinkResults = await Task.WhenAll(sendTasks); - foreach (ISinkResult res in sinkResults) + + foreach (var res in sinkResults) { - succeeded.AddRange(res.Succeeded); - failed.AddRange(res.Failed); - invalid.AddRange(res.InvalidDetailsList); - // Different branches could have different results, but only the most significant will be reported - if (IsMoreSignificant(sendFailureDetails, res.SendFailureDetails)) - { - sendFailureDetails = res.SendFailureDetails; - } + result.Merge(res); } } - return new SinkResult( - succeeded, - failed, - invalid, - sendFailureDetails.GetOrElse(default(SendFailureDetails))); + return result; } // Process all messages for a particular client async Task> ProcessClientMessages(string id, List routingMessages, CancellationToken token) { - var succeeded = new List(); - var failed = new List(); - var invalid = new List>(); - Option sendFailureDetails = - Option.None(); + var result = new MergingSinkResult(); // Find the maximum message size, and divide messages into largest batches // not exceeding max allowed IoTHub message size. long maxMessageSize = routingMessages.Select(r => r.Size()).Max(); int batchSize = GetBatchSize(Math.Min(this.cloudEndpoint.maxBatchSize, routingMessages.Count), maxMessageSize); - foreach (IEnumerable batch in routingMessages.Batch(batchSize)) - { - ISinkResult res = await this.ProcessClientMessagesBatch(id, batch.ToList(), token); - succeeded.AddRange(res.Succeeded); - failed.AddRange(res.Failed); - invalid.AddRange(res.InvalidDetailsList); - if (IsMoreSignificant(sendFailureDetails, res.SendFailureDetails)) - { - sendFailureDetails = res.SendFailureDetails; - } + var iterator = routingMessages.Batch(batchSize).GetEnumerator(); + while (iterator.MoveNext()) + { + result.Merge(await this.ProcessClientMessagesBatch(id, iterator.Current.ToList(), token)); + if (!result.IsSuccessful) + break; } - return new SinkResult( - succeeded, - failed, - invalid, - sendFailureDetails.GetOrElse(default(SendFailureDetails))); - } - - static bool IsMoreSignificant(Option baseDetails, Option currentDetails) - { - // whatever happend before, if no details now, that cannot be more significant - if (currentDetails == Option.None()) - return false; - - // if something wrong happened now, but nothing before, then that is more significant - if (baseDetails == Option.None()) - return true; - - // at this point something has happened before, as well as now. Pick the more significant - var baseUnwrapped = baseDetails.Expect(ThrowBadProgramLogic); - var currentUnwrapped = currentDetails.Expect(ThrowBadProgramLogic); - - // in theory this case is represened by Option.None and handled earlier, but let's check it just for sure - if (currentUnwrapped.FailureKind == FailureKind.None) - return false; - - // Transient beats non-transient - if (baseUnwrapped.FailureKind != FailureKind.Transient && currentUnwrapped.FailureKind == FailureKind.Transient) - return true; - - return false; + // if failed earlier, fast-fail the rest + while (iterator.MoveNext()) + { + result.AddFailed(iterator.Current); + } - InvalidOperationException ThrowBadProgramLogic() => new InvalidOperationException("Error in program logic, uwrapped Option should have had value"); + return result; } async Task> ProcessClientMessagesBatch(string id, List routingMessages, CancellationToken token) diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/ModuleEndpoint.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/ModuleEndpoint.cs index fdaa3bea7a4..778eb494584 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/ModuleEndpoint.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/ModuleEndpoint.cs @@ -161,8 +161,16 @@ async Task ProcessAsync(ICollection routingMessage IMessage message = this.moduleEndpoint.messageConverter.ToMessage(routingMessage); try { - await dp.SendMessageAsync(message, this.moduleEndpoint.Input); - succeeded.Add(routingMessage); + if (failed.Count == 0) + { + await dp.SendMessageAsync(message, this.moduleEndpoint.Input); + succeeded.Add(routingMessage); + } + else + { + // if one failed, fail the rest, so retry will keep message order + failed.Add(routingMessage); + } } catch (Exception ex) { diff --git a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/SinkResult.cs b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/SinkResult.cs index fe95497edc1..b2c9a7690c7 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/SinkResult.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/SinkResult.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Azure.Devices.Routing.Core { + using System; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; @@ -44,8 +45,64 @@ public SinkResult(ICollection succeeded, ICollection failed, ICollection> InvalidDetailsList { get; } - public Option SendFailureDetails { get; } + public Option SendFailureDetails { get; protected set; } public bool IsSuccessful => !this.Failed.Any(); } + + public class MergingSinkResult : SinkResult + { + public MergingSinkResult() + : base(new List(), new List(), new List>(), null) + { + } + + public void Merge(ISinkResult other) + { + this.Succeeded.AddRange(other.Succeeded); + this.Failed.AddRange(other.Failed); + this.InvalidDetailsList.AddRange(other.InvalidDetailsList); + + if (IsMoreSignificant(this.SendFailureDetails, other.SendFailureDetails)) + { + this.SendFailureDetails = other.SendFailureDetails; + } + } + + public void AddFailed(IEnumerable failed) + { + this.Failed.AddRange(failed); + } + + private new List Succeeded => base.Succeeded as List; + private new List Failed => base.Failed as List; + private new List> InvalidDetailsList => base.InvalidDetailsList as List>; + + private static bool IsMoreSignificant(Option baseDetails, Option currentDetails) + { + // whatever happend before, if no details now, that cannot be more significant + if (currentDetails == Option.None()) + return false; + + // if something wrong happened now, but nothing before, then that is more significant + if (baseDetails == Option.None()) + return true; + + // at this point something has happened before, as well as now. Pick the more significant + var baseUnwrapped = baseDetails.Expect(ThrowBadProgramLogic); + var currentUnwrapped = currentDetails.Expect(ThrowBadProgramLogic); + + // in theory this case is represened by Option.None, but let's check it just for sure + if (currentUnwrapped.FailureKind == FailureKind.None) + return false; + + // Transient beats non-transient + if (baseUnwrapped.FailureKind != FailureKind.Transient && currentUnwrapped.FailureKind == FailureKind.Transient) + return true; + + return false; + + InvalidOperationException ThrowBadProgramLogic() => new InvalidOperationException("Error in program logic, uwrapped Option should have had value"); + } + } } diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs index f49b38b9a1c..5b9919c87e3 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs @@ -347,7 +347,9 @@ public async Task NoErrorFromCloudProxy_NoErrorDetailsReturned() string cloudEndpointId = Guid.NewGuid().ToString(); var cloudProxy = ThrowingCloudProxy - .CreateWithResponses(30, ThrowingCloudProxy.Success()) + .Create() + .WithBatchSize(batchSize) + .WithReportSuccessfulBatch(3) .Build(); Task> GetCloudProxy(string id) => Task.FromResult(Option.Some(cloudProxy)); @@ -366,17 +368,17 @@ public async Task NoErrorFromCloudProxy_NoErrorDetailsReturned() [Fact] [Unit] - public async Task ErrorInFirstBatch_ErrorDetailsReturned() + public async Task TransientErrorInFirstBatch_FastFailsRest() { var batchSize = 10; Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); string cloudEndpointId = Guid.NewGuid().ToString(); var cloudProxy = ThrowingCloudProxy - .CreateWithResponses(3, ThrowingCloudProxy.Success()) - .Then(ThrowingCloudProxy.Throw()) - .ThenMany(batchSize - 4, ThrowingCloudProxy.Success()) // first batch ends here (3 success + 1 fail -> 10 - 4 to go) - .ThenMany(2 * batchSize, ThrowingCloudProxy.Success()) + .Create() + .WithBatchSize(batchSize) + .WithReportErrorInBatch(ThrowingCloudProxy.Throw()) + .WithReportSuccessfulBatch(2) .Build(); Task> GetCloudProxy(string id) => Task.FromResult(Option.Some(cloudProxy)); @@ -386,9 +388,10 @@ public async Task ErrorInFirstBatch_ErrorDetailsReturned() var sinkResult = await cloudMessageProcessor.ProcessAsync(GetMessages("device1", 3 * batchSize), CancellationToken.None); + // although the test is setup to succeed with batch 2 and 3, they will fast fail because of the first one Assert.False(sinkResult.IsSuccessful); - Assert.Equal(2 * batchSize, sinkResult.Succeeded.Count); - Assert.Equal(1 * batchSize, sinkResult.Failed.Count); + Assert.Equal(0, sinkResult.Succeeded.Count); + Assert.Equal(30, sinkResult.Failed.Count); Assert.Equal(0, sinkResult.InvalidDetailsList.Count); Assert.True(sinkResult.SendFailureDetails.HasValue); Assert.Equal(FailureKind.Transient, sinkResult.SendFailureDetails.Expect(() => new Exception()).FailureKind); @@ -396,20 +399,17 @@ public async Task ErrorInFirstBatch_ErrorDetailsReturned() [Fact] [Unit] - public async Task TransientErrorInFirstBatch_FailureDetailsDoesNotGetOverwritten() + public async Task NonTransientErrorInFirstBatch_LetsTryTheRest_ButReportsSendFailure() { var batchSize = 10; Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); string cloudEndpointId = Guid.NewGuid().ToString(); var cloudProxy = ThrowingCloudProxy - .CreateWithResponses(3, ThrowingCloudProxy.Success()) - .Then(ThrowingCloudProxy.Throw()) // transient error - .ThenMany(batchSize - 4, ThrowingCloudProxy.Success()) // first batch ends here (3 success + 1 fail -> 10 - 4 to go) - .ThenMany(3, ThrowingCloudProxy.Success()) - .Then(ThrowingCloudProxy.Throw()) // non-transient error - .ThenMany(batchSize - 4, ThrowingCloudProxy.Success()) // second batch ends here - .ThenMany(1 * batchSize, ThrowingCloudProxy.Success()) + .Create() + .WithBatchSize(batchSize) + .WithReportErrorInBatch(ThrowingCloudProxy.Throw()) + .WithReportSuccessfulBatch(2) .Build(); Task> GetCloudProxy(string id) => Task.FromResult(Option.Some(cloudProxy)); @@ -419,30 +419,28 @@ public async Task TransientErrorInFirstBatch_FailureDetailsDoesNotGetOverwritten var sinkResult = await cloudMessageProcessor.ProcessAsync(GetMessages("device1", 3 * batchSize), CancellationToken.None); - Assert.False(sinkResult.IsSuccessful); - Assert.Equal(batchSize, sinkResult.Succeeded.Count); - Assert.Equal(batchSize, sinkResult.Failed.Count); + Assert.True(sinkResult.IsSuccessful); // non-transient errors are ignored, but reported in SendFailureDetails + Assert.Equal(2 * batchSize, sinkResult.Succeeded.Count); + Assert.Equal(0, sinkResult.Failed.Count); Assert.Equal(batchSize, sinkResult.InvalidDetailsList.Count); Assert.True(sinkResult.SendFailureDetails.HasValue); - Assert.Equal(FailureKind.Transient, sinkResult.SendFailureDetails.Expect(() => new Exception()).FailureKind); + Assert.Equal(FailureKind.InvalidInput, sinkResult.SendFailureDetails.Expect(() => new Exception()).FailureKind); } [Fact] [Unit] - public async Task TransientErrorInSecondBatch_OverwritesNonTransientFailureDetails() + public async Task TransientErrorInSecondBatch_FastFailsRest_OverwritesNonTransientResult() { var batchSize = 10; Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); string cloudEndpointId = Guid.NewGuid().ToString(); var cloudProxy = ThrowingCloudProxy - .CreateWithResponses(3, ThrowingCloudProxy.Success()) - .Then(ThrowingCloudProxy.Throw()) // non-transient error - .ThenMany(batchSize - 4, ThrowingCloudProxy.Success()) // first batch ends here (3 success + 1 fail -> 10 - 4 to go) - .ThenMany(3, ThrowingCloudProxy.Success()) - .Then(ThrowingCloudProxy.Throw()) // transient error - .ThenMany(batchSize - 4, ThrowingCloudProxy.Success()) // second batch ends here - .ThenMany(1 * batchSize, ThrowingCloudProxy.Success()) + .Create() + .WithBatchSize(batchSize) + .WithReportErrorInBatch(ThrowingCloudProxy.Throw()) + .WithReportErrorInBatch(ThrowingCloudProxy.Throw()) + .WithReportSuccessfulBatch() .Build(); Task> GetCloudProxy(string id) => Task.FromResult(Option.Some(cloudProxy)); @@ -453,9 +451,52 @@ public async Task TransientErrorInSecondBatch_OverwritesNonTransientFailureDetai var sinkResult = await cloudMessageProcessor.ProcessAsync(GetMessages("device1", 3 * batchSize), CancellationToken.None); Assert.False(sinkResult.IsSuccessful); - Assert.Equal(batchSize, sinkResult.Succeeded.Count); - Assert.Equal(batchSize, sinkResult.Failed.Count); - Assert.Equal(batchSize, sinkResult.InvalidDetailsList.Count); + Assert.Equal(0, sinkResult.Succeeded.Count); + Assert.Equal(2 * batchSize, sinkResult.Failed.Count); + Assert.Equal(1 * batchSize, sinkResult.InvalidDetailsList.Count); + Assert.True(sinkResult.SendFailureDetails.HasValue); + Assert.Equal(FailureKind.Transient, sinkResult.SendFailureDetails.Expect(() => new Exception()).FailureKind); + } + + [Fact] + [Unit] + public async Task TransientErrorOfFirstIdentity_DoesNotFastFailsSecondIdentity_ButReportsError() + { + var batchSize = 10; + Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); + string cloudEndpointId = Guid.NewGuid().ToString(); + + // this wont fast fail + var cloudProxy1 = ThrowingCloudProxy + .Create() + .WithBatchSize(batchSize) + .WithReportSuccessfulBatch(3) + .Build(); + + // this will fast fail after a batch (skipping the second and the third) + var cloudProxy2 = ThrowingCloudProxy + .Create() + .WithBatchSize(batchSize) + .WithReportSuccessfulBatch() + .WithReportErrorInBatch(ThrowingCloudProxy.Throw()) + .WithReportSuccessfulBatch() + .Build(); + + var proxyMap = new Dictionary { ["device1"] = cloudProxy1, ["device2"] = cloudProxy2 }; + + Task> GetCloudProxy(string id) => Task.FromResult(Option.Some(proxyMap[id])); + + var cloudEndpoint = new CloudEndpoint(cloudEndpointId, GetCloudProxy, routingMessageConverter); + var cloudMessageProcessor = cloudEndpoint.CreateProcessor(); + + var random = new Random(35325); + var messages = GetMessages("device1", 3 * batchSize).Concat(GetMessages("device2", 3 * batchSize)).OrderBy(order => random.Next()).ToList(); + var sinkResult = await cloudMessageProcessor.ProcessAsync(messages, CancellationToken.None); + + Assert.False(sinkResult.IsSuccessful); // one batch went wrong, should report here + Assert.Equal(3 * batchSize + 1 * batchSize, sinkResult.Succeeded.Count); // dev1 all good, dev2 1st good + Assert.Equal(2 * batchSize, sinkResult.Failed.Count); + Assert.Equal(0, sinkResult.InvalidDetailsList.Count); Assert.True(sinkResult.SendFailureDetails.HasValue); Assert.Equal(FailureKind.Transient, sinkResult.SendFailureDetails.Expect(() => new Exception()).FailureKind); } @@ -494,12 +535,6 @@ static IRoutingMessage GetMessage(string id) { SystemProperties.DeviceId, id } }; - var cancelProperties = new Dictionary() - { - { "Delay", "true" }, - { "Prop2", "Val2" } - }; - var message = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); return message; } @@ -558,27 +593,48 @@ internal static Func Throw() return () => Task.FromException(new T()); } - internal static CloudProxyBuilder CreateWithResponses(int count, Func action) + internal static CloudProxyBuilder Create() { - var result = new CloudProxyBuilder(); - result.ThenMany(count, action); - - return result; + return new CloudProxyBuilder(); } internal class CloudProxyBuilder { + private Random random = new Random(834793); private List<(int, Func)> callResponses = new List<(int, Func)>(); + private int batchSize = 10; + private Func successAction = ThrowingCloudProxy.Success(); - internal CloudProxyBuilder Then(Func action) + internal CloudProxyBuilder WithBatchSize(int batchSize) { - this.callResponses.Add((1, action)); + this.batchSize = batchSize; return this; } - internal CloudProxyBuilder ThenMany(int count, Func action) + internal CloudProxyBuilder WithSuccessAction(Func successAction) { - this.callResponses.Add((count, action)); + this.successAction = successAction; + return this; + } + + internal CloudProxyBuilder WithReportSuccessfulBatch(int batchCount = 1) + { + this.callResponses.Add((batchCount * this.batchSize, this.successAction)); + return this; + } + + internal CloudProxyBuilder WithReportErrorInBatch(Func errorAction) + { + var index = this.random.Next(this.batchSize); + + if (index > 0) + this.callResponses.Add((index, this.successAction)); + + this.callResponses.Add((1, errorAction)); + + if (index + 1 != this.batchSize) + this.callResponses.Add((this.batchSize - (index + 1), this.successAction)); + return this; } diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/ModuleMessageProcessorTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/ModuleMessageProcessorTest.cs index 9f53c73fca4..7c22ff7e844 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/ModuleMessageProcessorTest.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/ModuleMessageProcessorTest.cs @@ -3,7 +3,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Routing { using System; using System.Collections.Generic; - using System.Collections.ObjectModel; + using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -41,52 +41,24 @@ public void BasicTest() [Unit] public async Task ProcessAsyncTest() { - Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); - - const string Mod1Id = "device1/module1"; - const string ModEndpointId = "in1"; - - var deviceProxyMock = new Mock(); - deviceProxyMock.Setup(c => c.SendMessageAsync(It.IsAny(), It.Is((ep) => ep.Equals(ModEndpointId, StringComparison.OrdinalIgnoreCase)))) - .Returns(Task.CompletedTask); - - deviceProxyMock.SetupGet(p => p.IsActive).Returns(true); - - IReadOnlyDictionary deviceSubscriptions = new ReadOnlyDictionary( - new Dictionary - { - [DeviceSubscription.ModuleMessages] = true - }); - var connectionManager = new Mock(); - connectionManager.Setup(c => c.GetDeviceConnection(It.IsAny())).Returns(Option.Some(deviceProxyMock.Object)); - connectionManager.Setup(c => c.GetSubscriptions(It.IsAny())).Returns(Option.Some(deviceSubscriptions)); - - byte[] messageBody = Encoding.UTF8.GetBytes("Message body"); - var properties = new Dictionary() - { - { "Prop1", "Val1" }, - { "Prop2", "Val2" }, - }; + var throwingDeviceProxy = ThrowingDeviceProxyBuilder + .Create() + .WithSuccess() + .Build(); - var systemProperties = new Dictionary - { - { SystemProperties.DeviceId, Mod1Id } - }; - - var message1 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); - var message2 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); + var messageProcessor = this.CreateMessageProcessor(throwingDeviceProxy); - var moduleEndpoint = new ModuleEndpoint($"{Mod1Id}/{ModEndpointId}", Mod1Id, ModEndpointId, connectionManager.Object, routingMessageConverter); - IProcessor moduleMessageProcessor = moduleEndpoint.CreateProcessor(); + var message1 = GetMessage(); + var message2 = GetMessage(); - ISinkResult result = await moduleMessageProcessor.ProcessAsync(message1, CancellationToken.None); + ISinkResult result = await messageProcessor.ProcessAsync(message1, CancellationToken.None); Assert.NotNull(result); Assert.NotEmpty(result.Succeeded); Assert.Empty(result.Failed); Assert.Empty(result.InvalidDetailsList); Assert.False(result.SendFailureDetails.HasValue); - ISinkResult resultBatch = await moduleMessageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); + ISinkResult resultBatch = await messageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); Assert.NotNull(resultBatch); Assert.NotEmpty(resultBatch.Succeeded); Assert.Empty(resultBatch.Failed); @@ -98,46 +70,24 @@ public async Task ProcessAsyncTest() [Unit] public async Task ProcessAsync_NoConnection_ShoulFailTest() { - Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); - - const string Mod1Id = "device1/module1"; - const string ModEndpointId = "in1"; - - IReadOnlyDictionary deviceSubscriptions = new ReadOnlyDictionary( - new Dictionary - { - [DeviceSubscription.ModuleMessages] = true - }); - var connectionManager = new Mock(); - connectionManager.Setup(c => c.GetDeviceConnection(It.IsAny())).Returns(Option.None()); - connectionManager.Setup(c => c.GetSubscriptions(It.IsAny())).Returns(Option.Some(deviceSubscriptions)); - - byte[] messageBody = Encoding.UTF8.GetBytes("Message body"); - var properties = new Dictionary() - { - { "Prop1", "Val1" }, - { "Prop2", "Val2" }, - }; - - var systemProperties = new Dictionary - { - { SystemProperties.DeviceId, Mod1Id } - }; + var throwingDeviceProxy = ThrowingDeviceProxyBuilder + .Create() + .WithActiveStatus(false) + .Build(); - var message1 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); - var message2 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); + var messageProcessor = this.CreateMessageProcessor(throwingDeviceProxy); - var moduleEndpoint = new ModuleEndpoint($"{Mod1Id}/{ModEndpointId}", Mod1Id, ModEndpointId, connectionManager.Object, routingMessageConverter); - IProcessor moduleMessageProcessor = moduleEndpoint.CreateProcessor(); + var message1 = GetMessage(); + var message2 = GetMessage(); - ISinkResult result = await moduleMessageProcessor.ProcessAsync(message1, CancellationToken.None); + ISinkResult result = await messageProcessor.ProcessAsync(message1, CancellationToken.None); Assert.NotNull(result); Assert.Empty(result.Succeeded); Assert.Equal(1, result.Failed.Count); Assert.Empty(result.InvalidDetailsList); Assert.True(result.SendFailureDetails.HasValue); - ISinkResult resultBatch = await moduleMessageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); + ISinkResult resultBatch = await messageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); Assert.NotNull(resultBatch); Assert.Empty(resultBatch.Succeeded); Assert.Equal(2, resultBatch.Failed.Count); @@ -149,52 +99,24 @@ public async Task ProcessAsync_NoConnection_ShoulFailTest() [Unit] public async Task ProcessAsync_SendThrowsRetryable_Test() { - Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); - - const string Mod1Id = "device1/module1"; - const string ModEndpointId = "in1"; - - var deviceProxyMock = new Mock(); - deviceProxyMock.Setup(c => c.SendMessageAsync(It.IsAny(), It.Is((ep) => ep.Equals(ModEndpointId, StringComparison.OrdinalIgnoreCase)))) - .Throws(); - - deviceProxyMock.SetupGet(p => p.IsActive).Returns(true); - - IReadOnlyDictionary deviceSubscriptions = new ReadOnlyDictionary( - new Dictionary - { - [DeviceSubscription.ModuleMessages] = true - }); - var connectionManager = new Mock(); - connectionManager.Setup(c => c.GetDeviceConnection(It.IsAny())).Returns(Option.Some(deviceProxyMock.Object)); - connectionManager.Setup(c => c.GetSubscriptions(It.IsAny())).Returns(Option.Some(deviceSubscriptions)); + var throwingDeviceProxy = ThrowingDeviceProxyBuilder + .Create() + .WithThrow() + .Build(); - byte[] messageBody = Encoding.UTF8.GetBytes("Message body"); - var properties = new Dictionary() - { - { "Prop1", "Val1" }, - { "Prop2", "Val2" }, - }; + var messageProcessor = this.CreateMessageProcessor(throwingDeviceProxy); - var systemProperties = new Dictionary - { - { SystemProperties.DeviceId, Mod1Id } - }; + var message1 = GetMessage(); + var message2 = GetMessage(); - var message1 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); - var message2 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); - - var moduleEndpoint = new ModuleEndpoint($"{Mod1Id}/{ModEndpointId}", Mod1Id, ModEndpointId, connectionManager.Object, routingMessageConverter); - IProcessor moduleMessageProcessor = moduleEndpoint.CreateProcessor(); - - ISinkResult result = await moduleMessageProcessor.ProcessAsync(message1, CancellationToken.None); + ISinkResult result = await messageProcessor.ProcessAsync(message1, CancellationToken.None); Assert.NotNull(result); Assert.Empty(result.Succeeded); Assert.Equal(1, result.Failed.Count); Assert.Empty(result.InvalidDetailsList); Assert.True(result.SendFailureDetails.HasValue); - ISinkResult resultBatch = await moduleMessageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); + ISinkResult resultBatch = await messageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); Assert.NotNull(resultBatch); Assert.Empty(resultBatch.Succeeded); Assert.Equal(2, resultBatch.Failed.Count); @@ -206,57 +128,169 @@ public async Task ProcessAsync_SendThrowsRetryable_Test() [Unit] public async Task ProcessAsync_SendThrowsNonRetryable_Test() { - Core.IMessageConverter routingMessageConverter = new RoutingMessageConverter(); + var throwingDeviceProxy = ThrowingDeviceProxyBuilder + .Create() + .WithThrow() + .Build(); - const string Mod1Id = "device1/module1"; - const string ModEndpointId = "in1"; + var messageProcessor = this.CreateMessageProcessor(throwingDeviceProxy); - var deviceProxyMock = new Mock(); - deviceProxyMock.Setup(c => c.SendMessageAsync(It.IsAny(), It.Is((ep) => ep.Equals(ModEndpointId, StringComparison.OrdinalIgnoreCase)))) - .Throws(); + var message1 = GetMessage(); + var message2 = GetMessage(); - deviceProxyMock.SetupGet(p => p.IsActive).Returns(true); + ISinkResult result = await messageProcessor.ProcessAsync(message1, CancellationToken.None); + Assert.NotNull(result); + Assert.Empty(result.Succeeded); + Assert.Empty(result.Failed); + Assert.Equal(1, result.InvalidDetailsList.Count); + Assert.False(result.SendFailureDetails.HasValue); + + ISinkResult resultBatch = await messageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); + Assert.NotNull(resultBatch); + Assert.Empty(resultBatch.Succeeded); + Assert.Empty(resultBatch.Failed); + Assert.Equal(2, resultBatch.InvalidDetailsList.Count); + Assert.False(resultBatch.SendFailureDetails.HasValue); + } + + [Fact] + [Unit] + public async Task ProcessAsync_FastFailMessages_AfterOneFailedWithRetriable() + { + var throwingDeviceProxy = ThrowingDeviceProxyBuilder + .Create() + .WithSuccess() + .WithThrow() + .WithSuccess() + .Build(); + + var messageProcessor = this.CreateMessageProcessor(throwingDeviceProxy); + var messages = GetMessages(5); + + var result = await messageProcessor.ProcessAsync(messages, CancellationToken.None); + + Assert.False(result.IsSuccessful); + Assert.Equal(1, result.Succeeded.Count); + Assert.Equal(4, result.Failed.Count); + Assert.Equal(FailureKind.Transient, result.SendFailureDetails.Expect(() => new Exception()).FailureKind); + } + + [Fact] + [Unit] + public async Task ProcessAsync_DontFastFailMessages_AfterOneFailedWithNonRetriable() + { + var throwingDeviceProxy = ThrowingDeviceProxyBuilder + .Create() + .WithSuccess() + .WithThrow() + .WithSuccess() + .Build(); + + var messageProcessor = this.CreateMessageProcessor(throwingDeviceProxy); + var messages = GetMessages(5); - IReadOnlyDictionary deviceSubscriptions = new ReadOnlyDictionary( - new Dictionary + var result = await messageProcessor.ProcessAsync(messages, CancellationToken.None); + + Assert.True(result.IsSuccessful); // non-retriable is not reported as failure + Assert.Equal(4, result.Succeeded.Count); + Assert.Equal(1, result.InvalidDetailsList.Count); + Assert.Equal(0, result.Failed.Count); + // SendFailureDetails is not reported for Non-Retriable - this is not consistent with cloud proxy, skip the assert for now + } + + private IProcessor CreateMessageProcessor(IDeviceProxy deviceProxy) + { + IReadOnlyDictionary deviceSubscriptions = + new Dictionary() { [DeviceSubscription.ModuleMessages] = true - }); + }; + var connectionManager = new Mock(); - connectionManager.Setup(c => c.GetDeviceConnection(It.IsAny())).Returns(Option.Some(deviceProxyMock.Object)); + connectionManager.Setup(call => call.GetDeviceConnection(It.IsAny())).Returns(Option.Some(deviceProxy)); connectionManager.Setup(c => c.GetSubscriptions(It.IsAny())).Returns(Option.Some(deviceSubscriptions)); + var moduleEndpoint = new ModuleEndpoint("device1/module1", "module1", "in1", connectionManager.Object, new RoutingMessageConverter()); + return moduleEndpoint.CreateProcessor(); + } + + static IList GetMessages(int count) + { + var messages = new List(); + for (int i = 0; i < count; i++) + { + messages.Add(GetMessage()); + } + + return messages; + } + + static IRoutingMessage GetMessage() + { byte[] messageBody = Encoding.UTF8.GetBytes("Message body"); var properties = new Dictionary() { { "Prop1", "Val1" }, - { "Prop2", "Val2" }, + { "Prop2", "Val2" } }; var systemProperties = new Dictionary { - { SystemProperties.DeviceId, Mod1Id } + { SystemProperties.DeviceId, "device1/module1" } }; - var message1 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); - var message2 = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); + var message = new RoutingMessage(TelemetryMessageSource.Instance, messageBody, properties, systemProperties); + return message; + } + } - var moduleEndpoint = new ModuleEndpoint($"{Mod1Id}/{ModEndpointId}", Mod1Id, ModEndpointId, connectionManager.Object, routingMessageConverter); - IProcessor moduleMessageProcessor = moduleEndpoint.CreateProcessor(); + internal class ThrowingDeviceProxyBuilder + { + private List> responses = new List>(); + private bool isActive = true; - ISinkResult result = await moduleMessageProcessor.ProcessAsync(message1, CancellationToken.None); - Assert.NotNull(result); - Assert.Empty(result.Succeeded); - Assert.Empty(result.Failed); - Assert.Equal(1, result.InvalidDetailsList.Count); - Assert.False(result.SendFailureDetails.HasValue); + internal static ThrowingDeviceProxyBuilder Create() => new ThrowingDeviceProxyBuilder(); - ISinkResult resultBatch = await moduleMessageProcessor.ProcessAsync(new[] { message1, message2 }, CancellationToken.None); - Assert.NotNull(resultBatch); - Assert.Empty(resultBatch.Succeeded); - Assert.Empty(resultBatch.Failed); - Assert.Equal(2, resultBatch.InvalidDetailsList.Count); - Assert.False(resultBatch.SendFailureDetails.HasValue); + internal ThrowingDeviceProxyBuilder WithThrow() + where T : Exception + { + this.responses.Add(() => Task.FromException(Activator.CreateInstance(typeof(T), "test error") as Exception)); + return this; + } + + internal ThrowingDeviceProxyBuilder WithSuccess(int count = 1) + { + this.responses.AddRange(Enumerable.Repeat>(() => Task.CompletedTask, count)); + return this; + } + + internal ThrowingDeviceProxyBuilder WithActiveStatus(bool isActive) + { + this.isActive = isActive; + return this; + } + + internal IDeviceProxy Build() + { + var nextResponse = this.responses.GetEnumerator(); + var mock = new Mock(); + + mock.SetupGet(call => call.IsActive).Returns(this.isActive); + mock.Setup( + call => call.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(() => + { + if (nextResponse.MoveNext()) + { + return nextResponse.Current(); + } + else + { + return this.responses.Last()(); + } + }); + + return mock.Object; } } } diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/CollectionEx.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/CollectionEx.cs index 10851249de4..6f8c018c7fa 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/CollectionEx.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/CollectionEx.cs @@ -143,6 +143,8 @@ public static bool TryGetNonEmptyValue(this IDictionary> Batch(this IEnumerable list, int batchSize) { + Preconditions.CheckArgument(batchSize > 0, "BatchSize should be > 0"); + var current = new List(); foreach (T item in list) {