Skip to content

Commit

Permalink
Fix Outbox table creation when outbox table name is set in behaviour …
Browse files Browse the repository at this point in the history
…- 5.0 (#1029)

* Backport of outbox table creation fix

* small refactoring based on review
  • Loading branch information
jpalac authored Dec 17, 2024
1 parent 460663e commit 27866b9
Show file tree
Hide file tree
Showing 24 changed files with 545 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ public ConfigureAzureTablePersistence(TableServiceClient tableServiceClient = nu

Task IConfigureEndpointTestExecution.Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
configuration.UsePersistence<AzureTablePersistence, StorageType.Sagas>()
.DisableTableCreation()
.UseTableServiceClient(tableServiceClient);
var sagaPersistence = configuration.UsePersistence<AzureTablePersistence, StorageType.Sagas>()
.UseTableServiceClient(tableServiceClient);

configuration.UsePersistence<AzureTablePersistence, StorageType.Outbox>();
var outboxPersistence = configuration.UsePersistence<AzureTablePersistence, StorageType.Outbox>();

if (!settings.TryGet<AllowTableCreation>(out _))
{
sagaPersistence.DisableTableCreation();
outboxPersistence.DisableTableCreation();
}

if (endpointName != Conventions.EndpointNamingConvention(typeof(When_saga_started_concurrently.ConcurrentHandlerEndpoint)))
{
Expand All @@ -35,7 +40,12 @@ Task IConfigureEndpointTestExecution.Configure(string endpointName, EndpointConf
{
configuration.Pipeline.Register(new PartitionKeyProviderBehavior.Registration());
}
if (!settings.TryGet<DoNotRegisterDefaultTableNameProvider>(out _))

if (settings.TryGet<TableNameProvider>(out var tableNameProvider))
{
configuration.Pipeline.Register(new TableInformationProviderBehavior.Registration(tableNameProvider.GetTableName));
}
else if (!settings.TryGet<DoNotRegisterDefaultTableNameProvider>(out _))
{
configuration.Pipeline.Register(new TableInformationProviderBehavior.Registration());
}
Expand Down Expand Up @@ -75,26 +85,53 @@ public Registration() : base(nameof(PartitionKeyProviderBehavior),

class TableInformationProviderBehavior : Behavior<IIncomingLogicalMessageContext>
{
public TableInformationProviderBehavior(IReadOnlySettings settings) => this.settings = settings;
readonly IReadOnlySettings settings;
readonly Func<IIncomingLogicalMessageContext, string> tableNameProvider;

public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
public TableInformationProviderBehavior(IReadOnlySettings settings, Func<string> tableNameProvider)
{
this.settings = settings;

this.tableNameProvider = tableNameProvider == null
? DefaultTableNameProvider
: context => tableNameProvider();
}

string DefaultTableNameProvider(IIncomingLogicalMessageContext context)
{
if (!settings.TryGet<TableInformation>(out _) && !context.Extensions.TryGet<TableInformation>(out _))
{
context.Extensions.Set(new TableInformation(SetupFixture.TableName));
return SetupFixture.TableName;
}
else
{
return null;
}
return next();
}

readonly IReadOnlySettings settings;
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
var tableName = tableNameProvider(context);

if (!string.IsNullOrEmpty(tableName))
{
context.Extensions.Set(new TableInformation(tableName));
}

return next();
}

public class Registration : RegisterStep
{
public Registration() : base(nameof(TableInformationProviderBehavior),
typeof(TableInformationProviderBehavior),
"Populates the table information",
provider => new TableInformationProviderBehavior(provider.GetRequiredService<IReadOnlySettings>())) =>
InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
public Registration(Func<string> tableNameProvider = null)
: base(
nameof(TableInformationProviderBehavior),
typeof(TableInformationProviderBehavior),
"Populates the table information",
serviceProvider => new TableInformationProviderBehavior(
serviceProvider.GetRequiredService<IReadOnlySettings>(),
tableNameProvider))
=> InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Outbox;
using Pipeline;
Expand All @@ -14,8 +15,12 @@
/// </summary>
public sealed class LogicalOutboxBehavior : IBehavior<IIncomingLogicalMessageContext, IIncomingLogicalMessageContext>
{
internal LogicalOutboxBehavior(TableClientHolderResolver tableClientHolderResolver) =>
internal LogicalOutboxBehavior(TableClientHolderResolver tableClientHolderResolver, TableCreator tableCreator)
{
this.tableClientHolderResolver = tableClientHolderResolver;
this.tableCreator = tableCreator;

}

/// <inheritdoc />
public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> next)
Expand Down Expand Up @@ -49,13 +54,15 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL

var setAsDispatchedHolder = context.Extensions.Get<SetAsDispatchedHolder>();
setAsDispatchedHolder.PartitionKey = partitionKey;
setAsDispatchedHolder.TableClientHolder = tableHolder;
setAsDispatchedHolder.TableClientHolder = tableHolder ?? throw new InvalidOperationException("Outbox table name not given. Consider calling DefaultTable(string) on the persistence or alternatively supply the table name as part of the message handling pipeline.");

azureStorageOutboxTransaction.PartitionKey = partitionKey;
azureStorageOutboxTransaction.StorageSession.TableClientHolder = tableHolder;

setAsDispatchedHolder.ThrowIfTableClientIsNotSet();

await tableCreator.CreateTableIfNotExists(tableHolder.TableClient, CancellationToken.None).ConfigureAwait(false);

var outboxRecord = await tableHolder.TableClient.ReadOutboxRecord(context.MessageId, azureStorageOutboxTransaction.PartitionKey.Value, context.Extensions, context.CancellationToken)
.ConfigureAwait(false);

Expand Down Expand Up @@ -102,5 +109,6 @@ static AddressTag DeserializeRoutingStrategy(Dictionary<string, string> options)
}

readonly TableClientHolderResolver tableClientHolderResolver;
readonly TableCreator tableCreator;
}
}
14 changes: 11 additions & 3 deletions src/NServiceBus.Persistence.AzureTable/Outbox/OutboxPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

class OutboxPersister : IOutboxStorage
{
public OutboxPersister(TableClientHolderResolver tableClientHolderResolver)
=> this.tableClientHolderResolver = tableClientHolderResolver;
public OutboxPersister(TableClientHolderResolver tableClientHolderResolver, TableCreator tableCreator)
{
this.tableClientHolderResolver = tableClientHolderResolver;
this.tableCreator = tableCreator;
}

public Task<IOutboxTransaction> BeginTransaction(ContextBag context, CancellationToken cancellationToken = default)
{
Expand All @@ -26,9 +29,11 @@ public Task<IOutboxTransaction> BeginTransaction(ContextBag context, Cancellatio

public async Task<OutboxMessage> Get(string messageId, ContextBag context, CancellationToken cancellationToken = default)
{
var tableClientHolder = tableClientHolderResolver.ResolveAndSetIfAvailable(context);

var setAsDispatchedHolder = new SetAsDispatchedHolder
{
TableClientHolder = tableClientHolderResolver.ResolveAndSetIfAvailable(context)
TableClientHolder = tableClientHolder
};
context.Set(setAsDispatchedHolder);

Expand All @@ -48,6 +53,8 @@ public async Task<OutboxMessage> Get(string messageId, ContextBag context, Cance

setAsDispatchedHolder.ThrowIfTableClientIsNotSet();

await tableCreator.CreateTableIfNotExists(tableClientHolder.TableClient, cancellationToken).ConfigureAwait(false);

var outboxRecord = await setAsDispatchedHolder.TableClientHolder.TableClient
.ReadOutboxRecord(messageId, partitionKey, context, cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -101,5 +108,6 @@ public Task SetAsDispatched(string messageId, ContextBag context, CancellationTo
}

readonly TableClientHolderResolver tableClientHolderResolver;
readonly TableCreator tableCreator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ class OutboxStorage : Feature
{
internal OutboxStorage()
{
Defaults(s =>
{
s.EnableFeatureByDefault<SynchronizedStorage>();
});
Defaults(s => s.EnableFeatureByDefault<SynchronizedStorage>());

DependsOn<Outbox>();
DependsOn<SynchronizedStorage>();
Expand All @@ -20,7 +17,7 @@ internal OutboxStorage()
protected override void Setup(FeatureConfigurationContext context)
{
context.Services.AddSingleton<IOutboxStorage, OutboxPersister>();
context.Services.AddTransient(provider => new LogicalOutboxBehavior(provider.GetRequiredService<TableClientHolderResolver>()));
context.Services.AddTransient(provider => new LogicalOutboxBehavior(provider.GetRequiredService<TableClientHolderResolver>(), provider.GetRequiredService<TableCreator>()));

context.Pipeline.Register(provider => provider.GetRequiredService<LogicalOutboxBehavior>(), "Behavior that mimics the outbox as part of the logical stage.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace NServiceBus.Persistence.AzureTable
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
Expand All @@ -15,7 +14,7 @@ sealed class AzureSagaPersister : ISagaPersister
{
public AzureSagaPersister(
IProvideTableServiceClient tableServiceClientProvider,
bool disableTableCreation,
TableCreator tableCreator,
bool compatibilityMode,
SecondaryIndex secondaryIndex,
string conventionalTablePrefix,
Expand All @@ -28,7 +27,7 @@ public AzureSagaPersister(
this.jsonSerializer = jsonSerializer;
this.conventionalTablePrefix = conventionalTablePrefix;
this.compatibilityMode = compatibilityMode;
this.disableTableCreation = disableTableCreation;
this.tableCreator = tableCreator;
client = tableServiceClientProvider.Client;
this.secondaryIndex = secondaryIndex;
}
Expand Down Expand Up @@ -156,14 +155,8 @@ async ValueTask<TableClient> GetTableClientAndCreateTableIfNotExists(IAzureTable
tableToReadFrom = storageSession.Table;
}

if (disableTableCreation || tableCreated.TryGetValue(tableToReadFrom.Name, out var isTableCreated) ||
isTableCreated)
{
return tableToReadFrom;
}
await tableCreator.CreateTableIfNotExists(tableToReadFrom, cancellationToken).ConfigureAwait(false);

await tableToReadFrom.CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
tableCreated[tableToReadFrom.Name] = true;
return tableToReadFrom;
}

Expand Down Expand Up @@ -196,11 +189,10 @@ public Task Complete(IContainSagaData sagaData, ISynchronizedStorageSession sess
static TableEntityPartitionKey GetPartitionKey(ContextBag context, Guid sagaDataId)
=> !context.TryGet<TableEntityPartitionKey>(out var partitionKey) ? new TableEntityPartitionKey(sagaDataId.ToString()) : partitionKey;

readonly bool disableTableCreation;
readonly TableCreator tableCreator;
readonly TableServiceClient client;
readonly SecondaryIndex secondaryIndex;
const string SecondaryIndexIndicatorProperty = "NServiceBus_2ndIndexKey";
static readonly ConcurrentDictionary<string, bool> tableCreated = new();
readonly bool compatibilityMode;
readonly string conventionalTablePrefix;
readonly JsonSerializer jsonSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected override void Setup(FeatureConfigurationContext context)
var writerCreator = context.Settings.Get<Func<TextWriter, JsonWriter>>(WellKnownConfigurationKeys.SagaWriterCreator);

context.Services.AddSingleton<ISagaPersister>(provider => new AzureSagaPersister(provider.GetRequiredService<IProvideTableServiceClient>(),
installerSettings.Disabled, compatibilityModeEnabled, secondaryIndices, conventionalTablePrefix, jsonSerializer, readerCreator, writerCreator));
provider.GetRequiredService<TableCreator>(), compatibilityModeEnabled, secondaryIndices, conventionalTablePrefix, jsonSerializer, readerCreator, writerCreator));
}

static readonly ILog Logger = LogManager.GetLogger<SagaStorage>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ protected override void Setup(FeatureConfigurationContext context)
new AzureStorageSynchronizedStorageSession(provider.GetRequiredService<TableClientHolderResolver>()));
context.Services.AddScoped(provider => provider.GetRequiredService<ICompletableSynchronizedStorageSession>().AzureTablePersistenceSession());

context.Settings.TryGet(out SynchronizedStorageInstallerSettings installerSettings);
context.Services.AddSingleton(new TableCreator(installerSettings != null && installerSettings.Disabled));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
namespace NServiceBus.Persistence.AzureTable
{
using System;
using Azure.Data.Tables;

sealed class TableClientHolder
{
public TableClientHolder(TableClient tableClient) => TableClient = tableClient;
public TableClientHolder(TableClient tableClient) => TableClient = tableClient ?? throw new ArgumentNullException(nameof(tableClient));

public TableClient TableClient { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace NServiceBus.Persistence.AzureTable
{
using Azure.Data.Tables;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;

class TableCreator
{
readonly ConcurrentDictionary<string, bool> createdTables = new();
readonly bool tableCreationDisabled;

public TableCreator(bool tableCreationDisabled)
{
this.tableCreationDisabled = tableCreationDisabled;
}

public async ValueTask CreateTableIfNotExists(TableClient tableClient, CancellationToken cancellationToken = default)
{
if (tableCreationDisabled || createdTables.ContainsKey(tableClient.Name))
{
return;
}

await tableClient.CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
createdTables[tableClient.Name] = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ public class ConfigureEndpointAzureStoragePersistence : IConfigureEndpointTestEx
{
Task IConfigureEndpointTestExecution.Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
var persistence = configuration
.UsePersistence<AzureTablePersistence, StorageType.Sagas>()
.DisableTableCreation()
var sagaPersistence = configuration.UsePersistence<AzureTablePersistence, StorageType.Sagas>()
.UseTableServiceClient(SetupFixture.TableServiceClient);

persistence.DefaultTable(SetupFixture.TableName);
sagaPersistence.DefaultTable(SetupFixture.TableName);

if (endpointName != Conventions.EndpointNamingConvention(typeof(When_saga_started_concurrently.ConcurrentHandlerEndpoint)))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ public class ConfigureEndpointAzureTablePersistence : IConfigureEndpointTestExec
{
Task IConfigureEndpointTestExecution.Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
var persistence = configuration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();
var sagaPersistence = configuration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();
// backdoor for testing
persistence.GetSettings().Set("AzureSagaStorage.ConventionalTablePrefix", SetupFixture.TablePrefix);
sagaPersistence.GetSettings().Set("AzureSagaStorage.ConventionalTablePrefix", SetupFixture.TablePrefix);

persistence.UseTableServiceClient(SetupFixture.TableServiceClient);
sagaPersistence.UseTableServiceClient(SetupFixture.TableServiceClient);

var recoverabilitySettings = configuration.Recoverability();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ public Task Configure(CancellationToken cancellationToken = default)

SagaIdGenerator = new SagaIdGenerator();
var resolver = new TableClientHolderResolver(this, new TableInformation(SetupFixture.TableName));
var tableCreator = new TableCreator(true);
var secondaryIndices = new SecondaryIndex();
SagaStorage = new AzureSagaPersister(
this,
true,
tableCreator,
false,
secondaryIndices,
null,
JsonSerializer.Create(),
reader => new JsonTextReader(reader),
writer => new JsonTextWriter(writer));

OutboxStorage = new OutboxPersister(resolver);
OutboxStorage = new OutboxPersister(resolver, tableCreator);

GetContextBagForSagaStorage = () =>
{
Expand Down
Loading

0 comments on commit 27866b9

Please sign in to comment.