Skip to content

Commit

Permalink
EdgeHub loses cloud subscriptions if a feedback message is received t…
Browse files Browse the repository at this point in the history
…wice. (#1494)

* Fix message completion bug

* Add test in DeviceMessageHandlerTest

* Add more tests

* Add Subscription processor tests

* Fix stylecop
  • Loading branch information
varunpuranik authored Jul 31, 2019
1 parent ef7fe90 commit f9d1e07
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public async Task UpdateReportedPropertiesAsync(IMessage reportedPropertiesMessa
public async Task SendFeedbackMessageAsync(string messageId, FeedbackStatus feedbackStatus)
{
Preconditions.CheckNonWhiteSpace(messageId, nameof(messageId));
Events.SendFeedbackMessage(this);
Events.SendFeedbackMessage(this, messageId);
this.timer.Reset();
try
{
Expand Down Expand Up @@ -514,9 +514,9 @@ public static void UpdateReportedProperties(CloudProxy cloudProxy)
Log.LogDebug((int)EventIds.UpdateReportedProperties, Invariant($"Updating reported properties for device {cloudProxy.clientId}"));
}

public static void SendFeedbackMessage(CloudProxy cloudProxy)
public static void SendFeedbackMessage(CloudProxy cloudProxy, string messageId)
{
Log.LogDebug((int)EventIds.SendFeedbackMessage, Invariant($"Sending feedback message for device {cloudProxy.clientId}"));
Log.LogDebug((int)EventIds.SendFeedbackMessage, Invariant($"Sending feedback message with id {messageId} for device {cloudProxy.clientId}"));
}

public static void MessageReceived(string clientId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public SubscriptionProcessor(IConnectionManager connectionManager, IInvokeMethod
this.invokeMethodHandler = Preconditions.CheckNotNull(invokeMethodHandler, nameof(invokeMethodHandler));
this.pendingSubscriptions = new ConcurrentDictionary<string, ConcurrentQueue<(DeviceSubscription, bool)>>();
this.processSubscriptionsBlock = new ActionBlock<string>(this.ProcessPendingSubscriptions);
deviceConnectivityManager.DeviceConnected += this.DeviceConnected;
deviceConnectivityManager.DeviceConnected += this.CloudConnectivityEstablished;
connectionManager.CloudConnectionEstablished += this.ClientCloudConnectionEstablished;
}

protected override void HandleSubscriptions(string id, List<(DeviceSubscription, bool)> subscriptions) =>
Expand Down Expand Up @@ -109,7 +110,7 @@ async Task ProcessSubscription(string id, Option<ICloudProxy> cloudProxy, Device
}
}

async void DeviceConnected(object sender, EventArgs eventArgs)
async void CloudConnectivityEstablished(object sender, EventArgs eventArgs)
{
Events.DeviceConnectedProcessingSubscriptions();
try
Expand All @@ -134,6 +135,19 @@ async void DeviceConnected(object sender, EventArgs eventArgs)
}
}

async void ClientCloudConnectionEstablished(object sender, IIdentity identity)
{
try
{
Events.ClientConnectedProcessingSubscriptions(identity);
await this.ProcessExistingSubscriptions(identity.Id);
}
catch (Exception e)
{
Events.ErrorProcessingSubscriptions(e, identity);
}
}

async Task ProcessExistingSubscriptions(string id)
{
Option<ICloudProxy> cloudProxy = await this.ConnectionManager.GetCloudConnection(id);
Expand Down Expand Up @@ -200,6 +214,7 @@ enum EventIds
ErrorAddingSubscription,
ProcessingSubscriptions,
ProcessingSubscription,
ClientConnectedProcessingSubscriptions,
ProcessingSubscriptionsNoCloudProxy
}

Expand Down Expand Up @@ -263,6 +278,11 @@ internal static void ErrorProcessingSubscriptions(Exception e)
Log.LogWarning((int)EventIds.ProcessingSubscription, e, Invariant($"Error processing subscriptions for connected clients."));
}

public static void ClientConnectedProcessingSubscriptions(IIdentity identity)
{
Log.LogInformation((int)EventIds.ClientConnectedProcessingSubscriptions, Invariant($"Client {identity.Id} connected to cloud, processing existing subscriptions."));
}

public static void ProcessingSubscriptions(string id)
{
Log.LogInformation((int)EventIds.ProcessingSubscription, Invariant($"Processing pending subscriptions for {id}"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class DeviceMessageHandler : IDeviceListener, IDeviceProxy

readonly ConcurrentDictionary<string, TaskCompletionSource<DirectMethodResponse>> methodCallTaskCompletionSources = new ConcurrentDictionary<string, TaskCompletionSource<DirectMethodResponse>>();
readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> messageTaskCompletionSources = new ConcurrentDictionary<string, TaskCompletionSource<bool>>();
readonly ConcurrentDictionary<string, bool> c2dMessageTaskCompletionSources = new ConcurrentDictionary<string, bool>();

readonly IEdgeHub edgeHub;
readonly IConnectionManager connectionManager;
Expand Down Expand Up @@ -104,11 +105,16 @@ public async Task ProcessMessageFeedbackAsync(string messageId, FeedbackStatus f
taskCompletionSource.SetException(new EdgeHubIOException($"Message not completed by client {this.Identity.Id}"));
}
}
else
else if (this.c2dMessageTaskCompletionSources.TryRemove(messageId, out bool value) && value)
{
Events.ReceivedC2DFeedbackMessage(this.Identity, messageId);
Option<ICloudProxy> cloudProxy = await this.connectionManager.GetCloudConnection(this.Identity.Id);
await cloudProxy.ForEachAsync(cp => cp.SendFeedbackMessageAsync(messageId, feedbackStatus));
}
else
{
Events.UnknownFeedbackMessage(this.Identity, messageId, feedbackStatus);
}
}

public Task AddSubscription(DeviceSubscription subscription)
Expand Down Expand Up @@ -262,7 +268,12 @@ enum EventIds
MessageSentToClient,
ErrorGettingTwin,
ErrorUpdatingReportedProperties,
ProcessedGetTwin
ProcessedGetTwin,
C2dNoLockToken,
SendingC2DMessage,
ReceivedC2DMessageWithSameToken,
ReceivedC2DFeedbackMessage,
UnknownFeedbackMessage
}

public static void BindDeviceProxy(IIdentity identity)
Expand Down Expand Up @@ -334,11 +345,53 @@ public static void ProcessedGetTwin(string identityId)
{
Log.LogDebug((int)EventIds.ProcessedGetTwin, Invariant($"Processed GetTwin for {identityId}"));
}

public static void C2dNoLockToken(IIdentity identity)
{
Log.LogWarning((int)EventIds.C2dNoLockToken, Invariant($"Received C2D message for {identity.Id} with no lock token. Abandoning message."));
}

public static void SendingC2DMessage(IIdentity identity, string lockToken)
{
Log.LogDebug((int)EventIds.SendingC2DMessage, Invariant($"Sending C2D message with lock token {lockToken} to {identity.Id}"));
}

public static void ReceivedC2DMessageWithSameToken(IIdentity identity, string lockToken)
{
Log.LogWarning((int)EventIds.ReceivedC2DMessageWithSameToken, Invariant($"Received duplicate C2D message for {identity.Id} with the same lock token {lockToken}. Abandoning message."));
}

public static void ReceivedC2DFeedbackMessage(IIdentity identity, string messageId)
{
Log.LogDebug((int)EventIds.ReceivedC2DFeedbackMessage, Invariant($"Received C2D feedback message from {identity.Id} with lock token {messageId}"));
}

public static void UnknownFeedbackMessage(IIdentity identity, string messageId, FeedbackStatus feedbackStatus)
{
Log.LogWarning((int)EventIds.UnknownFeedbackMessage, Invariant($"Received unknown feedback message from {identity.Id} with lock token {messageId} and status {feedbackStatus}. Abandoning message."));
}
}

