Skip to content

Commit

Permalink
feat: Idempotency for MessageCommands (#92)
Browse files Browse the repository at this point in the history
Co-authored-by: Lasse Ringgren Nielsen <sixxth@gmail.com>
  • Loading branch information
dstenroejl and lasrinnil authored Jan 15, 2025
1 parent 99c60c9 commit db2bd35
Show file tree
Hide file tree
Showing 46 changed files with 397 additions and 130 deletions.
5 changes: 5 additions & 0 deletions docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# ProcessManager.Client Release Notes

## Version 0.20.0

- Renamed `MessageCommand` property `MessageId` to `IdempotencyKey`.
- The Process Manager will use the `IdempotencyKey` to handle idempotency for commands initiated using messages.

## Version 0.19.0

- Add `NotifyOrchestrationInstanceAsync` to `IProcessManagerMessageClient`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# ProcessManager.Orchestrations.Abstractions Release Notes

## Version 0.7.0

- Dependent NuGet packages updated

## Version 0.6.3

- No functional changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,25 @@ public abstract record StartOrchestrationInstanceMessageCommand<TInputParameterD
/// <param name="orchestrationDescriptionUniqueName">Uniquely identifies the orchestration description from which the
/// orchestration instance should be created.</param>
/// <param name="inputParameter">Contains the Durable Functions orchestration input parameter value.</param>
/// <param name="messageId">Id of the message that casued this command to be executed.</param>
/// <param name="idempotencyKey">
/// A value used by the Process Manager to ensure idempotency for a message command.
/// The producer of the <see cref="StartOrchestrationInstanceMessageCommand{TInputParameterDto}"/> should
/// create a key that is unique per command.</param>
public StartOrchestrationInstanceMessageCommand(
ActorIdentityDto operatingIdentity,
OrchestrationDescriptionUniqueNameDto orchestrationDescriptionUniqueName,
TInputParameterDto inputParameter,
string messageId)
string idempotencyKey)
: base(operatingIdentity, orchestrationDescriptionUniqueName, inputParameter)
{
MessageId = messageId;
IdempotencyKey = idempotencyKey;
}

/// <summary>
/// Id of the message that casued this command to be executed.
/// A value used by the Process Manager to ensure idempotency for a message command.
/// The producer of the <see cref="StartOrchestrationInstanceMessageCommand{TInputParameterDto}"/> should
/// create a key that is unique per command.
/// Max length is 1024 characters.
/// </summary>
public string MessageId { get; }
public string IdempotencyKey { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<PropertyGroup>
<PackageId>Energinet.DataHub.ProcessManager.Abstractions</PackageId>
<PackageVersion>0.19.0$(VersionSuffix)</PackageVersion>
<PackageVersion>0.20.0$(VersionSuffix)</PackageVersion>
<Title>DH3 Process Manager Abstractions library</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Energinet.DataHub.Core.FunctionApp.TestCommon" Version="7.1.1" />
<PackageReference Include="Energinet.DataHub.Core.FunctionApp.TestCommon" Version="7.2.1" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion source/ProcessManager.Client/ProcessManager.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<PropertyGroup>
<PackageId>Energinet.DataHub.ProcessManager.Client</PackageId>
<PackageVersion>0.19.0$(VersionSuffix)</PackageVersion>
<PackageVersion>0.20.0$(VersionSuffix)</PackageVersion>
<Title>DH3 Process Manager Client library</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private ServiceBusMessage CreateStartOrchestrationInstanceServiceBusMessage<TInp

var serviceBusMessage = startOrchestration.ToServiceBusMessage(
subject: command.OrchestrationDescriptionUniqueName.Name,
idempotencyKey: command.MessageId);
idempotencyKey: command.IdempotencyKey);

return serviceBusMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<ItemGroup>
<PackageReference Include="Energinet.DataHub.Core.App.WebApp" Version="14.0.2" />
<PackageReference Include="Energinet.DataHub.Core.FunctionApp.TestCommon" Version="7.1.1" />
<PackageReference Include="Energinet.DataHub.Core.FunctionApp.TestCommon" Version="7.2.1" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Energinet.DataHub.ProcessManager.Core.Domain.OrchestrationInstance;
using Energinet.DataHub.ProcessManager.Core.Tests.Fixtures;
using FluentAssertions;
using FluentAssertions.Execution;
using Microsoft.EntityFrameworkCore;
using NodaTime;

Expand All @@ -31,7 +32,7 @@ public ProcessManagerContextTests(ProcessManagerCoreFixture fixture)
}

[Fact]
public async Task Given_OrchestrationDescriptionAddedToDbContext_WhenRetrievingFromDatabase_HasCorrectValues()
public async Task Given_OrchestrationDescriptionAddedToDbContext_When_RetrievingFromDatabase_Then_HasCorrectValues()
{
// Arrange
var existingOrchestrationDescription = CreateOrchestrationDescription();
Expand All @@ -54,7 +55,7 @@ public async Task Given_OrchestrationDescriptionAddedToDbContext_WhenRetrievingF
}

[Fact]
public async Task Given_RecurringOrchestrationDescriptionAddedToDbContext_WhenRetrievingFromDatabase_HasCorrectValues()
public async Task Given_RecurringOrchestrationDescriptionAddedToDbContext_When_RetrievingFromDatabase_Then_HasCorrectValues()
{
// Arrange
var existingOrchestrationDescription = CreateOrchestrationDescription(recurringCronExpression: "0 0 * * *");
Expand All @@ -77,7 +78,7 @@ public async Task Given_RecurringOrchestrationDescriptionAddedToDbContext_WhenRe
}

[Fact]
public async Task Given_OrchestrationInstanceWithStepsAddedToDbContext_WhenRetrievingFromDatabase_HasCorrectValues()
public async Task Given_OrchestrationInstanceWithStepsAddedToDbContext_When_RetrievingFromDatabase_Then_HasCorrectValues()
{
// Arrange
var existingOrchestrationDescription = CreateOrchestrationDescription();
Expand All @@ -102,7 +103,85 @@ public async Task Given_OrchestrationInstanceWithStepsAddedToDbContext_WhenRetri
}

[Fact]
public async Task Given_OrchestrationInstanceWithStepsAddedToDbContext_WhenFilteringJsonColumn_ReturnsExpectedItem()
public async Task Given_OrchestrationInstanceWithUniqueIdempotencyKeyAddedToDbContext_When_RetrievingFromDatabase_Then_HasCorrectValues()
{
// Arrange
var existingOrchestrationDescription = CreateOrchestrationDescription();
var existingOrchestrationInstance = CreateOrchestrationInstance(
existingOrchestrationDescription,
idempotencyKey: new IdempotencyKey(Guid.NewGuid().ToString()));

await using (var writeDbContext = _fixture.DatabaseManager.CreateDbContext())
{
writeDbContext.OrchestrationDescriptions.Add(existingOrchestrationDescription);
writeDbContext.OrchestrationInstances.Add(existingOrchestrationInstance);
await writeDbContext.SaveChangesAsync();
}

// Act
await using var readDbContext = _fixture.DatabaseManager.CreateDbContext();
var orchestrationInstance = await readDbContext.OrchestrationInstances.FindAsync(existingOrchestrationInstance.Id);

// Assert
orchestrationInstance.Should()
.NotBeNull()
.And
.BeEquivalentTo(existingOrchestrationInstance);
}

