Skip to content

Commit

Permalink
Adding parallel processing to CloudConnectivityEstablished in Subscri…
Browse files Browse the repository at this point in the history
…ptionProcessor (#2686)

Currently, CloudConnectivityEstablished processes each subscription synchronously. It should be asynchronous.

This also fixes an issue where one of the subscriptions gets stuck forever and blocks the other subscriptions from being processed. Now, all the subscriptions will get processed even if one gets stuck or times out at some point.
  • Loading branch information
dylanbronson authored Mar 12, 2020
1 parent c1ed8c3 commit eac1a18
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,23 @@ async Task ProcessSubscription(string id, Option<ICloudProxy> cloudProxy, Device
async void CloudConnectivityEstablished(object sender, EventArgs eventArgs)
{
Events.DeviceConnectedProcessingSubscriptions();
try
async Task ProcessSubscriptionByIdentity(IIdentity identity)
{
IEnumerable<IIdentity> connectedClients = this.ConnectionManager.GetConnectedClients().ToList();
foreach (IIdentity identity in connectedClients)
try
{
try
{
Events.ProcessingSubscriptionsOnDeviceConnected(identity);
await this.ProcessExistingSubscriptions(identity.Id);
}
catch (Exception e)
{
Events.ErrorProcessingSubscriptions(e, identity);
}
Events.ProcessingSubscriptionsOnDeviceConnected(identity);
await this.ProcessExistingSubscriptions(identity.Id);
}
catch (Exception e)
{
Events.ErrorProcessingSubscriptions(e, identity);
}
}

try
{
IEnumerable<Task> tasks = this.ConnectionManager.GetConnectedClients().Select(id => ProcessSubscriptionByIdentity(id));
await Task.WhenAll(tasks);
}
catch (Exception e)
{
Expand Down Expand Up @@ -260,7 +262,7 @@ public static void ErrorProcessingSubscription(string id, DeviceSubscription sub

public static void ProcessingSubscriptionsOnDeviceConnected(IIdentity identity)
{
Log.LogInformation((int)EventIds.ProcessingSubscriptions, Invariant($"Processing subscriptions for client {identity.Id}."));
Log.LogInformation((int)EventIds.ProcessingSubscriptions, Invariant($"Processing subscriptions for client {identity.Id} on device connected."));
}

public static void ProcessingSubscription(string id, DeviceSubscription deviceSubscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,127 @@ public void ProcessSubscriptionsOnDeviceConnected()
Mock.Get(connectionManager).VerifyAll();
}

[Fact]
public void ProcessSubscriptionsOnDeviceConnectedWithGetCloudConnectionTimeout()
{
// Arrange
string d1 = "d1";
var deviceIdentity = Mock.Of<IIdentity>(d => d.Id == d1);
string m1 = "d2/m1";
var moduleIdentity = Mock.Of<IIdentity>(m => m.Id == m1);

var connectedClients = new List<IIdentity>
{
deviceIdentity,
moduleIdentity
};

IReadOnlyDictionary<DeviceSubscription, bool> device1Subscriptions = new Dictionary<DeviceSubscription, bool>()
{
[DeviceSubscription.Methods] = true,
[DeviceSubscription.DesiredPropertyUpdates] = true
};

IReadOnlyDictionary<DeviceSubscription, bool> module1Subscriptions = new Dictionary<DeviceSubscription, bool>()
{
[DeviceSubscription.Methods] = true,
[DeviceSubscription.ModuleMessages] = true
};

var device1CloudProxy = Mock.Of<ICloudProxy>(
dc => dc.SetupDesiredPropertyUpdatesAsync() == Task.CompletedTask
&& dc.SetupCallMethodAsync() == Task.CompletedTask);
Mock.Get(device1CloudProxy).SetupGet(d => d.IsActive).Returns(true);
var module1CloudProxy = Mock.Of<ICloudProxy>(mc => mc.SetupCallMethodAsync() == Task.CompletedTask && mc.IsActive);

var invokeMethodHandler = Mock.Of<IInvokeMethodHandler>(
m => m.ProcessInvokeMethodSubscription(m1) == Task.CompletedTask);

var connectionManager = Mock.Of<IConnectionManager>(
c =>
c.GetConnectedClients() == connectedClients
&& c.GetSubscriptions(m1) == Option.Some(module1Subscriptions)
&& c.GetCloudConnection(m1) == Task.FromResult(Option.Some(module1CloudProxy)));

Mock.Get(connectionManager).Setup(c => c.GetCloudConnection(d1)).Throws(new TimeoutException("Test GetCloudConnection Timeout"));

var deviceConnectivityManager = Mock.Of<IDeviceConnectivityManager>();

var subscriptionProcessor = new SubscriptionProcessor(connectionManager, invokeMethodHandler, deviceConnectivityManager);

// Act
Mock.Get(deviceConnectivityManager).Raise(d => d.DeviceConnected += null, new EventArgs());

// Assert
Mock.Get(device1CloudProxy).Verify(d => d.SetupDesiredPropertyUpdatesAsync(), Times.Never);
Mock.Get(device1CloudProxy).Verify(d => d.SetupCallMethodAsync(), Times.Never);
Mock.Get(module1CloudProxy).Verify(m => m.SetupCallMethodAsync(), Times.Once);
Mock.Get(invokeMethodHandler).VerifyAll();
Mock.Get(connectionManager).VerifyAll();
}

[Fact]
public void ProcessSubscriptionsOnDeviceConnectedWithProcessInvokeMethodSubscriptionException()
{
// Arrange
string d1 = "d1";
var deviceIdentity = Mock.Of<IIdentity>(d => d.Id == d1);
string m1 = "d2/m1";
var moduleIdentity = Mock.Of<IIdentity>(m => m.Id == m1);

var connectedClients = new List<IIdentity>
{
deviceIdentity,
moduleIdentity
};

IReadOnlyDictionary<DeviceSubscription, bool> device1Subscriptions = new Dictionary<DeviceSubscription, bool>()
{
[DeviceSubscription.Methods] = true,
[DeviceSubscription.DesiredPropertyUpdates] = true
};

IReadOnlyDictionary<DeviceSubscription, bool> module1Subscriptions = new Dictionary<DeviceSubscription, bool>()
{
[DeviceSubscription.Methods] = true,
[DeviceSubscription.ModuleMessages] = true
};

var device1CloudProxy = Mock.Of<ICloudProxy>(
dc => dc.SetupDesiredPropertyUpdatesAsync() == Task.CompletedTask
&& dc.SetupCallMethodAsync() == Task.CompletedTask);
Mock.Get(device1CloudProxy).SetupGet(d => d.IsActive).Returns(true);
var module1CloudProxy = Mock.Of<ICloudProxy>(mc => mc.SetupCallMethodAsync() == Task.CompletedTask && mc.IsActive);

var invokeMethodHandler = Mock.Of<IInvokeMethodHandler>(
m =>
m.ProcessInvokeMethodSubscription(d1) == Task.CompletedTask);

Mock.Get(invokeMethodHandler).Setup(m => m.ProcessInvokeMethodSubscription(m1)).Throws(new TimeoutException("Test ProcessInvokeMethodSubscription timeout"));

var connectionManager = Mock.Of<IConnectionManager>(
c =>
c.GetConnectedClients() == connectedClients
&& c.GetSubscriptions(d1) == Option.Some(device1Subscriptions)
&& c.GetSubscriptions(m1) == Option.Some(module1Subscriptions)
&& c.GetCloudConnection(d1) == Task.FromResult(Option.Some(device1CloudProxy))
&& c.GetCloudConnection(m1) == Task.FromResult(Option.Some(module1CloudProxy)));

var deviceConnectivityManager = Mock.Of<IDeviceConnectivityManager>();

var subscriptionProcessor = new SubscriptionProcessor(connectionManager, invokeMethodHandler, deviceConnectivityManager);

// Act
Mock.Get(deviceConnectivityManager).Raise(d => d.DeviceConnected += null, new EventArgs());

// Assert
Mock.Get(device1CloudProxy).Verify(d => d.SetupDesiredPropertyUpdatesAsync(), Times.Once);
Mock.Get(device1CloudProxy).Verify(d => d.SetupCallMethodAsync(), Times.Once);
Mock.Get(module1CloudProxy).Verify(m => m.SetupCallMethodAsync(), Times.Exactly(2));
Mock.Get(invokeMethodHandler).VerifyAll();
Mock.Get(connectionManager).VerifyAll();
}

[Fact]
public void ProcessSubscriptionsOnClientCloudConnectionEstablished()
{
Expand Down

0 comments on commit eac1a18

Please sign in to comment.