diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs index 17df39256b5..1b2ef172100 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs @@ -68,6 +68,16 @@ public async Task AddDeviceConnection(IIdentity identity, IDeviceProxy devicePro Preconditions.CheckNotNull(deviceProxy, nameof(deviceProxy)); ConnectedDevice device = this.GetOrCreateConnectedDevice(identity); Option currentDeviceConnection = device.AddDeviceConnection(deviceProxy); + currentDeviceConnection.ForEach(async c => + { + // If we add a device connection that already has subscriptions, we will remake the cloud connection. + // Otherwise the cloud connection won't get established until some other operation does it (i.e. telemetry) + if (c.Subscriptions.Count > 0) + { + Events.GettingCloudConnectionForDeviceSubscriptions(); + await this.TryGetCloudConnection(identity.Id); + } + }); Events.NewDeviceConnection(identity); await currentDeviceConnection .Filter(dc => dc.IsActive) @@ -480,7 +490,8 @@ enum EventIds InvokingCloudConnectionEstablishedEvent, HandlingConnectionStatusChangedHandler, CloudConnectionLostClosingClient, - CloudConnectionLostClosingAllClients + CloudConnectionLostClosingAllClients, + GettingCloudConnectionForDeviceSubscriptions } public static void NewCloudConnection(IIdentity identity, Try cloudConnection) @@ -551,6 +562,11 @@ public static void CloudConnectionLostClosingAllClients() { Log.LogDebug((int)EventIds.CloudConnectionLostClosingAllClients, Invariant($"Cloud connection lost, closing all clients.")); } + + public static void GettingCloudConnectionForDeviceSubscriptions() + { + Log.LogDebug((int)EventIds.GettingCloudConnectionForDeviceSubscriptions, $"Device has subscriptions. Trying to get cloud connection."); + } } static class MetricsV0 diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Mqtt/SessionStatePersistenceProvider.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Mqtt/SessionStatePersistenceProvider.cs index 401e38de998..21813981f7d 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Mqtt/SessionStatePersistenceProvider.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Mqtt/SessionStatePersistenceProvider.cs @@ -64,6 +64,7 @@ protected async Task ProcessSessionSubscriptions(string id, SessionState session return (deviceSubscription, addSubscription); }); + Events.ProcessingSessionSubscriptions(id, subscriptions); await this.edgeHub.ProcessSubscriptions(id, subscriptions); } @@ -114,7 +115,8 @@ static class Events enum EventIds { UnknownSubscription = IdStart, - ErrorHandlingSubscription + ErrorHandlingSubscription, + ProcessingSessionSubscriptions } public static void UnknownTopicSubscription(string topicName, string id) @@ -126,6 +128,12 @@ public static void ErrorProcessingSubscriptions(string id, Exception exception) { Log.LogWarning((int)EventIds.ErrorHandlingSubscription, exception, Invariant($"Error processing subscriptions for client {id}.")); } + + internal static void ProcessingSessionSubscriptions(string id, IEnumerable<(DeviceSubscription, bool)> subscriptions) + { + string subscriptionString = string.Join(", ", subscriptions.Select(s => $"{s.Item1}")); + Log.LogDebug((int)EventIds.ProcessingSessionSubscriptions, $"Processing session subscriptions {subscriptionString} for client {id}: "); + } } } } diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/ConnectionManagerTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/ConnectionManagerTest.cs index f251920da07..36767cb8d51 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/ConnectionManagerTest.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/ConnectionManagerTest.cs @@ -220,7 +220,7 @@ public async Task AllClientAreClosedWhenNetworkDisconnectTest() /// 0. A cloud connection is established. /// 1. Device connects - a connection is added in the connection manager /// 2. Connection should have both cloud and device connections - /// 3. Device disconnects - the device connection is removed. Cloud connection stays. + /// 3. Device disconnects - the device connection is removed. Cloud connection is removed. /// 4. Connection manager should have a cloud connection, but no device connection. /// [Fact] @@ -673,8 +673,36 @@ public async Task KeepSubscriptionsOnDeviceRemoveTest() { // Arrange string deviceId = "d1"; - var cloudConnectionProvider = Mock.Of(); - var credentialsCache = Mock.Of(); + string iotHub = "foo.azure-devices.net"; + string token = TokenHelper.CreateSasToken(iotHub); + var module1Credentials = new TokenCredentials(new DeviceIdentity(iotHub, deviceId), token, DummyProductInfo, true); + IClient client1 = GetDeviceClient(); + IClient client2 = GetDeviceClient(); + var messageConverterProvider = Mock.Of(); + var deviceClientProvider = new Mock(); + deviceClientProvider.SetupSequence(d => d.Create(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(client1) + .Returns(client2); + var productInfoStore = Mock.Of(); + ICredentialsCache credentialsCache = new CredentialsCache(new NullCredentialsCache()); + await credentialsCache.Add(module1Credentials); + var edgeHubIdentity = Mock.Of(i => i.Id == "edgeDevice/$edgeHub"); + var cloudConnectionProvider = new CloudConnectionProvider( + messageConverterProvider, + 1, + deviceClientProvider.Object, + Option.None(), + Mock.Of(), + Mock.Of(), + credentialsCache, + edgeHubIdentity, + TimeSpan.FromMinutes(60), + true, + TimeSpan.FromSeconds(20), + false, + Option.None(), + productInfoStore); + cloudConnectionProvider.BindEdgeHub(Mock.Of()); var deviceConnectivityManager = Mock.Of(); var connectionManager = new ConnectionManager(cloudConnectionProvider, credentialsCache, GetIdentityProvider(), deviceConnectivityManager); var identity = Mock.Of(i => i.Id == deviceId); @@ -733,6 +761,8 @@ public async Task KeepSubscriptionsOnDeviceRemoveTest() Assert.Equal(2, subscriptions.Count); Assert.True(subscriptions[DeviceSubscription.Methods]); Assert.True(subscriptions[DeviceSubscription.C2D]); + Option cloudProxy = await connectionManager.GetCloudConnection(deviceId); + Assert.True(cloudProxy.HasValue); // Act connectionManager.AddSubscription(deviceId, DeviceSubscription.DesiredPropertyUpdates);