Skip to content

Commit

Permalink
Metrics Collector: Fix adoption stats (#5190)
Browse files Browse the repository at this point in the history
We removed the test-specific logic from the metrics collector. Doing so also removed a get-twin call that was relied on for gauging usage stats.

The solution is to recreate the `ModuleClient` (after product info and potentially all other fields are set) even in the Log Analytics code path. This change needs some synchronization so that it does not interfere with metrics potentially being published from the `IotMessage` metrics upload path (which also uses `ModuleClient`).

To achieve this synchronization, I chose to create a wrapper around the `ModuleClient` which serves as the shared reference across multiple `Tasks`. Several of these `Tasks` might use the `ModuleClient`: one `Task` could be recreating it while another is attempting to publish messages. I used a `SemaphoreSlim` for the synchronization because I needed an async-compatible lock.

I ran into some issues when attempting to recreate the `ModuleClient`. 
1. If messages were published too quickly, the initial message send would hang.
2. When calling `CloseAsync()`, the SDK threw an exception (an SDK bug, in my opinion).

I managed to resolve these issues by not calling `CloseAsync()` and by waiting 1 minute before publishing messages. I confirmed in the `EdgeHub` logs that multiple connections did not exist.
  • Loading branch information
and-rewsmith authored Jul 12, 2021
1 parent bc5ebf1 commit 057da6a
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private static string Sign(string requestdate, string contenthash, string key)
return Convert.ToBase64String(hKey.ComputeHash(Encoding.UTF8.GetBytes(rawsignature)));
}

