Skip to content

Commit

Permalink
Add support for Queue length metric in EdgeHub (#2139) (#2227)
Browse files Browse the repository at this point in the history
* Add support for Queue length metric in EdgeHub

* Fix tests

* Fix build

* Update edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Message.cs

Co-Authored-By: Lee Fitchett <lefitche@microsoft.com>

* Add comments and rename inner object to "inner"

* Fix indentation

* Remove offset check. Some existing tests validate offset to be -ve

Co-authored-by: Lee Fitchett <lefitche@microsoft.com>

Co-authored-by: Lee Fitchett <lefitche@microsoft.com>
Co-authored-by: Andrew Smith <andsmi@microsoft.com>
  • Loading branch information
3 people authored Jan 4, 2020
1 parent 3ea4db7 commit 4ce6aa4
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Storage
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Routing.Core;
using Microsoft.Azure.Devices.Routing.Core.Checkpointers;
using Microsoft.Azure.Devices.Routing.Core.MessageSources;
using Microsoft.Azure.Devices.Routing.Core.Query.Types;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using static System.FormattableString;
Expand Down Expand Up @@ -72,7 +74,7 @@ public async Task RemoveEndpoint(string endpointId)
}
}

public async Task<long> Add(string endpointId, IMessage message)
public async Task<IMessage> Add(string endpointId, IMessage message)
{
Preconditions.CheckNotNull(message, nameof(message));
if (!this.endpointSequentialStores.TryGetValue(Preconditions.CheckNonWhiteSpace(endpointId, nameof(endpointId)), out ISequentialStore<MessageRef> sequentialStore))
Expand Down Expand Up @@ -110,7 +112,7 @@ await this.messageEntityStore.PutOrUpdate(
{
long offset = await sequentialStore.Append(new MessageRef(edgeMessageId));
Events.MessageAdded(offset, edgeMessageId, endpointId, this.messageCount);
return offset;
return new MessageWithOffset(message, offset);
}
}
catch (Exception)
Expand Down Expand Up @@ -164,7 +166,7 @@ public MessageWrapper(IMessage message)

public MessageWrapper(IMessage message, DateTime timeStamp, int refCount)
{
Preconditions.CheckArgument(timeStamp != default(DateTime));
Preconditions.CheckArgument(timeStamp != default);
this.Message = Preconditions.CheckNotNull(message, nameof(message));
this.TimeStamp = timeStamp;
this.RefCount = Preconditions.CheckRange(refCount, 0, nameof(refCount));
Expand Down Expand Up @@ -469,7 +471,7 @@ public async Task<IEnumerable<IMessage>> GetNext(int batchSize)
else
{
messageWrapper
.Map(m => this.AddMessageOffset(m.Message, item.offset))
.Map(m => new MessageWithOffset(m.Message, item.offset))
.ForEach(m => messageList.Add(m));
}
}
Expand All @@ -486,18 +488,6 @@ public async Task<IEnumerable<IMessage>> GetNext(int batchSize)

return messageList;
}

IMessage AddMessageOffset(IMessage message, long offset)
{
return new Message(
message.MessageSource,
message.Body,
message.Properties.ToDictionary(),
message.SystemProperties.ToDictionary(),
offset,
message.EnqueuedTime,
message.DequeuedTime);
}
}

/// <summary>
Expand All @@ -523,6 +513,38 @@ public MessageRef(string edgeMessageId, DateTime timeStamp)
public DateTime TimeStamp { get; }
}

// Wrapper to allow adding offset to an existing IMessage object
class MessageWithOffset : IMessage
{
readonly IMessage inner;

public MessageWithOffset(IMessage message, long offset)
{
this.inner = Preconditions.CheckNotNull(message, nameof(message));
this.Offset = Preconditions.CheckRange(offset, 0, nameof(offset));
}

public void Dispose() => this.inner.Dispose();

public IMessageSource MessageSource => this.inner.MessageSource;

public byte[] Body => this.inner.Body;

public IReadOnlyDictionary<string, string> Properties => this.inner.Properties;

public IReadOnlyDictionary<string, string> SystemProperties => this.inner.SystemProperties;

public long Offset { get; }

public DateTime EnqueuedTime => this.inner.EnqueuedTime;

public DateTime DequeuedTime => this.inner.DequeuedTime;

public QueryValue GetQueryValue(string queryString) => this.inner.GetQueryValue(queryString);

public long Size() => this.inner.Size();
}

static class MetricsV0
{
static readonly TimerOptions MessageEntityStorePutOrUpdateLatencyOptions = new TimerOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IMessageStore : IDisposable
/// Creates an entry in the message queue for the given endpoint
/// and returns the offset of that entry.
/// </summary>
Task<long> Add(string endpointId, IMessage message);
Task<IMessage> Add(string endpointId, IMessage message);

/// <summary>
/// Returns an iterator that allows reading messages starting from the given offset.
Expand Down
30 changes: 21 additions & 9 deletions edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,23 @@ public Message(IMessageSource messageSource, byte[] body, IDictionary<string, st
}

public Message(IMessageSource messageSource, byte[] body, IDictionary<string, string> properties, IDictionary<string, string> systemProperties, long offset, DateTime enqueuedTime, DateTime dequeuedTime)
: this(
messageSource,
body,
new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(Preconditions.CheckNotNull(properties), StringComparer.OrdinalIgnoreCase)) as IReadOnlyDictionary<string, string>,
new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(Preconditions.CheckNotNull(systemProperties), StringComparer.OrdinalIgnoreCase)),
offset,
enqueuedTime,
dequeuedTime)
{
}

