Skip to content

Commit

Permalink
Metrics Upload to IoTHub (#2096)
Browse files Browse the repository at this point in the history
Make metrics upload match damon's IoTHub changes
  • Loading branch information
lfitchett authored Dec 13, 2019
1 parent 4b62c64 commit eff5c85
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
<ProjectReference Include="..\Microsoft.Azure.Devices.Edge.Agent.IoTHub\Microsoft.Azure.Devices.Edge.Agent.IoTHub.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,44 @@

namespace Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Publisher
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Core.Requests;
using Microsoft.Azure.Devices.Edge.Agent.IoTHub;
using Microsoft.Azure.Devices.Edge.Util;

public sealed class IoTHubMetricsUpload : IMetricsPublisher
{
readonly ModuleClient moduleClient;
readonly IEdgeAgentConnection edgeAgentConnection;

public IoTHubMetricsUpload(ModuleClient moduleClient)
public IoTHubMetricsUpload(IEdgeAgentConnection edgeAgentConnection)
{
this.moduleClient = Preconditions.CheckNotNull(moduleClient, nameof(moduleClient));
this.edgeAgentConnection = Preconditions.CheckNotNull(edgeAgentConnection, nameof(edgeAgentConnection));
}

public async Task PublishAsync(IEnumerable<Metric> metrics, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(metrics, nameof(metrics));
byte[] data = MetricsSerializer.MetricsToBytes(metrics).ToArray();
byte[] compressedData = Compression.CompressToGzip(data);

// TODO: add check for too big of a message
if (compressedData.Length > 0)
if (data.Length > 0)
{
Message message = new Message(compressedData);
await this.moduleClient.SendEventAsync(message);
Message message = this.BuildMessage(data);
await this.edgeAgentConnection.SendEventAsync(message);
}
}

Message BuildMessage(byte[] data)
{
Message message = new Message(data);
message.ContentType = "application/x-azureiot-edgeruntimediagnostics";

return message;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void Start(ILogger logger)
{
if (ApiVersion.ParseVersion(this.apiVersion).Value >= ApiVersion.Version20191105.Value)
{
this.updateResources = new PeriodicTask(this.Update, TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), logger, "Get system resources");
this.updateResources = new PeriodicTask(this.Update, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1), logger, "Get system resources", false);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@ public async Task UpdateReportedPropertiesAsync(TwinCollection patch)
}
}

public async Task SendEventAsync(Message message)
{
Events.UpdatingReportedProperties();
try
{
Option<IModuleClient> moduleClient = this.moduleConnection.GetModuleClient();
if (!moduleClient.HasValue)
{
Events.SendEventClientEmpty();
return;
}

await moduleClient.ForEachAsync(d => d.SendEventAsync(message));
Events.SendEvent();
}
catch (Exception e)
{
Events.ErrorSendingEvent(e);
throw;
}
}

internal static void ValidateSchemaVersion(string schemaVersion)
{
if (ExpectedSchemaVersion.CompareMajorVersion(schemaVersion, "desired properties schema") != 0)
Expand Down Expand Up @@ -331,7 +353,10 @@ enum EventIds
UpdatedReportedProperties,
ErrorUpdatingReportedProperties,
GotModuleClient,
GettingModuleClient
GettingModuleClient,
SendEvent,
SendEventClientEmpty,
ErrorSendingEvent,
}

public static void DesiredPropertiesPatchFailed(Exception exception)
Expand Down Expand Up @@ -447,6 +472,21 @@ internal static void RetryingGetTwin(RetryingEventArgs args)
{
Log.LogDebug((int)EventIds.RetryingGetTwin, $"Edge agent is retrying GetTwinAsync. Attempt #{args.CurrentRetryCount}. Last error: {args.LastException?.Message}");
}

internal static void SendEvent()
{
Log.LogDebug((int)EventIds.SendEvent, $"Edge agent is sending a diagnostic message.");
}

public static void SendEventClientEmpty()
{
Log.LogDebug((int)EventIds.SendEventClientEmpty, "Client empty.");
}

public static void ErrorSendingEvent(Exception ex)
{
Log.LogDebug((int)EventIds.ErrorSendingEvent, ex, "Error sending event");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Core;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Shared;
Expand All @@ -16,5 +17,7 @@ public interface IEdgeAgentConnection : IDisposable
Task<Option<DeploymentConfigInfo>> GetDeploymentConfigInfoAsync();

Task UpdateReportedPropertiesAsync(TwinCollection patch);

Task SendEventAsync(Message message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface IModuleClient

Task UpdateReportedPropertiesAsync(TwinCollection reportedProperties);

Task SendEventAsync(Message message);

////Task<DeviceStreamRequest> WaitForDeviceStreamRequestAsync(CancellationToken cancellationToken);

////Task<IClientWebSocket> AcceptDeviceStreamingRequestAndConnect(DeviceStreamRequest deviceStreamRequest, CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,20 @@ public async Task UpdateReportedPropertiesAsync(TwinCollection reportedPropertie
}
}

public async Task SendEventAsync(Message message)
{
try
{
this.inactivityTimer.Reset();
await this.inner.SendEventAsync(message);
}
catch (Exception e)
{
await this.HandleException(e);
throw;
}
}

////public async Task<DeviceStreamRequest> WaitForDeviceStreamRequestAsync(CancellationToken cancellationToken)
////{
//// try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public interface ISdkModuleClient

Task UpdateReportedPropertiesAsync(TwinCollection reportedProperties);

Task SendEventAsync(Message message);

////Task<Client.DeviceStreamRequest> WaitForDeviceStreamRequestAsync(CancellationToken cancellationToken);

////Task<IClientWebSocket> AcceptDeviceStreamingRequestAndConnect(Client.DeviceStreamRequest deviceStreamRequest, CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public Task SetDefaultMethodHandlerAsync(MethodCallback callback)
public Task UpdateReportedPropertiesAsync(TwinCollection reportedProperties)
=> this.sdkModuleClient.UpdateReportedPropertiesAsync(reportedProperties);

public Task SendEventAsync(Message message) => this.sdkModuleClient.SendEventAsync(message);

////public Task<DeviceStreamRequest> WaitForDeviceStreamRequestAsync(CancellationToken cancellationToken)
//// => this.sdkModuleClient.WaitForDeviceStreamRequestAsync(cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Service
using System.IO;
using System.Text;
using Autofac;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Diagnostics;
using Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Publisher;
using Microsoft.Azure.Devices.Edge.Util;
Expand Down Expand Up @@ -34,7 +35,7 @@ protected override void Load(ContainerBuilder builder)
.SingleInstance();

// IMetricsPublisher
builder.Register(c => new MetricsFileWriter())
builder.RegisterType<IoTHubMetricsUpload>()
.As<IMetricsPublisher>()
.SingleInstance();

Expand Down
27 changes: 22 additions & 5 deletions edge-util/src/Microsoft.Azure.Devices.Edge.Util/PeriodicTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class PeriodicTask : IDisposable
readonly string operationName;
readonly Timer checkTimer;
readonly CancellationTokenSource cts = new CancellationTokenSource();
readonly bool verbose;

Task currentTask;

Expand All @@ -24,7 +25,8 @@ public PeriodicTask(
TimeSpan frequency,
TimeSpan startAfter,
ILogger logger,
string operationName)
string operationName,
bool verbose = true)
{
Preconditions.CheckArgument(frequency > TimeSpan.Zero, "Frequency should be > 0");
Preconditions.CheckArgument(startAfter >= TimeSpan.Zero, "startAfter should be >= 0");
Expand All @@ -36,6 +38,7 @@ public PeriodicTask(
this.operationName = Preconditions.CheckNonWhiteSpace(operationName, nameof(operationName));
this.currentTask = this.DoWork();
this.checkTimer = new Timer(this.EnsureWork, null, startAfter, frequency);
this.verbose = verbose;
this.logger.LogInformation($"Started operation {this.operationName}");
}

Expand All @@ -44,8 +47,9 @@ public PeriodicTask(
TimeSpan frequency,
TimeSpan startAfter,
ILogger logger,
string operationName)
: this(_ => Preconditions.CheckNotNull(work, nameof(work))(), frequency, startAfter, logger, operationName)
string operationName,
bool verbose = true)
: this(_ => Preconditions.CheckNotNull(work, nameof(work))(), frequency, startAfter, logger, operationName, verbose)
{
}

Expand Down Expand Up @@ -85,9 +89,22 @@ async Task DoWork()
{
try
{
this.logger.LogInformation($"Starting periodic operation {this.operationName}...");
// void Log(string message) => this.verbose ? this.logger.LogInformation(message) : this.logger.LogDebug(message);
void Log(string message)
{
if (this.verbose)
{
this.logger.LogInformation(message);
}
else
{
this.logger.LogDebug(message);
}
}

Log($"Starting periodic operation {this.operationName}...");
await this.work(cancellationToken);
this.logger.LogInformation($"Successfully completed periodic operation {this.operationName}");
Log($"Successfully completed periodic operation {this.operationName}");
}
catch (Exception e)
{
Expand Down

0 comments on commit eff5c85

Please sign in to comment.