From f9d1e07a57fba5e171d1956e012e901a8427b72c Mon Sep 17 00:00:00 2001 From: Varun Puranik Date: Wed, 31 Jul 2019 16:06:00 -0700 Subject: [PATCH] EdgeHub loses cloud subscriptions if a feedback message is received twice. (#1494) * Fix message completion bug * Add test in DeviceMessageHandlerTest * Add more tests * Add Subscription processor tests * Fix stylecop --- .../CloudProxy.cs | 6 +- .../SubscriptionProcessor.cs | 24 ++- .../device/DeviceMessageHandler.cs | 59 +++++- .../DeviceMessageHandlerTest.cs | 185 ++++++++++++++++-- .../SubscriptionProcessorTest.cs | 77 ++++++-- 5 files changed, 313 insertions(+), 38 deletions(-) diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.CloudProxy/CloudProxy.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.CloudProxy/CloudProxy.cs index 5930ee32a62..42189eac758 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.CloudProxy/CloudProxy.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.CloudProxy/CloudProxy.cs @@ -204,7 +204,7 @@ public async Task UpdateReportedPropertiesAsync(IMessage reportedPropertiesMessa public async Task SendFeedbackMessageAsync(string messageId, FeedbackStatus feedbackStatus) { Preconditions.CheckNonWhiteSpace(messageId, nameof(messageId)); - Events.SendFeedbackMessage(this); + Events.SendFeedbackMessage(this, messageId); this.timer.Reset(); try { @@ -514,9 +514,9 @@ public static void UpdateReportedProperties(CloudProxy cloudProxy) Log.LogDebug((int)EventIds.UpdateReportedProperties, Invariant($"Updating reported properties for device {cloudProxy.clientId}")); } - public static void SendFeedbackMessage(CloudProxy cloudProxy) + public static void SendFeedbackMessage(CloudProxy cloudProxy, string messageId) { - Log.LogDebug((int)EventIds.SendFeedbackMessage, Invariant($"Sending feedback message for device {cloudProxy.clientId}")); + Log.LogDebug((int)EventIds.SendFeedbackMessage, Invariant($"Sending feedback message with id {messageId} for device {cloudProxy.clientId}")); } public static void MessageReceived(string clientId) diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/SubscriptionProcessor.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/SubscriptionProcessor.cs index 9f444a9cac2..085e6654572 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/SubscriptionProcessor.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/SubscriptionProcessor.cs @@ -44,7 +44,8 @@ public SubscriptionProcessor(IConnectionManager connectionManager, IInvokeMethod this.invokeMethodHandler = Preconditions.CheckNotNull(invokeMethodHandler, nameof(invokeMethodHandler)); this.pendingSubscriptions = new ConcurrentDictionary>(); this.processSubscriptionsBlock = new ActionBlock(this.ProcessPendingSubscriptions); - deviceConnectivityManager.DeviceConnected += this.DeviceConnected; + deviceConnectivityManager.DeviceConnected += this.CloudConnectivityEstablished; + connectionManager.CloudConnectionEstablished += this.ClientCloudConnectionEstablished; } protected override void HandleSubscriptions(string id, List<(DeviceSubscription, bool)> subscriptions) => @@ -109,7 +110,7 @@ async Task ProcessSubscription(string id, Option cloudProxy, Device } } - async void DeviceConnected(object sender, EventArgs eventArgs) + async void CloudConnectivityEstablished(object sender, EventArgs eventArgs) { Events.DeviceConnectedProcessingSubscriptions(); try @@ -134,6 +135,19 @@ async void DeviceConnected(object sender, EventArgs eventArgs) } } + async void ClientCloudConnectionEstablished(object sender, IIdentity identity) + { + try + { + Events.ClientConnectedProcessingSubscriptions(identity); + await this.ProcessExistingSubscriptions(identity.Id); + } + catch (Exception e) + { + Events.ErrorProcessingSubscriptions(e, identity); + } + } + async Task ProcessExistingSubscriptions(string id) { Option cloudProxy = await this.ConnectionManager.GetCloudConnection(id); @@ -200,6 +214,7 @@ enum EventIds ErrorAddingSubscription, ProcessingSubscriptions, ProcessingSubscription, + ClientConnectedProcessingSubscriptions, ProcessingSubscriptionsNoCloudProxy } @@ -263,6 +278,11 @@ internal static void ErrorProcessingSubscriptions(Exception e) Log.LogWarning((int)EventIds.ProcessingSubscription, e, Invariant($"Error processing subscriptions for connected clients.")); } + public static void ClientConnectedProcessingSubscriptions(IIdentity identity) + { + Log.LogInformation((int)EventIds.ClientConnectedProcessingSubscriptions, Invariant($"Client {identity.Id} connected to cloud, processing existing subscriptions.")); + } + public static void ProcessingSubscriptions(string id) { Log.LogInformation((int)EventIds.ProcessingSubscription, Invariant($"Processing pending subscriptions for {id}")); diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs index 746f2c0facc..d57670623e1 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs @@ -21,6 +21,7 @@ class DeviceMessageHandler : IDeviceListener, IDeviceProxy readonly ConcurrentDictionary> methodCallTaskCompletionSources = new ConcurrentDictionary>(); readonly ConcurrentDictionary> messageTaskCompletionSources = new ConcurrentDictionary>(); + readonly ConcurrentDictionary c2dMessageTaskCompletionSources = new ConcurrentDictionary(); readonly IEdgeHub edgeHub; readonly IConnectionManager connectionManager; @@ -104,11 +105,16 @@ public async Task ProcessMessageFeedbackAsync(string messageId, FeedbackStatus f taskCompletionSource.SetException(new EdgeHubIOException($"Message not completed by client {this.Identity.Id}")); } } - else + else if (this.c2dMessageTaskCompletionSources.TryRemove(messageId, out bool value) && value) { + Events.ReceivedC2DFeedbackMessage(this.Identity, messageId); Option cloudProxy = await this.connectionManager.GetCloudConnection(this.Identity.Id); await cloudProxy.ForEachAsync(cp => cp.SendFeedbackMessageAsync(messageId, feedbackStatus)); } + else + { + Events.UnknownFeedbackMessage(this.Identity, messageId, feedbackStatus); + } } public Task AddSubscription(DeviceSubscription subscription) @@ -262,7 +268,12 @@ enum EventIds MessageSentToClient, ErrorGettingTwin, ErrorUpdatingReportedProperties, - ProcessedGetTwin + ProcessedGetTwin, + C2dNoLockToken, + SendingC2DMessage, + ReceivedC2DMessageWithSameToken, + ReceivedC2DFeedbackMessage, + UnknownFeedbackMessage } public static void BindDeviceProxy(IIdentity identity) @@ -334,11 +345,53 @@ public static void ProcessedGetTwin(string identityId) { Log.LogDebug((int)EventIds.ProcessedGetTwin, Invariant($"Processed GetTwin for {identityId}")); } + + public static void C2dNoLockToken(IIdentity identity) + { + Log.LogWarning((int)EventIds.C2dNoLockToken, Invariant($"Received C2D message for {identity.Id} with no lock token. Abandoning message.")); + } + + public static void SendingC2DMessage(IIdentity identity, string lockToken) + { + Log.LogDebug((int)EventIds.SendingC2DMessage, Invariant($"Sending C2D message with lock token {lockToken} to {identity.Id}")); + } + + public static void ReceivedC2DMessageWithSameToken(IIdentity identity, string lockToken) + { + Log.LogWarning((int)EventIds.ReceivedC2DMessageWithSameToken, Invariant($"Received duplicate C2D message for {identity.Id} with the same lock token {lockToken}. Abandoning message.")); + } + + public static void ReceivedC2DFeedbackMessage(IIdentity identity, string messageId) + { + Log.LogDebug((int)EventIds.ReceivedC2DFeedbackMessage, Invariant($"Received C2D feedback message from {identity.Id} with lock token {messageId}")); + } + + public static void UnknownFeedbackMessage(IIdentity identity, string messageId, FeedbackStatus feedbackStatus) + { + Log.LogWarning((int)EventIds.UnknownFeedbackMessage, Invariant($"Received unknown feedback message from {identity.Id} with lock token {messageId} and status {feedbackStatus}. Abandoning message.")); + } } #region IDeviceProxy - public Task SendC2DMessageAsync(IMessage message) => this.underlyingProxy.SendC2DMessageAsync(message); + public Task SendC2DMessageAsync(IMessage message) + { + if (!message.SystemProperties.TryGetValue(SystemProperties.LockToken, out string lockToken) || string.IsNullOrWhiteSpace(lockToken)) + { + // TODO - Should we throw here instead? + Events.C2dNoLockToken(this.Identity); + return Task.CompletedTask; + } + + Events.SendingC2DMessage(this.Identity, lockToken); + if (!this.c2dMessageTaskCompletionSources.TryAdd(lockToken, true)) + { + Events.ReceivedC2DMessageWithSameToken(this.Identity, lockToken); + return Task.CompletedTask; + } + + return this.underlyingProxy.SendC2DMessageAsync(message); + } /// /// This method sends the message to the device, and adds the TaskCompletionSource (that awaits the response) to the messageTaskCompletionSources list. diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs index 49f3f84eb1e..8fb8ee0fbdd 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs @@ -57,22 +57,6 @@ public async Task ProcessMessageBatchAsync_RouteAsyncTest() Mock.Get(edgeHub).Verify(eh => eh.ProcessDeviceMessageBatch(identity, It.IsAny>()), Times.Once()); } - [Fact] - public async Task ProcessFeedbackMessageAsync_RouteAsyncTest() - { - var cloudProxy = Mock.Of(); - var connectionManager = Mock.Of(); - var edgeHub = Mock.Of(); - var identity = Mock.Of(); - string messageId = "messageId"; - var status = FeedbackStatus.Complete; - Mock.Get(connectionManager).Setup(c => c.GetCloudConnection(It.IsAny())).Returns(Task.FromResult(Option.Some(cloudProxy))); - var deviceListener = new DeviceMessageHandler(identity, edgeHub, connectionManager); - await deviceListener.ProcessMessageFeedbackAsync(messageId, status); - - Mock.Get(cloudProxy).Verify(cp => cp.SendFeedbackMessageAsync(messageId, status), Times.Once()); - } - [Fact] public async Task ForwardsTwinPatchOperationToTheCloudProxy() { @@ -181,8 +165,8 @@ public async Task MessageCompletionTest() string messageId = receivedMessage.SystemProperties[SystemProperties.LockToken]; await deviceMessageHandler.ProcessMessageFeedbackAsync(messageId, FeedbackStatus.Complete); + await Task.Delay(TimeSpan.FromSeconds(1)); Assert.True(sendMessageTask.IsCompleted); - Assert.False(sendMessageTask.IsFaulted); } [Fact] @@ -231,6 +215,42 @@ public async Task MessageCompletionMismatchedResponseTest() Assert.False(sendMessageTask.IsCompleted); } + [Fact] + public async Task MultipleMessageCompletionTest() + { + var connMgr = new Mock(); + connMgr.Setup(c => c.AddDeviceConnection(It.IsAny(), It.IsAny())); + var identity = Mock.Of(m => m.DeviceId == "device1" && m.ModuleId == "module1" && m.Id == "device1/module1"); + var cloudProxy = new Mock(); + cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + var edgeHub = Mock.Of(); + var underlyingDeviceProxy = new Mock(); + IMessage receivedMessage = null; + underlyingDeviceProxy.Setup(d => d.SendMessageAsync(It.IsAny(), It.IsAny())) + .Callback((m, s) => receivedMessage = m) + .Returns(Task.CompletedTask); + connMgr.Setup(c => c.GetCloudConnection(It.IsAny())).Returns(Task.FromResult(Option.Some(cloudProxy.Object))); + var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object); + deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object); + + IMessage message = new EdgeMessage.Builder(new byte[0]).Build(); + Task sendMessageTask = deviceMessageHandler.SendMessageAsync(message, "input1"); + Assert.False(sendMessageTask.IsCompleted); + + string messageId = receivedMessage.SystemProperties[SystemProperties.LockToken]; + await deviceMessageHandler.ProcessMessageFeedbackAsync(messageId, FeedbackStatus.Complete); + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.True(sendMessageTask.IsCompleted); + + await deviceMessageHandler.ProcessMessageFeedbackAsync(messageId, FeedbackStatus.Complete); + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.True(sendMessageTask.IsCompleted); + cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny()), Times.Never); + } + [Fact] public async Task X509DeviceCanSendMessageTest() { @@ -319,6 +339,137 @@ public async Task ProcessRemoveDesiredPropertiesUpdateSubscription() edgeHub.VerifyAll(); } + [Fact] + public async Task ProcessC2DMessageTest() + { + // Arrange + var identity = Mock.Of(m => m.DeviceId == "device1" && m.Id == "device1"); + var cloudProxy = new Mock(); + cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + var edgeHub = Mock.Of(); + var underlyingDeviceProxy = new Mock(); + underlyingDeviceProxy.Setup(d => d.SendC2DMessageAsync(It.IsAny())).Returns(Task.CompletedTask); + + var connMgr = new Mock(); + connMgr.Setup(c => c.AddDeviceConnection(It.IsAny(), It.IsAny())); + connMgr.Setup(c => c.GetCloudConnection(It.IsAny())).Returns(Task.FromResult(Option.Some(cloudProxy.Object))); + var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object); + deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object); + + string lockToken = Guid.NewGuid().ToString(); + var systemProperties = new Dictionary + { + [SystemProperties.LockToken] = lockToken + }; + + var message = Mock.Of(m => m.SystemProperties == systemProperties); + + // Act + await deviceMessageHandler.SendC2DMessageAsync(message); + + // Assert + underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny()), Times.Once); + + // Act + await deviceMessageHandler.ProcessMessageFeedbackAsync(lockToken, FeedbackStatus.Complete); + + // Assert + cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny()), Times.Once); + } + + [Fact] + public async Task ProcessC2DMessageWithNoLockTokenTest() + { + // Arrange + var identity = Mock.Of(m => m.DeviceId == "device1" && m.Id == "device1"); + var cloudProxy = new Mock(); + cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + var edgeHub = Mock.Of(); + var underlyingDeviceProxy = new Mock(); + underlyingDeviceProxy.Setup(d => d.SendC2DMessageAsync(It.IsAny())).Returns(Task.CompletedTask); + + var connMgr = new Mock(); + connMgr.Setup(c => c.AddDeviceConnection(It.IsAny(), It.IsAny())); + connMgr.Setup(c => c.GetCloudConnection(It.IsAny())).Returns(Task.FromResult(Option.Some(cloudProxy.Object))); + var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object); + deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object); + + string lockToken = Guid.NewGuid().ToString(); + var systemProperties = new Dictionary + { + [SystemProperties.MessageId] = lockToken + }; + + var message = Mock.Of(m => m.SystemProperties == systemProperties); + + // Act + await deviceMessageHandler.SendC2DMessageAsync(message); + await Task.Delay(TimeSpan.FromSeconds(1)); + + // Assert + underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny()), Times.Never); + + // Act + await deviceMessageHandler.ProcessMessageFeedbackAsync(lockToken, FeedbackStatus.Complete); + + // Assert + cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny()), Times.Never); + } + + [Fact] + public async Task ProcessDuplicateC2DMessageTest() + { + // Arrange + var identity = Mock.Of(m => m.DeviceId == "device1" && m.Id == "device1"); + var cloudProxy = new Mock(); + cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + var edgeHub = Mock.Of(); + var underlyingDeviceProxy = new Mock(); + underlyingDeviceProxy.Setup(d => d.SendC2DMessageAsync(It.IsAny())).Returns(Task.CompletedTask); + + var connMgr = new Mock(); + connMgr.Setup(c => c.AddDeviceConnection(It.IsAny(), It.IsAny())); + connMgr.Setup(c => c.GetCloudConnection(It.IsAny())).Returns(Task.FromResult(Option.Some(cloudProxy.Object))); + var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object); + deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object); + + string lockToken = Guid.NewGuid().ToString(); + var systemProperties1 = new Dictionary + { + [SystemProperties.LockToken] = lockToken + }; + + var message1 = Mock.Of(m => m.SystemProperties == systemProperties1); + + var systemProperties2 = new Dictionary + { + [SystemProperties.LockToken] = lockToken + }; + + var message2 = Mock.Of(m => m.SystemProperties == systemProperties2); + + // Act + await deviceMessageHandler.SendC2DMessageAsync(message1); + + // Assert + underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny()), Times.Once); + + // Act + await deviceMessageHandler.SendC2DMessageAsync(message2); + + // Assert + underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny()), Times.Once); + + // Act + await deviceMessageHandler.ProcessMessageFeedbackAsync(lockToken, FeedbackStatus.Complete); + + // Assert + cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny(), It.IsAny()), Times.Once); + } + DeviceMessageHandler GetDeviceMessageHandler() { var identity = Mock.Of(m => m.DeviceId == "device1" && m.ModuleId == "module1" && m.Id == "device1/module1"); diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/SubscriptionProcessorTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/SubscriptionProcessorTest.cs index 18e0bfe2c63..28e4c8b8dad 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/SubscriptionProcessorTest.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/SubscriptionProcessorTest.cs @@ -9,8 +9,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test using Microsoft.Azure.Devices.Edge.Hub.Core.Identity; using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Test.Common; - using Microsoft.Azure.Devices.Routing.Core; - using Microsoft.Azure.Devices.Routing.Core.MessageSources; using Moq; using Xunit; @@ -301,7 +299,7 @@ public async Task RemoveSubscriptionTimesOutTest() } [Fact] - public async Task ProcessSubscriptionsOnDeviceConnected() + public void ProcessSubscriptionsOnDeviceConnected() { // Arrange string d1 = "d1"; @@ -346,16 +344,6 @@ public async Task ProcessSubscriptionsOnDeviceConnected() && c.GetCloudConnection(d1) == Task.FromResult(Option.Some(device1CloudProxy)) && c.GetCloudConnection(m1) == Task.FromResult(Option.Some(module1CloudProxy))); - var endpoint = new Mock("myId"); - var endpointExecutor = Mock.Of(); - Mock.Get(endpointExecutor).SetupGet(ee => ee.Endpoint).Returns(() => endpoint.Object); - var endpointExecutorFactory = Mock.Of(); - Mock.Get(endpointExecutorFactory).Setup(eef => eef.CreateAsync(It.IsAny())).ReturnsAsync(endpointExecutor); - var endpoints = new HashSet { endpoint.Object }; - var route = new Route("myRoute", "true", "myIotHub", TelemetryMessageSource.Instance, endpoints); - var routerConfig = new RouterConfig(new[] { route }); - Router router = await Router.CreateAsync("myRouter", "myIotHub", routerConfig, endpointExecutorFactory); - var deviceConnectivityManager = Mock.Of(); var subscriptionProcessor = new SubscriptionProcessor(connectionManager, invokeMethodHandler, deviceConnectivityManager); @@ -370,5 +358,68 @@ public async Task ProcessSubscriptionsOnDeviceConnected() Mock.Get(invokeMethodHandler).VerifyAll(); Mock.Get(connectionManager).VerifyAll(); } + + [Fact] + public void ProcessSubscriptionsOnClientCloudConnectionEstablished() + { + // Arrange + string d1 = "d1"; + var deviceIdentity = Mock.Of(d => d.Id == d1); + string m1 = "d2/m1"; + var moduleIdentity = Mock.Of(m => m.Id == m1); + + IReadOnlyDictionary device1Subscriptions = new Dictionary() + { + [DeviceSubscription.Methods] = true, + [DeviceSubscription.DesiredPropertyUpdates] = true + }; + + IReadOnlyDictionary module1Subscriptions = new Dictionary() + { + [DeviceSubscription.Methods] = true, + [DeviceSubscription.ModuleMessages] = true + }; + + var device1CloudProxy = Mock.Of( + dc => dc.SetupDesiredPropertyUpdatesAsync() == Task.CompletedTask + && dc.SetupCallMethodAsync() == Task.CompletedTask); + Mock.Get(device1CloudProxy).SetupGet(d => d.IsActive).Returns(true); + var module1CloudProxy = Mock.Of(mc => mc.SetupCallMethodAsync() == Task.CompletedTask && mc.IsActive); + + var invokeMethodHandler = Mock.Of( + m => + m.ProcessInvokeMethodSubscription(d1) == Task.CompletedTask + && m.ProcessInvokeMethodSubscription(m1) == Task.CompletedTask); + + var connectionManager = Mock.Of( + c => + c.GetSubscriptions(d1) == Option.Some(device1Subscriptions) + && c.GetSubscriptions(m1) == Option.Some(module1Subscriptions) + && c.GetCloudConnection(d1) == Task.FromResult(Option.Some(device1CloudProxy)) + && c.GetCloudConnection(m1) == Task.FromResult(Option.Some(module1CloudProxy))); + + var deviceConnectivityManager = Mock.Of(); + + var subscriptionProcessor = new SubscriptionProcessor(connectionManager, invokeMethodHandler, deviceConnectivityManager); + + // Act + Mock.Get(connectionManager).Raise(d => d.CloudConnectionEstablished += null, this, deviceIdentity); + + // Assert + Mock.Get(device1CloudProxy).Verify(d => d.SetupDesiredPropertyUpdatesAsync(), Times.Once); + Mock.Get(device1CloudProxy).Verify(d => d.SetupCallMethodAsync(), Times.Once); + Mock.Get(module1CloudProxy).Verify(m => m.SetupCallMethodAsync(), Times.Never); + + // Act + Mock.Get(connectionManager).Raise(d => d.CloudConnectionEstablished += null, this, moduleIdentity); + + // Assert + Mock.Get(device1CloudProxy).Verify(d => d.SetupDesiredPropertyUpdatesAsync(), Times.Once); + Mock.Get(device1CloudProxy).Verify(d => d.SetupCallMethodAsync(), Times.Once); + Mock.Get(module1CloudProxy).Verify(m => m.SetupCallMethodAsync(), Times.Once); + + Mock.Get(invokeMethodHandler).VerifyAll(); + Mock.Get(connectionManager).VerifyAll(); + } } }