Skip to content

Commit

Permalink
Adding amqp support for pnp to edge (#3273)
Browse files Browse the repository at this point in the history
Adding downstream amqp support for Plug and Play to edge for modules and devices
  • Loading branch information
dylanbronson authored Jul 23, 2020
1 parent c8a78ee commit 0ee6978
Show file tree
Hide file tree
Showing 19 changed files with 152 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ static class IotHubAmqpProperty
public static readonly AmqpSymbol QueuePartitionKey = "x-opt-partition-key";
public static readonly AmqpSymbol ChannelCorrelationId = AmqpConstants.Vendor + ":channel-correlation-id";
public static readonly AmqpSymbol GatewayReconnect = AmqpConstants.Vendor + ":gateway-reconnect";
public static readonly AmqpSymbol ModelId = AmqpConstants.Vendor + ":model-id";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public DeviceBoundLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public EventsLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public abstract class LinkHandler : ILinkHandler
{
readonly IConnectionHandler connectionHandler;
readonly IProductInfoStore productInfoStore;
readonly IModelIdStore modelIdStore;

protected LinkHandler(
IIdentity identity,
Expand All @@ -23,7 +24,8 @@ protected LinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
{
this.Identity = Preconditions.CheckNotNull(identity, nameof(identity));
this.MessageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
Expand All @@ -33,12 +35,19 @@ protected LinkHandler(
this.Link.SafeAddClosed(this.OnLinkClosed);
this.connectionHandler = Preconditions.CheckNotNull(connectionHandler, nameof(connectionHandler));
this.productInfoStore = Preconditions.CheckNotNull(productInfoStore, nameof(productInfoStore));
this.modelIdStore = Preconditions.CheckNotNull(modelIdStore, nameof(modelIdStore));

string clientVersion = null;
if (this.Link.Settings?.Properties?.TryGetValue(IotHubAmqpProperty.ClientVersion, out clientVersion) ?? false)
{
this.ClientVersion = Option.Maybe(clientVersion);
}

string modelId = null;
if (this.Link.Settings?.Properties?.TryGetValue(IotHubAmqpProperty.ModelId, out modelId) ?? false)
{
this.ModelId = Option.Maybe(modelId);
}
}

public IAmqpLink Link { get; }
Expand All @@ -61,6 +70,8 @@ protected LinkHandler(

protected Option<string> ClientVersion { get; }

public Option<string> ModelId { get; }

public async Task OpenAsync(TimeSpan timeout)
{
if (!await this.Authenticate())
Expand Down Expand Up @@ -108,6 +119,7 @@ protected async Task<bool> Authenticate()
await this.ClientVersion
.Filter(c => !string.IsNullOrWhiteSpace(c))
.ForEachAsync(c => this.productInfoStore.SetProductInfo(this.Identity.Id, c));
await this.ModelId.ForEachAsync(m => this.modelIdStore.SetModelId(this.Identity.Id, m));
}

return authenticated;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ public class LinkHandlerProvider : ILinkHandlerProvider
readonly IMessageConverter<AmqpMessage> methodMessageConverter;
readonly IIdentityProvider identityProvider;
readonly IProductInfoStore productInfoStore;
readonly IModelIdStore modelIdStore;
readonly IDictionary<(UriPathTemplate Template, bool IsReceiver), LinkType> templatesList;

public LinkHandlerProvider(
IMessageConverter<AmqpMessage> messageConverter,
IMessageConverter<AmqpMessage> twinMessageConverter,
IMessageConverter<AmqpMessage> methodMessageConverter,
IIdentityProvider identityProvider,
IProductInfoStore productInfoStore)
: this(messageConverter, twinMessageConverter, methodMessageConverter, identityProvider, productInfoStore, DefaultTemplatesList)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: this(messageConverter, twinMessageConverter, methodMessageConverter, identityProvider, productInfoStore, modelIdStore, DefaultTemplatesList)
{
}

Expand All @@ -53,13 +55,15 @@ public LinkHandlerProvider(
IMessageConverter<AmqpMessage> methodMessageConverter,
IIdentityProvider identityProvider,
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore,
IDictionary<(UriPathTemplate Template, bool IsReceiver), LinkType> templatesList)
{
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.twinMessageConverter = Preconditions.CheckNotNull(twinMessageConverter, nameof(twinMessageConverter));
this.methodMessageConverter = Preconditions.CheckNotNull(methodMessageConverter, nameof(methodMessageConverter));
this.identityProvider = Preconditions.CheckNotNull(identityProvider, nameof(identityProvider));
this.productInfoStore = Preconditions.CheckNotNull(productInfoStore, nameof(productInfoStore));
this.modelIdStore = Preconditions.CheckNotNull(modelIdStore, nameof(modelIdStore));
this.templatesList = Preconditions.CheckNotNull(templatesList, nameof(templatesList));
}

Expand All @@ -86,25 +90,25 @@ internal ILinkHandler GetLinkHandler(LinkType linkType, IAmqpLink link, Uri uri,
switch (linkType)
{
case LinkType.C2D:
return new DeviceBoundLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore);
return new DeviceBoundLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore, this.modelIdStore);

case LinkType.Events:
return new EventsLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore);
return new EventsLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore, this.modelIdStore);

case LinkType.ModuleMessages:
return new ModuleMessageLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore);
return new ModuleMessageLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore, this.modelIdStore);

case LinkType.MethodSending:
return new MethodSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.productInfoStore);
return new MethodSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.productInfoStore, this.modelIdStore);

