Skip to content

Commit

Permalink
Phlin/edge agent get twin fix (#3075)
Browse files Browse the repository at this point in the history
When Edge agent calling GetTwinAsync method throws exception after retry policy, if the exception is not timeout exception it will trigger to close module client and retry again.
  • Loading branch information
philipktlin authored Jun 15, 2020
1 parent fd51a9e commit 60b125e
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class EdgeAgentConnection : IEdgeAgentConnection
static readonly ITransientErrorDetectionStrategy AllButFatalErrorDetectionStrategy = new DelegateErrorDetectionStrategy(ex => ex.IsFatal() == false);

static readonly RetryStrategy TransientRetryStrategy =
new ExponentialBackoff(5, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(4));
new ExponentialBackoff(3, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(4));

readonly AsyncLock twinLock = new AsyncLock();
readonly ISerde<DeploymentConfig> desiredPropertiesSerDe;
Expand Down Expand Up @@ -184,14 +184,16 @@ await twinOption.ForEachAsync(
});
}

async Task<Option<Twin>> GetTwinFromIoTHub()
async Task<Option<Twin>> GetTwinFromIoTHub(bool retrying = false)
{
IModuleClient moduleClient = null;

try
{
async Task<Twin> GetTwinFunc()
{
Events.GettingModuleClient();
IModuleClient moduleClient = await this.moduleConnection.GetOrCreateModuleClient();
Events.GettingModuleClient(retrying);
moduleClient = await this.moduleConnection.GetOrCreateModuleClient();
Events.GotModuleClient();
return await moduleClient.GetTwinAsync();
}
Expand All @@ -210,6 +212,21 @@ async Task<Twin> GetTwinFunc()
catch (Exception e)
{
Events.ErrorGettingTwin(e);

if (!retrying && moduleClient != null && !(e is TimeoutException))
{
try
{
await moduleClient.CloseAsync();
}
catch (Exception e2)
{
Events.ErrorClosingModuleClientForRetry(e2);
}

return await this.GetTwinFromIoTHub(true);
}

return Option.None<Twin>();
}
}
Expand Down Expand Up @@ -307,7 +324,8 @@ enum EventIds
UpdatedReportedProperties,
ErrorUpdatingReportedProperties,
GotModuleClient,
GettingModuleClient
GettingModuleClient,
ErrorClosingModuleClient,
}

public static void DesiredPropertiesPatchFailed(Exception exception)
Expand Down Expand Up @@ -354,9 +372,9 @@ public static void ErrorUpdatingReportedProperties(Exception ex)
Log.LogDebug((int)EventIds.ErrorUpdatingReportedProperties, ex, "Error updating reported properties in IoT Hub");
}

public static void GettingModuleClient()
public static void GettingModuleClient(bool retrying)
{
Log.LogDebug((int)EventIds.GettingModuleClient, "Getting module client to refresh the twin");
Log.LogDebug((int)EventIds.GettingModuleClient, $"Getting module client to refresh the twin with retrying set to {retrying}");
}

public static void GotModuleClient()
Expand Down Expand Up @@ -423,6 +441,11 @@ internal static void RetryingGetTwin(RetryingEventArgs args)
{
Log.LogDebug((int)EventIds.RetryingGetTwin, $"Edge agent is retrying GetTwinAsync. Attempt #{args.CurrentRetryCount}. Last error: {args.LastException?.Message}");
}

public static void ErrorClosingModuleClientForRetry(Exception e)
{
Log.LogWarning((int)EventIds.ErrorClosingModuleClient, e, "Error closing module client for retry");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ public async Task GetTwinFailureDoesNotUpdateState()
receivedDeploymentConfigInfo = await edgeAgentConnection.GetDeploymentConfigInfoAsync();

// Assert
moduleClient.Verify(m => m.GetTwinAsync(), Times.Exactly(5));
moduleClient.Verify(m => m.GetTwinAsync(), Times.Exactly(7));
Assert.True(receivedDeploymentConfigInfo.HasValue);
Assert.False(receivedDeploymentConfigInfo.OrDefault().Exception.HasValue);
Assert.Equal(deploymentConfig, receivedDeploymentConfigInfo.OrDefault().DeploymentConfig);
Expand Down Expand Up @@ -1301,6 +1301,90 @@ public async Task GetTwinRetryLogicGetsNewClient()
}
}

