Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensuring "Path" returns the right destination in the case of via-sender #6941

Merged
merged 3 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class MessageSender : ClientEntity, IMessageSender
readonly ActiveClientLinkManager clientLinkManager;
readonly ServiceBusDiagnosticSource diagnosticSource;
readonly bool isViaSender;
readonly string transferDestinationPath;

/// <summary>
/// Creates a new AMQP MessageSender.
Expand Down Expand Up @@ -152,7 +151,7 @@ internal MessageSender(

this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.Path = entityPath;
this.TransferDestinationPath = transferDestinationPath;
this.SendingLinkDestination = entityPath;
this.EntityType = entityType;
this.ServiceBusConnection.ThrowIfClosed();

Expand All @@ -177,7 +176,8 @@ internal MessageSender(
if (!string.IsNullOrWhiteSpace(transferDestinationPath))
{
this.isViaSender = true;
this.transferDestinationPath = transferDestinationPath;
this.TransferDestinationPath = transferDestinationPath;
this.ViaEntityPath = entityPath;
}

MessagingEventSource.Log.MessageSenderCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId);
Expand All @@ -190,15 +190,21 @@ internal MessageSender(
public override IList<ServiceBusPlugin> RegisteredPlugins { get; } = new List<ServiceBusPlugin>();

/// <summary>
/// Gets the entity path of the MessageSender.
/// Gets the entity path of the MessageSender.
/// In the case of a via-sender, this returns the path of the via entity.
/// </summary>
public override string Path { get; }

/// <summary>
/// Gets the transfer destination path (send-via) of the MessageSender.
/// In the case of a via-sender, gets the final destination path of the messages; null otherwise.
/// </summary>
public string TransferDestinationPath { get; }

/// <summary>
/// In the case of a via-sender, the message is sent to <see cref="TransferDestinationPath"/> via <see cref="ViaEntityPath"/>; null otherwise.
/// </summary>
public string ViaEntityPath { get; }

/// <summary>
/// Duration after which individual operations will timeout.
/// </summary>
Expand All @@ -215,6 +221,8 @@ public override TimeSpan OperationTimeout

internal MessagingEntityType? EntityType { get; }

internal string SendingLinkDestination { get; set; }

ICbsTokenProvider CbsTokenProvider { get; }

FaultTolerantAmqpObject<SendingAmqpLink> SendLinkManager { get; }
Expand Down Expand Up @@ -672,36 +680,36 @@ async Task OnCancelScheduledMessageAsync(long sequenceNumber)

async Task<SendingAmqpLink> CreateLinkAsync(TimeSpan timeout)
{
MessagingEventSource.Log.AmqpSendLinkCreateStart(this.ClientId, this.EntityType, this.Path);
MessagingEventSource.Log.AmqpSendLinkCreateStart(this.ClientId, this.EntityType, this.SendingLinkDestination);

var amqpLinkSettings = new AmqpLinkSettings
{
Role = false,
InitialDeliveryCount = 0,
Target = new Target { Address = this.Path },
Target = new Target { Address = this.SendingLinkDestination },
Source = new Source { Address = this.ClientId },
};
if (this.EntityType != null)
{
amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)this.EntityType);
}

var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.Path);
var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.SendingLinkDestination);

string[] audience;
if (this.isViaSender)
{
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.transferDestinationPath);
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinationPath);
audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri };
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.transferDestinationPath);
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPath);
}
else
{
audience = new string[] { endpointUri.AbsoluteUri };
}

string[] claims = {ClaimConstants.Send};
var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.Path, this.ServiceBusConnection, endpointUri, audience, claims, this.CbsTokenProvider, amqpLinkSettings, this.ClientId);
var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.SendingLinkDestination, this.ServiceBusConnection, endpointUri, audience, claims, this.CbsTokenProvider, amqpLinkSettings, this.ClientId);
Tuple<AmqpObject, DateTime> linkDetails = await amqpSendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);

var sendingAmqpLink = (SendingAmqpLink) linkDetails.Item1;
Expand All @@ -720,7 +728,7 @@ async Task<SendingAmqpLink> CreateLinkAsync(TimeSpan timeout)

async Task<RequestResponseAmqpLink> CreateRequestResponseLinkAsync(TimeSpan timeout)
{
var entityPath = this.Path + '/' + AmqpClientConstants.ManagementAddress;
var entityPath = this.SendingLinkDestination + '/' + AmqpClientConstants.ManagementAddress;
var amqpLinkSettings = new AmqpLinkSettings();
amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);

Expand All @@ -729,9 +737,9 @@ async Task<RequestResponseAmqpLink> CreateRequestResponseLinkAsync(TimeSpan time
string[] audience;
if (this.isViaSender)
{
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.transferDestinationPath);
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinationPath);
audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri };
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.transferDestinationPath);
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPath);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ namespace Microsoft.Azure.ServiceBus.Core
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public string TransferDestinationPath { get; }
public string ViaEntityPath { get; }
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
protected override System.Threading.Tasks.Task OnClosingAsync() { }
public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus.UnitTests
{
using Core;
using Xunit;

public class MessageSenderTests
{
private MessageSender viaSender;
private MessageSender nonViaSender;

public MessageSenderTests()
{
var builder = new ServiceBusConnectionStringBuilder("blah.com", "path", "key-name", "key-value");
var connection = new ServiceBusConnection(builder);
viaSender = new MessageSender(connection, "path", "via");
nonViaSender = new MessageSender(connection, "path");
}

[Fact]
[DisplayTestMethodName]
public void Path_reflects_actual_link_destination()
{
Assert.Equal("via", viaSender.Path);
Assert.Equal("path", nonViaSender.Path);
}

[Fact]
[DisplayTestMethodName]
public void TransferDestinationPath_should_be_final_destination_name()
{
Assert.Equal("path", viaSender.TransferDestinationPath);
Assert.Null(nonViaSender.TransferDestinationPath);
}

[Fact]
[DisplayTestMethodName]
public void ViaEntityPath_should_be_via_entity_name()
{
Assert.Equal("via", viaSender.ViaEntityPath);
Assert.Null(nonViaSender.ViaEntityPath);
}
}
}