case LinkType.MethodReceiving:
return new MethodReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.productInfoStore);
return new MethodReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.productInfoStore, this.modelIdStore);

case LinkType.TwinReceiving:
return new TwinReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.productInfoStore);
return new TwinReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.productInfoStore, this.modelIdStore);

case LinkType.TwinSending:
return new TwinSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.productInfoStore);
return new TwinSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.productInfoStore, this.modelIdStore);

default:
throw new InvalidOperationException($"Invalid link type {linkType}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public MethodReceivingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public MethodSendingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public ModuleMessageLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ protected ReceivingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
Preconditions.CheckArgument(link.IsReceiver, $"Link {requestUri} cannot receive");
this.ReceivingLink = link;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ protected SendingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
Preconditions.CheckArgument(!link.IsReceiver, $"Link {requestUri} cannot send");
this.SendingAmqpLink = link;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public TwinReceivingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public TwinSendingLinkHandler(
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
IProductInfoStore productInfoStore,
IModelIdStore modelIdStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ protected override void Load(ContainerBuilder builder)
IMessageConverter<AmqpMessage> directMethodMessageConverter = new AmqpDirectMethodMessageConverter();
var identityProvider = c.Resolve<IIdentityProvider>();
var productInfoStore = await c.Resolve<Task<IProductInfoStore>>();
ILinkHandlerProvider linkHandlerProvider = new LinkHandlerProvider(messageConverter, twinMessageConverter, directMethodMessageConverter, identityProvider, productInfoStore);
var modelIdStore = await c.Resolve<Task<IModelIdStore>>();
ILinkHandlerProvider linkHandlerProvider = new LinkHandlerProvider(messageConverter, twinMessageConverter, directMethodMessageConverter, identityProvider, productInfoStore, modelIdStore);
return linkHandlerProvider;
})
.As<Task<ILinkHandlerProvider>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ public void CreateTest()
var messageConverter = Mock.Of<IMessageConverter<AmqpMessage>>();
var identity = Mock.Of<IIdentity>(d => d.Id == "d1");
var productInfoStore = Mock.Of<IProductInfoStore>();
var modelIdStore = Mock.Of<IModelIdStore>();

// Act
ILinkHandler linkHandler = new DeviceBoundLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore);
ILinkHandler linkHandler = new DeviceBoundLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore);

// Assert
Assert.NotNull(linkHandler);
Expand All @@ -57,9 +58,10 @@ public void CreateThrowsExceptionIfReceiverLinkTest()
var messageConverter = Mock.Of<IMessageConverter<AmqpMessage>>();
var identity = Mock.Of<IIdentity>(d => d.Id == "d1");
var productInfoStore = Mock.Of<IProductInfoStore>();
var modelIdStore = Mock.Of<IModelIdStore>();

// Act / Assert
Assert.Throws<ArgumentException>(() => new DeviceBoundLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore));
Assert.Throws<ArgumentException>(() => new DeviceBoundLinkHandler(identity, amqpLink, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore));
}

[Fact]
Expand Down Expand Up @@ -91,8 +93,9 @@ public async Task SendMessageTest()
var messageConverter = new AmqpMessageConverter();
var identity = Mock.Of<IIdentity>(d => d.Id == "d1");
var productInfoStore = Mock.Of<IProductInfoStore>();
var modelIdStore = Mock.Of<IModelIdStore>();

var sendingLinkHandler = new DeviceBoundLinkHandler(identity, sendingLink, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore);
var sendingLinkHandler = new DeviceBoundLinkHandler(identity, sendingLink, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore, modelIdStore);
var body = new byte[] { 0, 1, 2, 3 };
IMessage message = new EdgeMessage.Builder(body).Build();
var deliveryState = new Mock<DeliveryState>(new AmqpSymbol(string.Empty), AmqpConstants.AcceptedOutcome.DescriptorCode);
Expand Down
Loading

0 comments on commit 0ee6978

Please sign in to comment.