[Fact]
[Unit]
public async Task GetDeploymentConfigInfoAsync_CreateNewModuleClientWhenGetTwinThrowsMoreThanRetryCount()
{
// Arrange
var moduleClient = new Mock<IModuleClient>();
var serde = new Mock<ISerde<DeploymentConfig>>();
var runtime = new Mock<IRuntimeInfo>();
var edgeAgent = new Mock<IEdgeAgentModule>();
var edgeHub = new Mock<IEdgeHubModule>();
var retryStrategy = new Mock<RetryStrategy>(new object[] { false });

var deploymentConfig = new DeploymentConfig(
"1.0",
runtime.Object,
new SystemModules(edgeAgent.Object, edgeHub.Object),
ImmutableDictionary<string, IModule>.Empty);

var moduleClientProvider = new Mock<IModuleClientProvider>();
moduleClientProvider.Setup(p => p.Create(It.IsAny<ConnectionStatusChangesHandler>()))
.ReturnsAsync(moduleClient.Object);

serde.Setup(s => s.Deserialize(It.IsAny<string>())).Returns(deploymentConfig);
// var retryStrategy = new FixedInterval(1, TimeSpan.FromMilliseconds(1));
retryStrategy.Setup(rs => rs.GetShouldRetry())
.Returns(
(int retryCount, Exception lastException, out TimeSpan delay) =>
{
if (retryCount >= 1)
{
delay = TimeSpan.Zero;
return false;
}
delay = TimeSpan.Zero;
return true;
});

var twin = new Twin
{
Properties = new TwinProperties
{
Desired = new TwinCollection(
JObject.FromObject(
new Dictionary<string, object>
{
{ "$version", 10 },

// This is here to prevent the "empty" twin error from being thrown.
{ "MoreStuff", "MoreStuffHereToo" }
}).ToString()),
Reported = new TwinCollection()
}
};

moduleClient.SetupSequence(d => d.GetTwinAsync())
.ThrowsAsync(new InvalidOperationException())
.ThrowsAsync(new InvalidOperationException())
.ReturnsAsync(twin);
moduleClient.Setup(d => d.SetDesiredPropertyUpdateCallbackAsync(It.IsAny<DesiredPropertyUpdateCallback>()))
.Returns(Task.CompletedTask);
moduleClient.Setup(d => d.SetMethodHandlerAsync(It.IsAny<string>(), It.IsAny<MethodCallback>()))
.Returns(Task.CompletedTask);

IEnumerable<IRequestHandler> requestHandlers = new List<IRequestHandler> { new PingRequestHandler() };

// Act
IEdgeAgentConnection connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), true, TimeSpan.FromHours(1), retryStrategy.Object);

// Assert
// The connection hasn't been created yet. So wait for it.
await Task.Delay(TimeSpan.FromSeconds(3));

// Act
Option<DeploymentConfigInfo> deploymentConfigInfo = await connection.GetDeploymentConfigInfoAsync();

// Assert
Assert.True(deploymentConfigInfo.HasValue);
moduleClient.Verify(m => m.GetTwinAsync(), Times.Exactly(3));
moduleClient.Verify(m => m.CloseAsync(), Times.Once);
Assert.Equal(10, deploymentConfigInfo.OrDefault().Version);
Assert.Equal(deploymentConfigInfo.OrDefault().DeploymentConfig, deploymentConfig);
}

[Theory]
[Unit]
[InlineData("1.0", null)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup Condition="'$(DotNet_Runtime)' != 'netcoreapp3.0'">
<TargetFramework>netcoreapp2.1</TargetFramework>
Expand Down

0 comments on commit 60b125e

Please sign in to comment.