[Fact]
public async Task Given_MultipleOrchestrationInstancesWithNullInIdempotencyKeyAddedToDbContext_When_SaveChangesAsync_Then_NoExceptionThrown()
{
// Arrange
var existingOrchestrationDescription = CreateOrchestrationDescription();
var newOrchestrationInstance01 = CreateOrchestrationInstance(
existingOrchestrationDescription,
idempotencyKey: null);
var newOrchestrationInstance02 = CreateOrchestrationInstance(
existingOrchestrationDescription,
idempotencyKey: null);

await using (var writeDbContext = _fixture.DatabaseManager.CreateDbContext())
{
writeDbContext.OrchestrationDescriptions.Add(existingOrchestrationDescription);
writeDbContext.OrchestrationInstances.Add(newOrchestrationInstance01);
writeDbContext.OrchestrationInstances.Add(newOrchestrationInstance02);
// Act
await writeDbContext.SaveChangesAsync();
}
}

[Fact]
public async Task Given_MultipleOrchestrationInstancesWithSameValueInIdempotencyKeyAddedToDbContext_When_SaveChangesAsync_Then_ThrowsExpectedException()
{
// Arrange
var idempotencyKey = new IdempotencyKey(Guid.NewGuid().ToString());
var existingOrchestrationDescription = CreateOrchestrationDescription();
var newOrchestrationInstance01 = CreateOrchestrationInstance(
existingOrchestrationDescription,
idempotencyKey: idempotencyKey);
var newOrchestrationInstance02 = CreateOrchestrationInstance(
existingOrchestrationDescription,
idempotencyKey: idempotencyKey);

await using (var writeDbContext = _fixture.DatabaseManager.CreateDbContext())
{
writeDbContext.OrchestrationDescriptions.Add(existingOrchestrationDescription);
writeDbContext.OrchestrationInstances.Add(newOrchestrationInstance01);
writeDbContext.OrchestrationInstances.Add(newOrchestrationInstance02);
// Act
var act = () => writeDbContext.SaveChangesAsync();
// Assert
using var assertionScope = new AssertionScope();
var ex = await act.Should()
.ThrowAsync<DbUpdateException>();
ex.Which!.InnerException!.Message
.Contains("Cannot insert duplicate key row in object 'pm.OrchestrationInstance' with unique index 'UX_OrchestrationInstance_IdempotencyKey'");
}
}

[Fact]
public async Task Given_OrchestrationInstanceWithStepsAddedToDbContext_When_FilteringJsonColumn_Then_ReturnsExpectedItem()
{
// Arrange
var expectedTestInt = 52;
Expand All @@ -128,7 +207,7 @@ public async Task Given_OrchestrationInstanceWithStepsAddedToDbContext_WhenFilte
}

[Fact]
public async Task Given_UserCanceledOrchestrationInstanceAddedToDbContext_WhenRetrievingFromDatabase_HasCorrectValues()
public async Task Given_UserCanceledOrchestrationInstanceAddedToDbContext_When_RetrievingFromDatabase_Then_HasCorrectValues()
{
// Arrange
var userIdentity = new UserIdentity(new UserId(Guid.NewGuid()), new ActorId(Guid.NewGuid()));
Expand Down Expand Up @@ -177,7 +256,12 @@ private static OrchestrationDescription CreateOrchestrationDescription(string? r
return orchestrationDescription;
}

private static OrchestrationInstance CreateOrchestrationInstance(OrchestrationDescription orchestrationDescription, OperatingIdentity? identity = default, Instant? runAt = default, int? testInt = default)
private static OrchestrationInstance CreateOrchestrationInstance(
OrchestrationDescription orchestrationDescription,
OperatingIdentity? identity = default,
Instant? runAt = default,
int? testInt = default,
IdempotencyKey? idempotencyKey = default)
{
var operatingIdentity = identity
?? new UserIdentity(
Expand All @@ -189,7 +273,8 @@ private static OrchestrationInstance CreateOrchestrationInstance(OrchestrationDe
orchestrationDescription,
skipStepsBySequence: [3],
clock: SystemClock.Instance,
runAt);
runAt,
idempotencyKey);

orchestrationInstance.ParameterValue.SetFromInstance(new TestOrchestrationParameter
{
Expand Down
Loading

0 comments on commit db2bd35

Please sign in to comment.