#region IDeviceProxy

public Task SendC2DMessageAsync(IMessage message) => this.underlyingProxy.SendC2DMessageAsync(message);
public Task SendC2DMessageAsync(IMessage message)
{
if (!message.SystemProperties.TryGetValue(SystemProperties.LockToken, out string lockToken) || string.IsNullOrWhiteSpace(lockToken))
{
// TODO - Should we throw here instead?
Events.C2dNoLockToken(this.Identity);
return Task.CompletedTask;
}

Events.SendingC2DMessage(this.Identity, lockToken);
if (!this.c2dMessageTaskCompletionSources.TryAdd(lockToken, true))
{
Events.ReceivedC2DMessageWithSameToken(this.Identity, lockToken);
return Task.CompletedTask;
}

return this.underlyingProxy.SendC2DMessageAsync(message);
}

/// <summary>
/// This method sends the message to the device, and adds the TaskCompletionSource (that awaits the response) to the messageTaskCompletionSources list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,6 @@ public async Task ProcessMessageBatchAsync_RouteAsyncTest()
Mock.Get(edgeHub).Verify(eh => eh.ProcessDeviceMessageBatch(identity, It.IsAny<IEnumerable<IMessage>>()), Times.Once());
}

[Fact]
public async Task ProcessFeedbackMessageAsync_RouteAsyncTest()
{
var cloudProxy = Mock.Of<ICloudProxy>();
var connectionManager = Mock.Of<IConnectionManager>();
var edgeHub = Mock.Of<IEdgeHub>();
var identity = Mock.Of<IDeviceIdentity>();
string messageId = "messageId";
var status = FeedbackStatus.Complete;
Mock.Get(connectionManager).Setup(c => c.GetCloudConnection(It.IsAny<string>())).Returns(Task.FromResult(Option.Some(cloudProxy)));
var deviceListener = new DeviceMessageHandler(identity, edgeHub, connectionManager);
await deviceListener.ProcessMessageFeedbackAsync(messageId, status);

Mock.Get(cloudProxy).Verify(cp => cp.SendFeedbackMessageAsync(messageId, status), Times.Once());
}

