Skip to content

Commit 7330c41

Browse files
erdtsieckjeremydmiller
authored andcommittedFeb 14, 2025
#3661 Custom projection session gets disposed after first await in slicer
1 parent a0b339a commit 7330c41

File tree

2 files changed

+78
-4
lines changed

2 files changed

+78
-4
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using EventSourcingTests.Aggregation;
7+
using EventSourcingTests.Examples;
8+
using JasperFx.Core;
9+
using Marten;
10+
using Marten.Events;
11+
using Marten.Events.Aggregation;
12+
using Marten.Events.Projections;
13+
using Marten.Internal.Sessions;
14+
using Marten.Storage;
15+
using Marten.Testing.Harness;
16+
using Shouldly;
17+
using Xunit;
18+
19+
namespace EventSourcingTests.Bugs;
20+
21+
public class Bug_3661_await_custom_projection_slicing : OneOffConfigurationsContext
22+
{
23+
[Fact]
24+
public async Task fetching_multiple_items_from_slicers_in_async_custom_projection()
25+
{
26+
StoreOptions(opts => opts.Projections.Add(new StartAndStopIteratingAwaitablesSlicedProjection(), ProjectionLifecycle.Async));
27+
28+
var stream = Guid.NewGuid();
29+
theSession.Store(new Document1 { Id = stream });
30+
theSession.Events.StartStream(stream, new Start(), new Increment(), new Increment());
31+
32+
var stream2 = Guid.NewGuid();
33+
theSession.Store(new Document1 { Id = stream2 });
34+
theSession.Events.StartStream(stream2, new Start(), new Increment(), new Increment());
35+
await theSession.SaveChangesAsync();
36+
37+
using var daemon = await theStore.BuildProjectionDaemonAsync();
38+
await daemon.StartAllAsync();
39+
await daemon.WaitForNonStaleData(20.Seconds());
40+
41+
var aggregate = await theSession.LoadAsync<StartAndStopAggregate>(stream);
42+
aggregate.Count.ShouldBe(2);
43+
var aggregate2 = await theSession.LoadAsync<StartAndStopAggregate>(stream2);
44+
aggregate2.Count.ShouldBe(2);
45+
}
46+
}
47+
48+
public class StartAndStopIteratingAwaitablesSlicedProjection: CustomProjection<StartAndStopAggregate, Guid>, IEventSlicer<StartAndStopAggregate, Guid>
49+
{
50+
public StartAndStopIteratingAwaitablesSlicedProjection()
51+
{
52+
UseCustomSlicer(this);
53+
IncludeType<Start>();
54+
IncludeType<Increment>();
55+
}
56+
57+
public override ValueTask ApplyChangesAsync(DocumentSessionBase session,
58+
EventSlice<StartAndStopAggregate, Guid> slice, CancellationToken cancellation,
59+
ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline) =>
60+
new StartAndStopProjection().ApplyChangesAsync(session, slice, cancellation, lifecycle);
61+
62+
public ValueTask<IReadOnlyList<EventSlice<StartAndStopAggregate, Guid>>> SliceInlineActions(IQuerySession querySession, IEnumerable<StreamAction> streams) => throw new NotImplementedException();
63+
64+
public async ValueTask<IReadOnlyList<TenantSliceGroup<StartAndStopAggregate, Guid>>> SliceAsyncEvents(IQuerySession querySession, List<IEvent> events)
65+
{
66+
var aggregateId = events.First().StreamId;
67+
var group = new TenantSliceGroup<StartAndStopAggregate, Guid>(Tenant.ForDatabase(querySession.Database));
68+
foreach (var @event in events)
69+
{
70+
await querySession.LoadAsync<Document1>(@event.StreamId);
71+
group.AddEvent(@event.StreamId, @event);
72+
}
73+
return [group];
74+
}
75+
}

‎src/Marten/Events/Aggregation/CustomProjection.cs

+3-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
using Marten.Events.Daemon.Internals;
1111
using Marten.Events.Projections;
1212
using Marten.Exceptions;
13-
using Marten.Internal;
1413
using Marten.Internal.Sessions;
1514
using Marten.Internal.Storage;
1615
using Marten.Schema;
@@ -203,12 +202,12 @@ public virtual bool IsNew(EventSlice<TDoc, TId> slice)
203202
/// <param name="range"></param>
204203
/// <param name="cancellation"></param>
205204
/// <returns></returns>
206-
public ValueTask<IReadOnlyList<TenantSliceGroup<TDoc, TId>>> GroupEventRange(DocumentStore store,
205+
public async ValueTask<IReadOnlyList<TenantSliceGroup<TDoc, TId>>> GroupEventRange(DocumentStore store,
207206
IMartenDatabase database,
208207
EventRange range, CancellationToken cancellation)
209208
{
210-
using var session = store.LightweightSession(SessionOptions.ForDatabase(database));
211-
return Slicer.SliceAsyncEvents(session, range.Events);
209+
await using var session = store.LightweightSession(SessionOptions.ForDatabase(database));
210+
return await Slicer.SliceAsyncEvents(session, range.Events).ConfigureAwait(false);
212211
}
213212

214213
Type IReadOnlyProjectionData.ProjectionType => GetType();

0 commit comments

Comments
 (0)