public Message(IMessageSource messageSource, byte[] body, IReadOnlyDictionary<string, string> properties, IReadOnlyDictionary<string, string> systemProperties, long offset, DateTime enqueuedTime, DateTime dequeuedTime)
{
this.MessageSource = messageSource;
this.Body = Preconditions.CheckNotNull(body);
this.Properties = new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(Preconditions.CheckNotNull(properties), StringComparer.OrdinalIgnoreCase));
this.SystemProperties = new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(Preconditions.CheckNotNull(systemProperties), StringComparer.OrdinalIgnoreCase));
this.Properties = Preconditions.CheckNotNull(properties);
this.SystemProperties = Preconditions.CheckNotNull(systemProperties);
this.Offset = offset;
this.EnqueuedTime = enqueuedTime;
this.DequeuedTime = dequeuedTime;
Expand Down Expand Up @@ -94,13 +106,13 @@ public bool Equals(Message other)
}

return this.MessageSource.Equals(other.MessageSource) &&
this.Offset == other.Offset &&
this.Body.SequenceEqual(other.Body) &&
this.Properties.Keys.Count() == other.Properties.Keys.Count() &&
this.Properties.Keys.All(
key => other.Properties.ContainsKey(key) && Equals(this.Properties[key], other.Properties[key]) &&
this.SystemProperties.Keys.Count() == other.SystemProperties.Keys.Count() &&
this.SystemProperties.Keys.All(skey => other.SystemProperties.ContainsKey(skey) && Equals(this.SystemProperties[skey], other.SystemProperties[skey])));
this.Offset == other.Offset &&
this.Body.SequenceEqual(other.Body) &&
this.Properties.Keys.Count() == other.Properties.Keys.Count() &&
this.Properties.Keys.All(
key => other.Properties.ContainsKey(key) && Equals(this.Properties[key], other.Properties[key]) &&
this.SystemProperties.Keys.Count() == other.SystemProperties.Keys.Count() &&
this.SystemProperties.Keys.All(skey => other.SystemProperties.ContainsKey(skey) && Equals(this.SystemProperties[skey], other.SystemProperties[skey])));
}

public override bool Equals(object obj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ namespace Microsoft.Azure.Devices.Routing.Core.Checkpointers
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Azure.Devices.Edge.Util.Metrics;
using Microsoft.Extensions.Logging;
using static System.FormattableString;
using EdgeMetrics = Microsoft.Azure.Devices.Edge.Util.Metrics.Metrics;

public class Checkpointer : ICheckpointer
{
Expand Down Expand Up @@ -61,6 +63,7 @@ public static async Task<Checkpointer> CreateAsync(string id, ICheckpointStore s
public void Propose(IMessage message)
{
this.Proposed = Math.Max(message.Offset, this.Proposed);
Metrics.SetQueueLength(this);
}

public bool Admit(IMessage message)
Expand Down Expand Up @@ -100,6 +103,7 @@ public async Task CommitAsync(ICollection<IMessage> successful, ICollection<IMes
this.LastFailedRevivalTime = lastFailedRevivalTime;
this.UnhealthySince = unhealthySince;
await this.store.SetCheckpointDataAsync(this.Id, new CheckpointData(offset, this.LastFailedRevivalTime, this.UnhealthySince), token);
Metrics.SetQueueLength(this);
}

Events.CommitFinished(this);
Expand Down Expand Up @@ -183,5 +187,15 @@ static string GetContextString(Checkpointer checkpointer)
return Invariant($"CheckpointerId: {checkpointer.Id}, Offset: {checkpointer.Offset}, Proposed: {checkpointer.Proposed}");
}
}

static class Metrics
{
static readonly IMetricsGauge QueueLength = EdgeMetrics.Instance.CreateGauge(
"queue_length",
"Number of messages pending to be processed for the endpoint",
new List<string> { "endpoint" });

public static void SetQueueLength(Checkpointer checkpointer) => QueueLength.Set(checkpointer.Proposed - checkpointer.Offset, new[] { checkpointer.Id });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public async Task Invoke(IMessage message)

using (MetricsV0.StoreLatency(this.Endpoint.Id))
{
long offset = await this.messageStore.Add(this.Endpoint.Id, message);
this.checkpointer.Propose(message);
Events.AddMessageSuccess(this, offset);
IMessage storedMessage = await this.messageStore.Add(this.Endpoint.Id, message);
this.checkpointer.Propose(storedMessage);
Events.AddMessageSuccess(this, storedMessage.Offset);
}

this.hasMessagesInQueue.Set();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ public static IEnumerable<object[]> GetRoutingMessages()
};
string dataString = JsonConvert.SerializeObject(data);
byte[] messageBytes = Encoding.UTF8.GetBytes(dataString);
var properties = new Dictionary<string, string>()
IDictionary<string, string> properties = new Dictionary<string, string>()
{
{ "model", "temperature" },
{ "level", "one" }
};

var systemProperties = new Dictionary<string, string>()
IDictionary<string, string> systemProperties = new Dictionary<string, string>()
{
{ SystemProperties.MessageId, "12345" },
{ SystemProperties.UserId, "UserId10" },
Expand Down
Loading

0 comments on commit 4ce6aa4

Please sign in to comment.