[Fact]
public async Task ForwardsTwinPatchOperationToTheCloudProxy()
{
Expand Down Expand Up @@ -181,8 +165,8 @@ public async Task MessageCompletionTest()
string messageId = receivedMessage.SystemProperties[SystemProperties.LockToken];
await deviceMessageHandler.ProcessMessageFeedbackAsync(messageId, FeedbackStatus.Complete);

await Task.Delay(TimeSpan.FromSeconds(1));
Assert.True(sendMessageTask.IsCompleted);
Assert.False(sendMessageTask.IsFaulted);
}

[Fact]
Expand Down Expand Up @@ -231,6 +215,42 @@ public async Task MessageCompletionMismatchedResponseTest()
Assert.False(sendMessageTask.IsCompleted);
}

[Fact]
public async Task MultipleMessageCompletionTest()
{
var connMgr = new Mock<IConnectionManager>();
connMgr.Setup(c => c.AddDeviceConnection(It.IsAny<IIdentity>(), It.IsAny<IDeviceProxy>()));
var identity = Mock.Of<IModuleIdentity>(m => m.DeviceId == "device1" && m.ModuleId == "module1" && m.Id == "device1/module1");
var cloudProxy = new Mock<ICloudProxy>();
cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()))
.Returns(Task.CompletedTask);
var edgeHub = Mock.Of<IEdgeHub>();
var underlyingDeviceProxy = new Mock<IDeviceProxy>();
IMessage receivedMessage = null;
underlyingDeviceProxy.Setup(d => d.SendMessageAsync(It.IsAny<IMessage>(), It.IsAny<string>()))
.Callback<IMessage, string>((m, s) => receivedMessage = m)
.Returns(Task.CompletedTask);
connMgr.Setup(c => c.GetCloudConnection(It.IsAny<string>())).Returns(Task.FromResult(Option.Some(cloudProxy.Object)));
var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object);
deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object);

IMessage message = new EdgeMessage.Builder(new byte[0]).Build();
Task sendMessageTask = deviceMessageHandler.SendMessageAsync(message, "input1");
Assert.False(sendMessageTask.IsCompleted);

string messageId = receivedMessage.SystemProperties[SystemProperties.LockToken];
await deviceMessageHandler.ProcessMessageFeedbackAsync(messageId, FeedbackStatus.Complete);
await Task.Delay(TimeSpan.FromSeconds(1));

Assert.True(sendMessageTask.IsCompleted);

await deviceMessageHandler.ProcessMessageFeedbackAsync(messageId, FeedbackStatus.Complete);
await Task.Delay(TimeSpan.FromSeconds(1));

Assert.True(sendMessageTask.IsCompleted);
cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()), Times.Never);
}

[Fact]
public async Task X509DeviceCanSendMessageTest()
{
Expand Down Expand Up @@ -319,6 +339,137 @@ public async Task ProcessRemoveDesiredPropertiesUpdateSubscription()
edgeHub.VerifyAll();
}

[Fact]
public async Task ProcessC2DMessageTest()
{
// Arrange
var identity = Mock.Of<IModuleIdentity>(m => m.DeviceId == "device1" && m.Id == "device1");
var cloudProxy = new Mock<ICloudProxy>();
cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()))
.Returns(Task.CompletedTask);
var edgeHub = Mock.Of<IEdgeHub>();
var underlyingDeviceProxy = new Mock<IDeviceProxy>();
underlyingDeviceProxy.Setup(d => d.SendC2DMessageAsync(It.IsAny<IMessage>())).Returns(Task.CompletedTask);

var connMgr = new Mock<IConnectionManager>();
connMgr.Setup(c => c.AddDeviceConnection(It.IsAny<IIdentity>(), It.IsAny<IDeviceProxy>()));
connMgr.Setup(c => c.GetCloudConnection(It.IsAny<string>())).Returns(Task.FromResult(Option.Some(cloudProxy.Object)));
var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object);
deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object);