public static void RegisterWithOms(X509Certificate2 cert, string AgentGuid, string logAnalyticsWorkspaceId, string logAnalyticsWorkspaceKey, string logAnalyticsWorkspaceDomain)
public static void RegisterWithOms(X509Certificate2 cert, string AgentGuid, string logAnalyticsWorkspaceId, string logAnalyticsWorkspaceKey, string logAnalyticsWorkspaceDomainPrefixOms)
{

string rawCert = Convert.ToBase64String(cert.GetRawCertData()); //base64 binary
Expand Down Expand Up @@ -168,7 +168,7 @@ public static void RegisterWithOms(X509Certificate2 cert, string AgentGuid, stri

var client = new HttpClient(clientHandler);

string url = "https://" + logAnalyticsWorkspaceId + ".oms." + logAnalyticsWorkspaceDomain + "/AgentService.svc/AgentTopologyRequest";
string url = "https://" + logAnalyticsWorkspaceId + logAnalyticsWorkspaceDomainPrefixOms + Settings.Current.AzureDomain + "/AgentService.svc/AgentTopologyRequest";

Console.WriteLine("OMS endpoint Url : {0}", url);

Expand Down Expand Up @@ -198,7 +198,7 @@ public static void RegisterWithOms(X509Certificate2 cert, string AgentGuid, stri
}
}

public static void RegisterWithOmsWithBasicRetryAsync(X509Certificate2 cert, string AgentGuid, string logAnalyticsWorkspaceId, string logAnalyticsWorkspaceKey, string logAnalyticsWorkspaceDomain)
public static void RegisterWithOmsWithBasicRetryAsync(X509Certificate2 cert, string AgentGuid, string logAnalyticsWorkspaceId, string logAnalyticsWorkspaceKey, string logAnalyticsWorkspaceDomainPrefixOms)
{
int currentRetry = 0;

Expand All @@ -207,7 +207,7 @@ public static void RegisterWithOmsWithBasicRetryAsync(X509Certificate2 cert, str
try
{
RegisterWithOms(
cert, AgentGuid, logAnalyticsWorkspaceId, logAnalyticsWorkspaceKey, logAnalyticsWorkspaceDomain);
cert, AgentGuid, logAnalyticsWorkspaceId, logAnalyticsWorkspaceKey, logAnalyticsWorkspaceDomainPrefixOms);

// Return or break.
break;
Expand Down Expand Up @@ -236,7 +236,7 @@ public static void RegisterWithOmsWithBasicRetryAsync(X509Certificate2 cert, str
}
}

public static (X509Certificate2 tempCert, (string, byte[]), string) RegisterAgentWithOMS(string logAnalyticsWorkspaceId, string logAnalyticsWorkspaceKey, string logAnalyticsWorkspaceDomain)
public static (X509Certificate2 tempCert, (string, byte[]), string) RegisterAgentWithOMS(string logAnalyticsWorkspaceId, string logAnalyticsWorkspaceKey, string logAnalyticsWorkspaceDomainPrefixOms)
{
X509Certificate2 agentCert = null;
string certString;
Expand Down Expand Up @@ -268,7 +268,7 @@ public static (X509Certificate2 tempCert, (string, byte[]), string) RegisterAgen
RegisterWithOmsWithBasicRetryAsync(agentCert, agentGuid,
logAnalyticsWorkspaceId,
logAnalyticsWorkspaceKey,
logAnalyticsWorkspaceDomain);
logAnalyticsWorkspaceDomainPrefixOms);
}
catch (Exception ex)
{
Expand Down
4 changes: 2 additions & 2 deletions edge-modules/azure-monitor/src/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public static class Constants
public static readonly string MetricUploadDataType = "INSIGHTS_METRICS_BLOB";
public static readonly string IoTUploadMessageIdentifier = "origin-iotedge-metrics-collector";
public static readonly int UploadMaxRetries = 3;
public const string DefaultLogAnalyticsWorkspaceDomain = "opinsights.azure.com";

public const string DefaultLogAnalyticsWorkspaceDomainPrefixOds = ".ods.opinsights.";
public const string DefaultLogAnalyticsWorkspaceDomainPrefixOms = ".oms.opinsights.";
public const string ProductInfo = "IoTEdgeMetricsCollectorModule";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public async Task<bool> PostAsync(string workspaceId, string sharedKey, string c
try
{
// Lazily generate and register certificate.
if (cert == null) {
(X509Certificate2 tempCert, (string certString, byte[] certBuf), string keyString) = CertGenerator.RegisterAgentWithOMS(workspaceId, sharedKey, Constants.DefaultLogAnalyticsWorkspaceDomain);
if (cert == null)
{
(X509Certificate2 tempCert, (string certString, byte[] certBuf), string keyString) = CertGenerator.RegisterAgentWithOMS(workspaceId, sharedKey, Constants.DefaultLogAnalyticsWorkspaceDomainPrefixOms);
cert = tempCert;
}
using (var handler = new HttpClientHandler())
Expand All @@ -71,7 +72,7 @@ public async Task<bool> PostAsync(string workspaceId, string sharedKey, string c
handler.PreAuthenticate = true;
handler.ClientCertificateOptions = ClientCertificateOption.Manual;

Uri requestUri = new Uri("https://" + workspaceId + ".ods." + Constants.DefaultLogAnalyticsWorkspaceDomain + "/OperationalData.svc/PostJsonDataItems");
Uri requestUri = new Uri("https://" + workspaceId + Constants.DefaultLogAnalyticsWorkspaceDomainPrefixOds + Settings.Current.AzureDomain + "/OperationalData.svc/PostJsonDataItems");

using (HttpClient client = new HttpClient(handler))
{
Expand All @@ -86,19 +87,21 @@ public async Task<bool> PostAsync(string workspaceId, string sharedKey, string c
// optionally compress content before sending
int contentLength;
HttpContent contentMsg;
if (Settings.Current.CompressForUpload) {
if (Settings.Current.CompressForUpload)
{
byte[] withHeader = ZlibDeflate(Encoding.UTF8.GetBytes(content));
contentLength = withHeader.Length;

contentMsg = new ByteArrayContent(withHeader);
contentMsg.Headers.Add("Content-Encoding", "deflate");
}
else {
else
{
contentMsg = new StringContent(content, Encoding.UTF8);
contentLength = ASCIIEncoding.Unicode.GetByteCount(content);
}

if (contentLength > 1024 * 1024 )
if (contentLength > 1024 * 1024)
{
LoggerUtil.Writer.LogDebug(
"HTTP post content greater than 1mb" + " " +
Expand All @@ -114,10 +117,12 @@ public async Task<bool> PostAsync(string workspaceId, string sharedKey, string c
response.ReasonPhrase + " " +
responseMsg);

if ((int)response.StatusCode != 200) {
if ((int)response.StatusCode != 200)
{
failurecount += 1;

if (DateTime.Now - lastFailureReportedTime > TimeSpan.FromMinutes(1)) {
if (DateTime.Now - lastFailureReportedTime > TimeSpan.FromMinutes(1))
{
LoggerUtil.Writer.LogDebug(
"abnormal HTTP response code - " +
"responsecode: " + ((int)response.StatusCode).ToString() + " " +
Expand All @@ -132,7 +137,7 @@ public async Task<bool> PostAsync(string workspaceId, string sharedKey, string c
// Regen the cert on next run just to be safe.
cert = null;
}
return ((int) response.StatusCode) == 200;
return ((int)response.StatusCode) == 200;
}
}
}
Expand Down Expand Up @@ -163,10 +168,10 @@ private static byte[] ZlibDeflate(byte[] input)
using (var memoryStream = new MemoryStream())
using (DeflaterOutputStream outStream = new DeflaterOutputStream(memoryStream, deflater))
{
outStream.IsStreamOwner = false;
outStream.Write(input, 0, input.Length);
outStream.Flush();
outStream.Finish();
outStream.IsStreamOwner = false;
outStream.Write(input, 0, input.Length);
outStream.Flush();
outStream.Finish();
return memoryStream.ToArray();
}
}
Expand All @@ -178,7 +183,8 @@ private static byte[] ZlibDeflate(byte[] input)
/// </summary>
private static X509Certificate2 ReadX509CertWithKey(byte[] certBuffer, string keyString)
{
try {
try
{
RSACryptoServiceProvider parsedKey;
X509Certificate2 cert;

Expand All @@ -191,7 +197,7 @@ private static X509Certificate2 ReadX509CertWithKey(byte[] certBuffer, string ke

// parse the private key
PemReader pr = new PemReader(new StringReader(keyString));
AsymmetricCipherKeyPair KeyPair = (AsymmetricCipherKeyPair) pr.ReadObject();
AsymmetricCipherKeyPair KeyPair = (AsymmetricCipherKeyPair)pr.ReadObject();
RSAParameters rsaParams = DotNetUtilities.ToRSAParameters((RsaPrivateCrtKeyParameters)KeyPair.Private);

parsedKey = new RSACryptoServiceProvider();
Expand All @@ -202,7 +208,8 @@ private static X509Certificate2 ReadX509CertWithKey(byte[] certBuffer, string ke
cert = cert.CopyWithPrivateKey(parsedKey);
return cert;
}
catch (Exception e) {
catch (Exception e)
{
// log an error and exit. Modules are restarted automatically so it makes more sense to crash and restart than recover from this.
LoggerUtil.Writer.LogCritical(e.ToString());
Environment.Exit(1);
Expand Down
8 changes: 4 additions & 4 deletions edge-modules/azure-monitor/src/IotHubUpload/IotHubUpload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ namespace Microsoft.Azure.Devices.Edge.Azure.Monitor.IotHubMetricsUpload
public class IotHubMetricsUpload : IMetricsPublisher
{
const string IdentifierPropertyName = "id";
readonly ModuleClient moduleClient;
readonly ModuleClientWrapper ModuleClientWrapper;

public IotHubMetricsUpload(ModuleClient moduleClient)
public IotHubMetricsUpload(ModuleClientWrapper moduleClientWrapper)
{
this.moduleClient = Preconditions.CheckNotNull(moduleClient, nameof(moduleClient));
this.ModuleClientWrapper = Preconditions.CheckNotNull(moduleClientWrapper, nameof(moduleClientWrapper));
}

public async Task<bool> PublishAsync(IEnumerable<Metric> metrics, CancellationToken cancellationToken)
Expand All @@ -47,7 +47,7 @@ public async Task<bool> PublishAsync(IEnumerable<Metric> metrics, CancellationTo
Message metricsMessage = new Message(metricsData);
metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier;

await this.moduleClient.SendEventAsync("metricOutput", metricsMessage);
await this.ModuleClientWrapper.SendMessage("metricOutput", metricsMessage);

LoggerUtil.Writer.LogInformation("Successfully sent metrics via IoT message");
return true;
Expand Down
2 changes: 1 addition & 1 deletion edge-modules/azure-monitor/src/MetricsScrapeAndUpload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public MetricsScrapeAndUpload(MetricsScraper scraper, IMetricsPublisher publishe

public void Start(TimeSpan scrapeAndUploadInterval)
{
this.periodicScrapeAndUpload = new PeriodicTask(this.ScrapeAndUploadMetricsAsync, scrapeAndUploadInterval, new TimeSpan(0, 0, 0), LoggerUtil.Writer, "Scrape and Upload Metrics", true);
this.periodicScrapeAndUpload = new PeriodicTask(this.ScrapeAndUploadMetricsAsync, scrapeAndUploadInterval, TimeSpan.FromMinutes(1), LoggerUtil.Writer, "Scrape and Upload Metrics", true);
}

public void Dispose()
Expand Down
78 changes: 78 additions & 0 deletions edge-modules/azure-monitor/src/ModuleClientWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Devices.Edge.Azure.Monitor
{

/// <summary>
/// Holds a <see cref="ModuleClient"/> behind a <see cref="SemaphoreSlim"/> so that one task can
/// recreate the client while other tasks send messages through the same wrapper instance.
/// </summary>
public class ModuleClientWrapper : IDisposable
{
    // Only assigned in the constructor; the client itself is swapped by RecreateClientAsync.
    readonly ITransportSettings[] transportSettings;
    readonly SemaphoreSlim moduleClientLock;
    ModuleClient inner;

    /// <summary>
    /// Wraps an existing client.
    /// </summary>
    /// <param name="moduleClient">Client used for sending; replaced on <see cref="RecreateClientAsync"/>.</param>
    /// <param name="transportSettings">Transport settings reused whenever the client is recreated.</param>
    /// <param name="moduleClientLock">Semaphore guarding every use and replacement of the client.</param>
    public ModuleClientWrapper(ModuleClient moduleClient, ITransportSettings[] transportSettings, SemaphoreSlim moduleClientLock)
    {
        this.inner = Preconditions.CheckNotNull(moduleClient, nameof(moduleClient));
        this.transportSettings = Preconditions.CheckNotNull(transportSettings, nameof(transportSettings));
        this.moduleClientLock = Preconditions.CheckNotNull(moduleClientLock, nameof(moduleClientLock));
    }

    /// <summary>
    /// Creates a client from the environment, sets the product info, opens the connection and
    /// returns a wrapper around it.
    /// </summary>
    /// <param name="transportSettings">Transport settings used to create the client.</param>
    /// <returns>A wrapper around the opened client.</returns>
    public static async Task<ModuleClientWrapper> BuildModuleClientWrapperAsync(ITransportSettings[] transportSettings)
    {
        ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(transportSettings);

        try
        {
            moduleClient.ProductInfo = Constants.ProductInfo;
            await moduleClient.OpenAsync();

            // The semaphore is created last so nothing is left undisposed if opening fails.
            return new ModuleClientWrapper(moduleClient, transportSettings, new SemaphoreSlim(1, 1));
        }
        catch
        {
            // Do not leak the client when setup fails; the caller still sees the original exception.
            moduleClient.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Disposes the current client and replaces it with a freshly created and opened one.
    /// Failures are logged as warnings and not rethrown.
    /// </summary>
    public async Task RecreateClientAsync()
    {
        await this.moduleClientLock.WaitAsync();

        try
        {
            // The old client is disposed (not closed via CloseAsync) before the new one is created.
            this.inner.Dispose();
            this.inner = await ModuleClient.CreateFromEnvironmentAsync(this.transportSettings);
            this.inner.ProductInfo = Constants.ProductInfo;
            await this.inner.OpenAsync();

            LoggerUtil.Writer.LogInformation("Closed and re-established connection to IoT Hub");
        }
        catch (Exception e)
        {
            LoggerUtil.Writer.LogWarning($"Failed closing and re-establishing connection to IoT Hub: {e.ToString()}");
        }
        finally
        {
            // Always release, even if the handler above throws, so later callers cannot hang on the lock.
            this.moduleClientLock.Release();
        }
    }

    /// <summary>
    /// Sends a message to the given output while holding the client lock.
    /// Failures are logged as errors and not rethrown.
    /// </summary>
    /// <param name="outputName">Name of the module output to send to.</param>
    /// <param name="message">Message to send.</param>
    public async Task SendMessage(string outputName, Message message)
    {
        await this.moduleClientLock.WaitAsync();

        try
        {
            await this.inner.SendEventAsync(outputName, message);
        }
        catch (Exception e)
        {
            // NOTE(review): the failure is swallowed here, so callers cannot tell a failed send
            // from a successful one - confirm this is intended.
            LoggerUtil.Writer.LogError($"Failed sending metrics as IoT message: {e.ToString()}");
        }
        finally
        {
            // Always release, even if the handler above throws, so later callers cannot hang on the lock.
            this.moduleClientLock.Release();
        }
    }

    /// <summary>
    /// Disposes the wrapped client and the semaphore.
    /// </summary>
    public void Dispose()
    {
        this.inner.Dispose();
        this.moduleClientLock.Dispose();
    }
}
}
11 changes: 6 additions & 5 deletions edge-modules/azure-monitor/src/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ static async Task<int> MainAsync()
var transportSetting = new AmqpTransportSettings(TransportType.Amqp_Tcp_Only);

ITransportSettings[] transportSettings = { transportSetting };
ModuleClient moduleClient = null;
ModuleClientWrapper moduleClientWrapper = null;
try
{
moduleClient = await ModuleClient.CreateFromEnvironmentAsync(transportSettings);
moduleClient.ProductInfo = Constants.ProductInfo;
moduleClientWrapper = await ModuleClientWrapper.BuildModuleClientWrapperAsync(transportSettings);

PeriodicTask periodicIothubConnect = new PeriodicTask(moduleClientWrapper.RecreateClientAsync, Settings.Current.IotHubConnectFrequency, TimeSpan.FromMinutes(1), LoggerUtil.Writer, "Reconnect to IoT Hub", true);

MetricsScraper scraper = new MetricsScraper(Settings.Current.Endpoints);
IMetricsPublisher publisher;
Expand All @@ -62,7 +63,7 @@ static async Task<int> MainAsync()
}
else
{
publisher = new IotHubMetricsUpload.IotHubMetricsUpload(moduleClient);
publisher = new IotHubMetricsUpload.IotHubMetricsUpload(moduleClientWrapper);
}

using (MetricsScrapeAndUpload metricsScrapeAndUpload = new MetricsScrapeAndUpload(scraper, publisher))
Expand All @@ -79,7 +80,7 @@ static async Task<int> MainAsync()
}
finally
{
moduleClient?.Dispose();
moduleClientWrapper?.Dispose();
}

completed.Set();
Expand Down
9 changes: 9 additions & 0 deletions edge-modules/azure-monitor/src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ Optional config items:
- This can only be turned on if `UploadTarget` set to `IotMessage`
- ex: `false`
- Defaults to false.
- `IotHubConnectFrequency`
- Frequency at which the module will connect to IoT Hub for adoption profiling statistics
- Input taken in timespan format
- ex: `00:12:00`
- Defaults to every 24 hours
- `AzureDomain`
- Configurable Azure domain used to construct the Log Analytics upload address.
- ex: `azure.com.cn`
- Defaults to `azure.com`


## Upload Target:
Expand Down
Loading

0 comments on commit 057da6a

Please sign in to comment.