Skip to content

Commit

Permalink
Fix for GroupOnObservable OnCompleted handling (#938)
Browse files Browse the repository at this point in the history
* GroupOnObservable Fix (and Unit Tests)

* Remove extra tests

---------

Co-authored-by: Roland Pheasant <roland_pheasant@hotmail.com>
  • Loading branch information
dwcullop and RolandPheasant committed Sep 4, 2024
1 parent 55002ae commit 2c32489
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
26 changes: 19 additions & 7 deletions src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

using Bogus;
Expand Down Expand Up @@ -30,12 +32,14 @@ public class GroupOnObservableFixture : IDisposable
private readonly SourceCache<Person, string> _cache = new (p => p.UniqueKey);
private readonly ChangeSetAggregator<Person, string> _results;
private readonly GroupChangeSetAggregator<Person, string, Color> _groupResults;
private readonly Subject<Unit> _grouperShutdown;
private readonly Faker<Person> _faker;
private readonly Randomizer _randomizer = new(0x3141_5926);

public GroupOnObservableFixture()
{
_faker = Fakers.Person.Clone().WithSeed(_randomizer);
_grouperShutdown = new();
_results = _cache.Connect().AsAggregator();
_groupResults = _cache.Connect().GroupOnObservable(CreateFavoriteColorObservable).AsAggregator();
}
Expand Down Expand Up @@ -179,7 +183,7 @@ public void GroupingSequenceCompletesWhenEmpty()
}

[Fact]
public void AllSequencesCompleteWhenSourceIsDisposed()
public void AllSequencesShouldCompleteWhenSourceAndGroupingObservablesComplete()
{
// Arrange
_cache.AddOrUpdate(_faker.Generate(InitialCount));
Expand All @@ -190,6 +194,7 @@ public void AllSequencesCompleteWhenSourceIsDisposed()

// Act
_cache.Dispose();
_grouperShutdown.OnNext(Unit.Default);

// Assert
results.IsCompleted.Should().BeTrue();
Expand Down Expand Up @@ -243,9 +248,11 @@ public async Task ResultsContainsCorrectRegroupedValuesAsync()
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource)
[InlineData(false, false)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(true, true)]
public void ResultCompletesOnlyWhenSourceAndAllGroupingObservablesComplete(bool completeSource, bool completeGroups)
{
// Arrange
_cache.AddOrUpdate(_faker.Generate(InitialCount));
Expand All @@ -255,10 +262,14 @@ public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource)
{
_cache.Dispose();
}
if (completeGroups)
{
_grouperShutdown.OnNext(Unit.Default);
}

// Assert
_results.IsCompleted.Should().Be(completeSource);
_groupResults.IsCompleted.Should().Be(completeSource);
_groupResults.IsCompleted.Should().Be(completeGroups && completeSource);
}

[Fact]
Expand Down Expand Up @@ -311,6 +322,7 @@ public void Dispose()
_groupResults.Dispose();
_results.Dispose();
_cache.Dispose();
_grouperShutdown.Dispose();
}

private void RandomFavoriteColorChange()
Expand Down Expand Up @@ -342,6 +354,6 @@ private static void VerifyGroupingResults(ISourceCache<Person, string> cache, Ch
groupResults.Groups.Items.ForEach(group => group.Data.Count.Should().BeGreaterThan(0, "Empty groups should be removed"));
}

private static IObservable<Color> CreateFavoriteColorObservable(Person person, string key) =>
person.WhenPropertyChanged(p => p.FavoriteColor).Select(change => change.Value);
private IObservable<Color> CreateFavoriteColorObservable(Person person, string key) =>
person.WhenPropertyChanged(p => p.FavoriteColor).Select(change => change.Value).TakeUntil(_grouperShutdown);
}
7 changes: 4 additions & 3 deletions src/DynamicData/Cache/Internal/GroupOnObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ IObservable<TGroupKey> CreateGroupObservable(TObject item, TKey key) =>
// Next process the Grouping observables created for each item
var subMergeMany = shared
.MergeMany(CreateGroupObservable)
.SubscribeSafe(onError: observer.OnError);
.SubscribeSafe(
onError: observer.OnError,
onCompleted: observer.OnCompleted);
// Finally, emit the results
var subResults = shared
Expand All @@ -52,8 +54,7 @@ IObservable<TGroupKey> CreateGroupObservable(TObject item, TKey key) =>
grouper.EmitChanges(observer);
parentUpdate = false;
},
onError: observer.OnError,
onCompleted: observer.OnCompleted);
onError: observer.OnError);
return new CompositeDisposable(shared.Connect(), subMergeMany, subChanges, grouper);
});
Expand Down

0 comments on commit 2c32489

Please sign in to comment.