string lockToken = Guid.NewGuid().ToString();
var systemProperties = new Dictionary<string, string>
{
[SystemProperties.LockToken] = lockToken
};

var message = Mock.Of<IMessage>(m => m.SystemProperties == systemProperties);

// Act
await deviceMessageHandler.SendC2DMessageAsync(message);

// Assert
underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny<IMessage>()), Times.Once);

// Act
await deviceMessageHandler.ProcessMessageFeedbackAsync(lockToken, FeedbackStatus.Complete);

// Assert
cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()), Times.Once);
}

[Fact]
public async Task ProcessC2DMessageWithNoLockTokenTest()
{
// Arrange
var identity = Mock.Of<IModuleIdentity>(m => m.DeviceId == "device1" && m.Id == "device1");
var cloudProxy = new Mock<ICloudProxy>();
cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()))
.Returns(Task.CompletedTask);
var edgeHub = Mock.Of<IEdgeHub>();
var underlyingDeviceProxy = new Mock<IDeviceProxy>();
underlyingDeviceProxy.Setup(d => d.SendC2DMessageAsync(It.IsAny<IMessage>())).Returns(Task.CompletedTask);

var connMgr = new Mock<IConnectionManager>();
connMgr.Setup(c => c.AddDeviceConnection(It.IsAny<IIdentity>(), It.IsAny<IDeviceProxy>()));
connMgr.Setup(c => c.GetCloudConnection(It.IsAny<string>())).Returns(Task.FromResult(Option.Some(cloudProxy.Object)));
var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object);
deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object);

string lockToken = Guid.NewGuid().ToString();
var systemProperties = new Dictionary<string, string>
{
[SystemProperties.MessageId] = lockToken
};

var message = Mock.Of<IMessage>(m => m.SystemProperties == systemProperties);

// Act
await deviceMessageHandler.SendC2DMessageAsync(message);
await Task.Delay(TimeSpan.FromSeconds(1));

// Assert
underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny<IMessage>()), Times.Never);

// Act
await deviceMessageHandler.ProcessMessageFeedbackAsync(lockToken, FeedbackStatus.Complete);

// Assert
cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()), Times.Never);
}

[Fact]
public async Task ProcessDuplicateC2DMessageTest()
{
// Arrange
var identity = Mock.Of<IModuleIdentity>(m => m.DeviceId == "device1" && m.Id == "device1");
var cloudProxy = new Mock<ICloudProxy>();
cloudProxy.Setup(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()))
.Returns(Task.CompletedTask);
var edgeHub = Mock.Of<IEdgeHub>();
var underlyingDeviceProxy = new Mock<IDeviceProxy>();
underlyingDeviceProxy.Setup(d => d.SendC2DMessageAsync(It.IsAny<IMessage>())).Returns(Task.CompletedTask);

var connMgr = new Mock<IConnectionManager>();
connMgr.Setup(c => c.AddDeviceConnection(It.IsAny<IIdentity>(), It.IsAny<IDeviceProxy>()));
connMgr.Setup(c => c.GetCloudConnection(It.IsAny<string>())).Returns(Task.FromResult(Option.Some(cloudProxy.Object)));
var deviceMessageHandler = new DeviceMessageHandler(identity, edgeHub, connMgr.Object);
deviceMessageHandler.BindDeviceProxy(underlyingDeviceProxy.Object);

string lockToken = Guid.NewGuid().ToString();
var systemProperties1 = new Dictionary<string, string>
{
[SystemProperties.LockToken] = lockToken
};

var message1 = Mock.Of<IMessage>(m => m.SystemProperties == systemProperties1);

var systemProperties2 = new Dictionary<string, string>
{
[SystemProperties.LockToken] = lockToken
};

var message2 = Mock.Of<IMessage>(m => m.SystemProperties == systemProperties2);

// Act
await deviceMessageHandler.SendC2DMessageAsync(message1);

// Assert
underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny<IMessage>()), Times.Once);

// Act
await deviceMessageHandler.SendC2DMessageAsync(message2);

// Assert
underlyingDeviceProxy.Verify(d => d.SendC2DMessageAsync(It.IsAny<IMessage>()), Times.Once);

// Act
await deviceMessageHandler.ProcessMessageFeedbackAsync(lockToken, FeedbackStatus.Complete);

// Assert
cloudProxy.Verify(c => c.SendFeedbackMessageAsync(It.IsAny<string>(), It.IsAny<FeedbackStatus>()), Times.Once);
}

DeviceMessageHandler GetDeviceMessageHandler()
{
var identity = Mock.Of<IModuleIdentity>(m => m.DeviceId == "device1" && m.ModuleId == "module1" && m.Id == "device1/module1");
Expand Down
Loading

0 comments on commit f9d1e07

Please sign in to comment.