Skip to content

Commit

Permalink
Merge branch 'varunpuranik/fixMqttAddressParsing' of https://github.c…
Browse files Browse the repository at this point in the history
…om/Azure/iotedge into varunpuranik/fixMqttAddressParsing
  • Loading branch information
varunpuranik committed Jul 27, 2018
2 parents 71a136b + da0cd78 commit da50ab6
Show file tree
Hide file tree
Showing 21 changed files with 398 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public async Task<Plan> PlanAsync(ModuleSet desired, ModuleSet current, IRuntime
Events.LogDesired(desired);
Events.LogCurrent(current);
// extract list of modules that need attention
var (added, updateDeployed, updateStateChanged, removed, runningGreat) = this.ProcessDiff(desired, current);
(IList<IModule> added, IList<IModule> updateDeployed, IList<IRuntimeModule> updateStateChanged, IList<IRuntimeModule> removed, IList<IRuntimeModule> runningGreat) = this.ProcessDiff(desired, current);

var updateRuntimeCommands = new List<ICommand>();
IModule edgeAgentModule = updateDeployed.FirstOrDefault(m => m.Name.Equals(Constants.EdgeAgentModuleName, StringComparison.OrdinalIgnoreCase));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task<bool> ExecuteAsync(long deploymentId, Plan plan, CancellationT
bool skippedModules = false;
foreach (ICommand command in plan.Commands)
{
var (shouldRun, runCount, coolOffPeriod, elapsedTime) = this.ShouldRunCommand(command);
(bool shouldRun, int runCount, TimeSpan coolOffPeriod, TimeSpan elapsedTime) = this.ShouldRunCommand(command);
try
{
if (token.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class HealthRestartPlannerTest
[Unit]
public void TestCreateValidation()
{
var (factory, store, restartManager, _) = CreatePlanner();
(TestCommandFactory factory, Mock<IEntityStore<string, ModuleState>> store, IRestartPolicyManager restartManager, _) = CreatePlanner();

Assert.Throws<ArgumentNullException>(() => new HealthRestartPlanner(null, store.Object, IntensiveCareTime, restartManager));
Assert.Throws<ArgumentNullException>(() => new HealthRestartPlanner(factory, null, IntensiveCareTime, restartManager));
Expand All @@ -53,7 +53,7 @@ public void TestCreateValidation()
public async void TestMinimalTest()
{
// Arrange
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();
var token = new CancellationToken();
var expectedExecutionList = new List<TestRecordType>();

Expand All @@ -70,7 +70,7 @@ public async void TestMinimalTest()
[Unit]
public async void TestAddRunningModule()
{
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();

IModule addModule = new TestModule("mod1", "version1", "test", ModuleStatus.Running, Config1, RestartPolicy.OnUnhealthy, DefaultConfigurationInfo, EnvVars);
IImmutableDictionary<string, IModuleIdentity> moduleIdentities = GetModuleIdentities(new List<IModule>() { addModule });
Expand All @@ -91,7 +91,7 @@ public async void TestAddRunningModule()
[Unit]
public async void TestAddStoppedModule()
{
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();

IModule addModule = new TestModule("mod1", "version1", "test", ModuleStatus.Stopped, Config1, RestartPolicy.OnUnhealthy, DefaultConfigurationInfo, EnvVars);
IImmutableDictionary<string, IModuleIdentity> moduleIdentities = GetModuleIdentities(new List<IModule>() { addModule });
Expand All @@ -111,7 +111,7 @@ public async void TestAddStoppedModule()
[Unit]
public async void TestUpdateModule()
{
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();

IRuntimeModule currentModule = new TestRuntimeModule(
"mod1", "version1", RestartPolicy.OnUnhealthy, "test", ModuleStatus.Running, Config1,
Expand All @@ -138,7 +138,7 @@ public async void TestUpdateModule()
[Unit]
public async void TestRemoveModule()
{
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();

IRuntimeModule removeModule = new TestRuntimeModule(
"mod1", "version1", RestartPolicy.OnUnhealthy, "test", ModuleStatus.Running, Config1,
Expand Down Expand Up @@ -235,7 +235,7 @@ public async void TestRemoveModule()
[Unit]
public async Task TestRemoveKitchenSink()
{
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();

IRuntimeModule[] removedModules = GetRemoveTestData();

Expand Down Expand Up @@ -396,7 +396,7 @@ public async Task TestUpdateDeployKitchenSink()
// on whether it undergoes a re-deploy or not.

// Arrange
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();
(IRuntimeModule RunningModule, IModule UpdatedModule)[] data = GetUpdateDeployTestData();
IImmutableDictionary<string, IModuleIdentity> moduleIdentities = GetModuleIdentities(data.Select(d => d.UpdatedModule).ToList());
// build "current" and "desired" module sets
Expand Down Expand Up @@ -804,7 +804,7 @@ public async Task TestUpdateDeployKitchenSink()
public async Task TestUpdateStateChangedKitchenSink()
{
// Arrange
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();

// prepare list of modules whose configurations have been updated
(IRuntimeModule RunningModule, IModule UpdatedModule)[] updateDeployModules = GetUpdateDeployTestData();
Expand Down Expand Up @@ -866,7 +866,7 @@ public async Task TestUpdateStateChangedKitchenSink()
public async Task TestResetStatsForHealthyModules()
{
// Arrange
var (factory, store, _, planner) = CreatePlanner();
(TestCommandFactory factory, Mock<IEntityStore<string, ModuleState>> store, _, HealthRestartPlanner planner) = CreatePlanner();

// derive list of "running great" modules from GetUpdateStateChangeTestData()
IList<IRuntimeModule> runningGreatModules = GetUpdateStateChangeTestData()
Expand Down Expand Up @@ -897,7 +897,7 @@ public async Task TestResetStatsForHealthyModules()
public async Task CreateShutdownPlanTest()
{
// Arrange
var (factory, _, _, planner) = CreatePlanner();
(TestCommandFactory factory, _, _, HealthRestartPlanner planner) = CreatePlanner();

IModule module1 = new TestModule("mod1", "version1", "test", ModuleStatus.Running, Config1, RestartPolicy.OnUnhealthy, DefaultConfigurationInfo, EnvVars);
IModule edgeAgentModule = new TestModule(Constants.EdgeAgentModuleName, "version1", "test", ModuleStatus.Running, Config1, RestartPolicy.OnUnhealthy, DefaultConfigurationInfo, EnvVars);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public async Task<ICloudProxy> CreateOrUpdateAsync(IClientCredentials newCredent
// complete the tokenGetter
if (newCredentials is ITokenCredentials tokenAuth && this.tokenGetter.HasValue)
{
if (IsTokenExpired(tokenAuth.Identity.IotHubHostName, tokenAuth.Token))
{
throw new InvalidOperationException($"Token for client {tokenAuth.Identity.Id} is expired");
}
this.tokenGetter.ForEach(tg =>
{
tg.SetResult(tokenAuth.Token);
Expand Down Expand Up @@ -293,7 +298,20 @@ internal static DateTime GetTokenExpiry(string hostName, string token)
}
}

internal static TimeSpan GetTokenExpiryTimeRemaining(string hostname, string token) => GetTokenExpiry(hostname, token) - DateTime.UtcNow;
internal static bool IsTokenExpired(string hostName, string token)
{
try
{
SharedAccessSignature sharedAccessSignature = SharedAccessSignature.Parse(hostName, token);
return sharedAccessSignature.IsExpired();
}
catch (UnauthorizedAccessException)
{
return true;
}
}

internal static TimeSpan GetTokenExpiryTimeRemaining(string hostName, string token) => GetTokenExpiry(hostName, token) - DateTime.UtcNow;

// Checks if the token expires too soon
static bool IsTokenUsable(string hostname, string token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,24 @@ public ConnectivityAwareClient(IClient client, IDeviceConnectivityManager device
this.underlyingClient = Preconditions.CheckNotNull(client, nameof(client));
this.deviceConnectivityManager = Preconditions.CheckNotNull(deviceConnectivityManager, nameof(deviceConnectivityManager));

this.deviceConnectivityManager.DeviceConnected += (_, __) =>
this.connectionStatusChangedHandler?.Invoke(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
this.deviceConnectivityManager.DeviceDisconnected += (_, __) =>
this.connectionStatusChangedHandler?.Invoke(ConnectionStatus.Disconnected, ConnectionStatusChangeReason.No_Network);
this.deviceConnectivityManager.DeviceConnected += this.HandleDeviceConnectedEvent;
this.deviceConnectivityManager.DeviceDisconnected += this.HandleDeviceDisconnectedEvent;
}

void HandleDeviceConnectedEvent(object sender, EventArgs eventArgs) =>
this.connectionStatusChangedHandler?.Invoke(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

void HandleDeviceDisconnectedEvent(object sender, EventArgs eventArgs) =>
this.connectionStatusChangedHandler?.Invoke(ConnectionStatus.Disconnected, ConnectionStatusChangeReason.No_Network);

public bool IsActive => this.underlyingClient.IsActive;

public Task CloseAsync() => this.underlyingClient.CloseAsync();

public async Task CloseAsync()
{
await this.underlyingClient.CloseAsync();
this.deviceConnectivityManager.DeviceConnected -= this.HandleDeviceConnectedEvent;
this.deviceConnectivityManager.DeviceDisconnected -= this.HandleDeviceDisconnectedEvent;
}

// This method could throw and is not a reliable candidate to check connectivity status
public Task RejectAsync(string messageId) => this.underlyingClient.RejectAsync(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public DeviceConnectivityManager(TimeSpan minConnectivityCheckFrequency, TimeSpa
.OnEntry(this.OnDisconnected)
.OnExit(this.OnDisconnectedExit);

this.state = State.Trying;
this.state = State.Disconnected;

Events.Created(minConnectivityCheckFrequency, disconnectedCheckFrequency);
}
Expand Down Expand Up @@ -118,8 +118,7 @@ public void CallTimedOut()

void OnConnected()
{
Events.OnConnected();
this.DeviceConnected?.Invoke(this, EventArgs.Empty);
Events.OnConnected();
this.connectedTimer.Start();
}

Expand All @@ -145,6 +144,7 @@ void OnDisconnected()
void OnDisconnectedExit()
{
Events.OnDisconnectedExit();
this.DeviceConnected?.Invoke(this, EventArgs.Empty);
this.disconnectedTimer.Stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,34 @@ async void CloudConnectionStatusChangedHandler(string deviceId,
switch (connectionStatus)
{
case CloudConnectionStatus.TokenNearExpiry:
Option<IClientCredentials> token = await device.DeviceConnection.Map(async d => await d.DeviceProxy.GetUpdatedIdentity())
.GetOrElse(Task.FromResult(Option.None<IClientCredentials>()));
if (token.HasValue)

Option<IDeviceProxy> deviceProxy = device.DeviceConnection.Map(d => d.DeviceProxy).Filter(d => d.IsActive);
if (deviceProxy.HasValue)
{
await token.ForEachAsync(async t =>
Option<IClientCredentials> token = await deviceProxy.Map(d => d.GetUpdatedIdentity())
.GetOrElse(Task.FromResult(Option.None<IClientCredentials>()));
if (token.HasValue)
{
await device.CreateOrUpdateCloudConnection((c) => this.CreateOrUpdateCloudConnection(c, t));
});
await token.ForEachAsync(async t =>
{
Try<ICloudConnection> cloudConnectionTry = await device.CreateOrUpdateCloudConnection(c => this.CreateOrUpdateCloudConnection(c, t));
if (!cloudConnectionTry.Success)
{
await this.RemoveDeviceConnection(device, true);
this.CloudConnectionLost?.Invoke(this, device.Identity);
}
});
}
else
{
await this.RemoveDeviceConnection(device, false);
}
}
else
{
await this.RemoveDeviceConnection(device, false);
await this.RemoveDeviceConnection(device, true);
}

break;

case CloudConnectionStatus.DisconnectedTokenExpired:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class DeviceMessageHandler : IDeviceListener, IDeviceProxy
IDeviceProxy underlyingProxy;

// IoTHub error codes
const int GatewayTimeoutErrorCode = 504101;
const int GenericBadRequest = 400000;

public DeviceMessageHandler(IIdentity identity, IEdgeHub edgeHub, IConnectionManager connectionManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public IProtocolGatewayMessage FromMessage(IMessage message)
{
if (SystemProperties.OutgoingSystemPropertiesMap.TryGetValue(systemProperty.Key, out string onWirePropertyName))
{
properties.Add(onWirePropertyName, systemProperty.Value);
properties[onWirePropertyName] = systemProperty.Value;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void GetTokenExpiryBufferSecondsTest()
string token = TokenHelper.CreateSasToken("azure.devices.net");
TimeSpan timeRemaining = CloudConnection.GetTokenExpiryTimeRemaining("foo.azuredevices.net", token);
Assert.True(timeRemaining > TimeSpan.Zero);
}
}

[Unit]
[Fact]
Expand Down Expand Up @@ -386,5 +386,20 @@ public void CheckTokenExpiredTest()
// Assert
Assert.Equal(DateTime.MinValue, actualExpiryTime);
}

[Unit]
[Fact]
public void GetIsTokenExpiredTest()
{
// Arrange
DateTime tokenExpiry = DateTime.UtcNow.AddYears(1);
string token = TokenHelper.CreateSasToken("azure.devices.net", tokenExpiry);

// Act
TimeSpan expiryTimeRemaining = CloudConnection.GetTokenExpiryTimeRemaining("azure.devices.net", token);

// Assert
Assert.True(expiryTimeRemaining - (tokenExpiry - DateTime.UtcNow) < TimeSpan.FromSeconds(1));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy.Test
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Util.Test.Common;
using Moq;
using Xunit;

public class ConnectivityAwareClientTest
{
[Unit]
[Fact]
public async Task DisableHandlingEventsOnCloseTest()
{
// Arrange
int connectionStatusChangedHandlerCount = 0;
void ConnectionStatusChangedHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
Interlocked.Increment(ref connectionStatusChangedHandlerCount);
}

var deviceConnectivityManager = new DeviceConnectivityManager();
var client = Mock.Of<IClient>();
var connectivityAwareClient = new ConnectivityAwareClient(client, deviceConnectivityManager);
connectivityAwareClient.SetConnectionStatusChangedHandler(ConnectionStatusChangedHandler);

// Act
deviceConnectivityManager.InvokeDeviceConnected();
deviceConnectivityManager.InvokeDeviceDisconnected();

// Assert
Assert.Equal(2, connectionStatusChangedHandlerCount);

// Act
await connectivityAwareClient.CloseAsync();
deviceConnectivityManager.InvokeDeviceConnected();
deviceConnectivityManager.InvokeDeviceDisconnected();

// Assert
Assert.Equal(2, connectionStatusChangedHandlerCount);
}

class DeviceConnectivityManager : IDeviceConnectivityManager
{
public void CallSucceeded()
{
throw new NotImplementedException();
}

public void CallTimedOut()
{
throw new NotImplementedException();
}

public void InvokeDeviceConnected() => this.DeviceConnected?.Invoke(null, null);

public void InvokeDeviceDisconnected() => this.DeviceDisconnected?.Invoke(null, null);

public event EventHandler DeviceConnected;

public event EventHandler DeviceDisconnected;
}
}
}
Loading

0 comments on commit da50ab6

Please sign in to comment.