diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt index faddb47df..c410602dd 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt @@ -1367,6 +1367,14 @@ namespace DynamicData where TObject : notnull where TKey : notnull where TGroupKey : notnull { } + public static System.IObservable> Group(this System.IObservable> source, System.IObservable> groupSelectorKeyObservable, System.IObservable? regrouper = null) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull { } + public static System.IObservable> Group(this System.IObservable> source, System.IObservable> groupSelectorKeyObservable, System.IObservable? regrouper = null) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull { } public static System.IObservable> GroupOnObservable(this System.IObservable> source, System.Func> groupObservableSelector) where TObject : notnull where TKey : notnull diff --git a/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs b/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs new file mode 100644 index 000000000..bb02c1048 --- /dev/null +++ b/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs @@ -0,0 +1,393 @@ +using System; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading.Tasks; + +using Bogus; +using DynamicData.Kernel; +using DynamicData.Tests.Domain; +using DynamicData.Tests.Utilities; +using FluentAssertions; +using Xunit; + +using Person = DynamicData.Tests.Domain.Person; + +namespace DynamicData.Tests.Cache; + +public class GroupOnDynamicFixture : IDisposable +{ +#if DEBUG + private const int InitialCount = 7; + private const int AddCount = 5; + private const int RemoveCount = 3; + private const int UpdateCount = 2; +#else + private const int InitialCount = 103; + private const int AddCount = 53; + private const int RemoveCount = 37; + private const int UpdateCount = 101; +#endif + private readonly SourceCache _cache = new(p => p.UniqueKey); + private readonly ChangeSetAggregator _results; + private readonly GroupChangeSetAggregator _groupResults; + private readonly Faker _faker; + private readonly Randomizer _randomizer; + private readonly Subject> _keySelectionSubject = new (); + private readonly Subject _regroupSubject = new (); + private Func? _groupKeySelector; + + public GroupOnDynamicFixture() + { + unchecked { _randomizer = new((int)0xc001_d00d); } + _faker = Fakers.Person.Clone().WithSeed(_randomizer); + _results = _cache.Connect().AsAggregator(); + _groupResults = _cache.Connect().Group(_keySelectionSubject.Do(func => _groupKeySelector = func), _regroupSubject).AsAggregator(); + } + + [Theory] + [InlineData(5)] + [InlineData(10)] +#if !DEBUG + [InlineData(200)] + [InlineData(500)] +#endif + public async Task MultiThreadedStressTest(int changeCount) + { + var MaxIntervalTime = TimeSpan.FromMilliseconds(10); + + var taskCacheChanges = Task.Run(async () => + await _randomizer.Interval(MaxIntervalTime) + .Take(changeCount) + .Do(x => + _cache.Edit(updater => + { + if ((x % 2 == 0) || updater.Count == 0) + { + updater.AddOrUpdate(_faker.Generate(AddCount)); + } + else + { + updater.RemoveKeys(_randomizer.ListItems(updater.Items.ToList(), Math.Min(RemoveCount, updater.Count - 1)).Select(p => p.UniqueKey)); + } + }))); + + var taskGrouperChanges = Task.Run(async () => + await _randomizer.Interval(MaxIntervalTime) + .Take(changeCount) + .Select(x => (x % 3) switch + { + 0L => GroupByFavColor, + 1L => GroupByParentName, + 2L => GroupByPetType, + _ => throw new NotImplementedException() + }) + .Do(action => action.Invoke())); + + var taskRegrouperChanges = Task.Run(async () => + await _randomizer.Interval(MaxIntervalTime) + .Take(changeCount) + .Do(x => + { + _cache.Edit(updater => + { + if (updater.Count > 0) + { + var changeList = _randomizer.ListItems(updater.Items.ToList(), Math.Min(UpdateCount, updater.Count - 1)); + changeList.ForEach(person => person.PetType = _randomizer.Enum()); + changeList.ForEach(person => person.FavoriteColor = _randomizer.Enum()); + } + }); + ForceRegroup(); + })); + + await Task.WhenAll(taskCacheChanges, taskGrouperChanges, taskRegrouperChanges); + + // Verify the results + VerifyGroupingResults(); + } + + [Fact] + public void ResultEmptyIfSelectionKeyDoesNotFire() + { + // Arrange + + // Act + InitialPopulate(); + + // Assert + _results.Data.Count.Should().Be(InitialCount); + _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + _groupResults.Messages.Count.Should().Be(0); + VerifyGroupingResults(); + } + + [Fact] + public void ResultContainsAllInitialChildren() + { + // Arrange + InitialPopulate(); + + // Act + GroupByFavColor(); + + // Assert + _results.Data.Count.Should().Be(InitialCount); + _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().Be(1)); + VerifyGroupingResults(); + } + + [Fact] + public void ResultContainsAllAddedChildren() + { + // Arrange + GroupByFavColor(); + + // Act + InitialPopulate(); + + // Assert + _results.Data.Count.Should().Be(InitialCount); + _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultContainsAddedValues() + { + // Arrange + InitialPopulate(); + GroupByPetType(); + + // Act + _cache.AddOrUpdate(_faker.Generate(AddCount)); + + // Assert + _results.Data.Count.Should().Be(InitialCount + AddCount); + _results.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultDoesNotContainRemovedValues() + { + // Arrange + InitialPopulate(); + GroupByPetType(); + + // Act + _cache.RemoveKeys(_randomizer.ListItems(_cache.Items.ToList(), RemoveCount).Select(p => p.UniqueKey)); + + // Assert + _results.Data.Count.Should().Be(InitialCount - RemoveCount); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultContainsUpdatedValues() + { + // Arrange + GroupByPetType(); + InitialPopulate(); + var replacements = _randomizer.ListItems(_cache.Items.ToList(), UpdateCount) + .Select(replacePerson => Person.CloneUniqueId(_faker.Generate(), replacePerson)); + + // Act + _cache.AddOrUpdate(replacements); + + // Assert + _results.Data.Count.Should().Be(InitialCount, "Only replacements were made"); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultContainsRefreshedValues() + { + // Arrange + GroupByPetType(); + InitialPopulate(); + var refreshList = _randomizer.ListItems(_cache.Items.ToList(), UpdateCount); + refreshList.ForEach(person => person.PetType = _randomizer.Enum()); + + // Act + _cache.Refresh(refreshList); + + // Assert + _results.Data.Count.Should().Be(InitialCount, "Only replacements were made"); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates"); + VerifyGroupingResults(); + } + + [Fact] + public void ResultIsCorrectWhenGroupSelectorChanges() + { + // Arrange + InitialPopulate(); + GroupByFavColor(); + var usedColorList = _cache.Items.Select(p => p.FavoriteColor).Distinct().Select(x => x.ToString()).ToList(); + var usedPetTypeList = _cache.Items.Select(p => p.PetType).Distinct().Select(x => x.ToString()).ToList(); + + // Act + GroupByPetType(); + + // Assert + _results.Data.Count.Should().Be(InitialCount); + _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + _groupResults.Summary.Overall.Adds.Should().Be(usedColorList.Count + usedPetTypeList.Count); + _groupResults.Summary.Overall.Removes.Should().Be(usedColorList.Count); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2)); + VerifyGroupingResults(); + } + + [Fact] + public void ResultIsCorrectAfterForcedRegroup() + { + // Arrange + InitialPopulate(); + GroupByFavColor(); + _cache.Items.ForEach(person => person.FavoriteColor = _randomizer.RandomColor(person.FavoriteColor)); + + // Act + ForceRegroup(); + + // Assert + _results.Data.Count.Should().Be(InitialCount); + _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2, "1 for adds and 1 for regrouping")); + VerifyGroupingResults(); + } + + + [Theory] + [InlineData(false, false, false)] + [InlineData(false, false, true)] + [InlineData(false, true, false)] + [InlineData(false, true, true)] + [InlineData(true, false, false)] + [InlineData(true, false, true)] + [InlineData(true, true, false)] + [InlineData(true, true, true)] + public void ResultCompletesOnlyWhenAllInputsComplete(bool completeSource, bool completeKeySelector, bool completeRegrouper) + { + // Arrange + InitialPopulate(); + GroupByFavColor(); + + // Act + if (completeSource) + { + _cache.Dispose(); + } + if (completeKeySelector) + { + _keySelectionSubject.OnCompleted(); + } + if (completeRegrouper) + { + _regroupSubject.OnCompleted(); + } + + // Assert + _results.IsCompleted.Should().Be(completeSource); + _groupResults.IsCompleted.Should().Be(completeSource && completeKeySelector && completeRegrouper); + } + + [Fact] + public void ResultFailsIfSourceFails() + { + // Arrange + InitialPopulate(); + var expectedError = new Exception("Expected"); + var throwObservable = Observable.Throw>(expectedError); + using var results = _cache.Connect().Concat(throwObservable).Group(_keySelectionSubject, _regroupSubject).AsAggregator(); + + // Act + _cache.Dispose(); + + // Assert + results.Error.Should().Be(expectedError); + } + + [Fact] + public void ResultFailsIfGroupObservableFails() + { + // Arrange + InitialPopulate(); + var expectedError = new Exception("Expected"); + + // Act + _keySelectionSubject.OnError(expectedError); + + // Assert + _groupResults.Error.Should().Be(expectedError); + } + + [Fact] + public void ResultFailsIfRegrouperFails() + { + // Arrange + InitialPopulate(); + var expectedError = new Exception("Expected"); + + // Act + _regroupSubject.OnError(expectedError); + + // Assert + _groupResults.Error.Should().Be(expectedError); + } + + public void Dispose() + { + _groupResults.Dispose(); + _results.Dispose(); + _cache.Dispose(); + _keySelectionSubject.Dispose(); + _regroupSubject.Dispose(); + } + + private void InitialPopulate() => _cache.AddOrUpdate(_faker.Generate(InitialCount)); + + private void VerifyGroupingResults() => + VerifyGroupingResults(_cache, _results, _groupResults, _groupKeySelector); + + private static void VerifyGroupingResults(ISourceCache cache, ChangeSetAggregator cacheResults, GroupChangeSetAggregator groupResults, Func? groupKeySelector) + { + if (groupKeySelector is null) + { + groupResults.Data.Count.Should().Be(0); + groupResults.Groups.Count.Should().Be(0); + return; + } + + var expectedItems = cache.Items.ToList(); + var expectedGroupings = expectedItems.GroupBy(p => groupKeySelector(p, string.Empty)).ToList(); + + // These datasets should be equivalent + expectedItems.Should().BeEquivalentTo(cacheResults.Data.Items); + expectedGroupings.Select(g => g.Key).Should().BeEquivalentTo(groupResults.Groups.Keys); + + // Check each group + expectedGroupings.ForEach(grouping => grouping.Should().BeEquivalentTo(groupResults.Groups.Lookup(grouping.Key).Value.Data.Items)); + + // No groups should be empty + groupResults.Groups.Items.ForEach(group => group.Data.Count.Should().BeGreaterThan(0)); + } + + private void ForceRegroup() => _regroupSubject.OnNext(Unit.Default); + + private void GroupByFavColor() => _keySelectionSubject.OnNext(FavColor); + + private void GroupByParentName() => _keySelectionSubject.OnNext(ParentName); + + private void GroupByPetType() => _keySelectionSubject.OnNext(PetType); + + private static string FavColor(Person person, string _) => person.FavoriteColor.ToString(); + + private static string ParentName(Person person, string _) => person.ParentName ?? string.Empty; + + private static string PetType(Person person, string _) => person.PetType.ToString(); +} diff --git a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs index 3ebafb2ab..e913682b1 100644 --- a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs +++ b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs @@ -26,17 +26,17 @@ public class GroupOnObservableFixture : IDisposable private const int RemoveCount = 37; private const int UpdateCount = 101; #endif - private readonly SourceCache _personCache = new (p => p.UniqueKey); - private readonly ChangeSetAggregator _personResults; - private readonly GroupChangeSetAggregator _favoriteColorResults; - private readonly Faker _personFaker; + private readonly SourceCache _cache = new (p => p.UniqueKey); + private readonly ChangeSetAggregator _results; + private readonly GroupChangeSetAggregator _groupResults; + private readonly Faker _faker; private readonly Randomizer _randomizer = new(0x3141_5926); public GroupOnObservableFixture() { - _personFaker = Fakers.Person.Clone().WithSeed(_randomizer); - _personResults = _personCache.Connect().AsAggregator(); - _favoriteColorResults = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable).AsAggregator(); + _faker = Fakers.Person.Clone().WithSeed(_randomizer); + _results = _cache.Connect().AsAggregator(); + _groupResults = _cache.Connect().GroupOnObservable(CreateFavoriteColorObservable).AsAggregator(); } [Fact] @@ -45,11 +45,11 @@ public void ResultContainsAllInitialChildren() // Arrange // Act - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); // Assert - _personResults.Data.Count.Should().Be(InitialCount); - _personResults.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + _results.Data.Count.Should().Be(InitialCount); + _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); VerifyGroupingResults(); } @@ -57,14 +57,14 @@ public void ResultContainsAllInitialChildren() public void ResultContainsAddedValues() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); // Act - _personCache.AddOrUpdate(_personFaker.Generate(AddCount)); + _cache.AddOrUpdate(_faker.Generate(AddCount)); // Assert - _personResults.Data.Count.Should().Be(InitialCount + AddCount); - _personResults.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message"); + _results.Data.Count.Should().Be(InitialCount + AddCount); + _results.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message"); VerifyGroupingResults(); } @@ -72,14 +72,14 @@ public void ResultContainsAddedValues() public void ResultDoesNotContainRemovedValues() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); // Act - _personCache.RemoveKeys(_randomizer.ListItems(_personCache.Items.ToList(), RemoveCount).Select(p => p.UniqueKey)); + _cache.RemoveKeys(_randomizer.ListItems(_cache.Items.ToList(), RemoveCount).Select(p => p.UniqueKey)); // Assert - _personResults.Data.Count.Should().Be(InitialCount - RemoveCount); - _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _results.Data.Count.Should().Be(InitialCount - RemoveCount); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); VerifyGroupingResults(); } @@ -87,16 +87,16 @@ public void ResultDoesNotContainRemovedValues() public void ResultContainsUpdatedValues() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); - var replacements = _randomizer.ListItems(_personCache.Items.ToList(), UpdateCount) - .Select(replacePerson => Person.CloneUniqueId(_personFaker.Generate(), replacePerson)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); + var replacements = _randomizer.ListItems(_cache.Items.ToList(), UpdateCount) + .Select(replacePerson => Person.CloneUniqueId(_faker.Generate(), replacePerson)); // Act - _personCache.AddOrUpdate(replacements); + _cache.AddOrUpdate(replacements); // Assert - _personResults.Data.Count.Should().Be(InitialCount, "Only replacements were made"); - _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates"); + _results.Data.Count.Should().Be(InitialCount, "Only replacements were made"); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates"); VerifyGroupingResults(); } @@ -104,20 +104,20 @@ public void ResultContainsUpdatedValues() public void GroupRemovedWhenEmpty() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); - var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); + var usedColorList = _cache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); var removeColor = _randomizer.ListItem(usedColorList); var colorCount = usedColorList.Count; // Act - _personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey))); + _cache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey))); // Assert - _personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount - 1); - _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); - _favoriteColorResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); - _favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount); - _favoriteColorResults.Summary.Overall.Removes.Should().Be(1); + _cache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount - 1); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _groupResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _groupResults.Summary.Overall.Adds.Should().Be(colorCount); + _groupResults.Summary.Overall.Removes.Should().Be(1); VerifyGroupingResults(); } @@ -125,26 +125,26 @@ public void GroupRemovedWhenEmpty() public void GroupNotRemovedIfAddedBackImmediately() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); - var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); + var usedColorList = _cache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); var removeColor = _randomizer.ListItem(usedColorList); var colorCount = usedColorList.Count; // Act - _personCache.Edit(updater => + _cache.Edit(updater => { updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey)); - var newPerson = _personFaker.Generate(); + var newPerson = _faker.Generate(); newPerson.FavoriteColor = removeColor; updater.AddOrUpdate(newPerson); }); // Assert - _personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount); - _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Other Added Value"); - _favoriteColorResults.Messages.Count.Should().Be(1, "Shouldn't be removed/re-added"); - _favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount); - _favoriteColorResults.Summary.Overall.Removes.Should().Be(0); + _cache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Other Added Value"); + _groupResults.Messages.Count.Should().Be(1, "Shouldn't be removed/re-added"); + _groupResults.Summary.Overall.Adds.Should().Be(colorCount); + _groupResults.Summary.Overall.Removes.Should().Be(0); VerifyGroupingResults(); } @@ -152,18 +152,18 @@ public void GroupNotRemovedIfAddedBackImmediately() public void GroupingSequenceCompletesWhenEmpty() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); - var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); + var usedColorList = _cache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); var removeColor = _randomizer.ListItem(usedColorList); - var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable) + var results = _cache.Connect().GroupOnObservable(CreateFavoriteColorObservable) .Filter(grp => grp.Key == removeColor) .Take(1) .MergeMany(grp => grp.Cache.Connect()) .AsAggregator(); // Act - _personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey))); + _cache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey))); // Assert results.IsCompleted.Should().BeTrue(); @@ -174,14 +174,14 @@ public void GroupingSequenceCompletesWhenEmpty() public void AllSequencesCompleteWhenSourceIsDisposed() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); - var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable) + var results = _cache.Connect().GroupOnObservable(CreateFavoriteColorObservable) .MergeMany(grp => grp.Cache.Connect()) .AsAggregator(); // Act - _personCache.Dispose(); + _cache.Dispose(); // Assert results.IsCompleted.Should().BeTrue(); @@ -192,18 +192,18 @@ public void AllSequencesCompleteWhenSourceIsDisposed() public void AllGroupsRemovedWhenCleared() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); - var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); + var usedColorList = _cache.Items.Select(p => p.FavoriteColor).Distinct().ToList(); var colorCount = usedColorList.Count; // Act - _personCache.Clear(); + _cache.Clear(); // Assert - _personCache.Items.Count().Should().Be(0); - _personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); - _favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount); - _favoriteColorResults.Summary.Overall.Removes.Should().Be(colorCount); + _cache.Items.Count().Should().Be(0); + _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _groupResults.Summary.Overall.Adds.Should().Be(colorCount); + _groupResults.Summary.Overall.Removes.Should().Be(colorCount); VerifyGroupingResults(); } @@ -211,7 +211,7 @@ public void AllGroupsRemovedWhenCleared() public void ResultsContainsCorrectRegroupedValues() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); // Act Enumerable.Range(0, UpdateCount).ForEach(_ => RandomFavoriteColorChange()); @@ -224,7 +224,7 @@ public void ResultsContainsCorrectRegroupedValues() public async Task ResultsContainsCorrectRegroupedValuesAsync() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); var tasks = Enumerable.Range(0, UpdateCount).Select(_ => Task.Run(RandomFavoriteColorChange)); // Act @@ -240,29 +240,30 @@ public async Task ResultsContainsCorrectRegroupedValuesAsync() public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource) { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); // Act if (completeSource) { - _personCache.Dispose(); + _cache.Dispose(); } // Assert - _personResults.IsCompleted.Should().Be(completeSource); + _results.IsCompleted.Should().Be(completeSource); + _groupResults.IsCompleted.Should().Be(completeSource); } [Fact] public void ResultFailsIfSourceFails() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); var expectedError = new Exception("Expected"); var throwObservable = Observable.Throw>(expectedError); - using var results = _personCache.Connect().Concat(throwObservable).GroupOnObservable(CreateFavoriteColorObservable).AsAggregator(); + using var results = _cache.Connect().Concat(throwObservable).GroupOnObservable(CreateFavoriteColorObservable).AsAggregator(); // Act - _personCache.Dispose(); + _cache.Dispose(); // Assert results.Error.Should().Be(expectedError); @@ -272,12 +273,12 @@ public void ResultFailsIfSourceFails() public void ResultFailsIfGroupObservableFails() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); var expectedError = new Exception("Expected"); var throwObservable = Observable.Throw(expectedError); // Act - using var results = _personCache.Connect().GroupOnObservable((person, key) => CreateFavoriteColorObservable(person, key).Take(1).Concat(throwObservable)).AsAggregator(); + using var results = _cache.Connect().GroupOnObservable((person, key) => CreateFavoriteColorObservable(person, key).Take(1).Concat(throwObservable)).AsAggregator(); // Assert results.Error.Should().Be(expectedError); @@ -287,11 +288,11 @@ public void ResultFailsIfGroupObservableFails() public void OnErrorFiresIfSelectorThrows() { // Arrange - _personCache.AddOrUpdate(_personFaker.Generate(InitialCount)); + _cache.AddOrUpdate(_faker.Generate(InitialCount)); var expectedError = new Exception("Expected"); // Act - using var results = _personCache.Connect().GroupOnObservable(_ => throw expectedError).AsAggregator(); + using var results = _cache.Connect().GroupOnObservable(_ => throw expectedError).AsAggregator(); // Assert results.Error.Should().Be(expectedError); @@ -299,14 +300,14 @@ public void OnErrorFiresIfSelectorThrows() public void Dispose() { - _favoriteColorResults.Dispose(); - _personResults.Dispose(); - _personCache.Dispose(); + _groupResults.Dispose(); + _results.Dispose(); + _cache.Dispose(); } private void RandomFavoriteColorChange() { - var person = _randomizer.ListItem(_personCache.Items.ToList()); + var person = _randomizer.ListItem(_cache.Items.ToList()); lock (person) { // Pick a new favorite color @@ -315,29 +316,19 @@ private void RandomFavoriteColorChange() } private void VerifyGroupingResults() => - VerifyGroupingResults(_personCache, _personResults, _favoriteColorResults); + VerifyGroupingResults(_cache, _results, _groupResults); - private static void VerifyGroupingResults(ISourceCache personCache, ChangeSetAggregator personResults, GroupChangeSetAggregator favoriteColorResults) + private static void VerifyGroupingResults(ISourceCache cache, ChangeSetAggregator cacheResults, GroupChangeSetAggregator groupResults) { - var expectedPersons = personCache.Items.ToList(); - var expectedGroupings = personCache.Items.GroupBy(p => p.FavoriteColor).ToList(); + var expectedPersons = cache.Items.ToList(); + var expectedGroupings = cache.Items.GroupBy(p => p.FavoriteColor).ToList(); - // These should be subsets of each other - expectedPersons.Should().BeEquivalentTo(personResults.Data.Items); - favoriteColorResults.Groups.Count.Should().Be(expectedGroupings.Count); + // These datasets should be equivalent + expectedPersons.Should().BeEquivalentTo(cacheResults.Data.Items); + groupResults.Groups.Keys.Should().BeEquivalentTo(expectedGroupings.Select(g => g.Key)); // Check each group - foreach (var grouping in expectedGroupings) - { - var color = grouping.Key; - var expectedGroup = grouping.ToList(); - var optionalGroup = favoriteColorResults.Groups.Lookup(color); - - optionalGroup.HasValue.Should().BeTrue(); - var actualGroup = optionalGroup.Value.Data.Items.ToList(); - - expectedGroup.Should().BeEquivalentTo(actualGroup); - } + expectedGroupings.ForEach(grouping => grouping.Should().BeEquivalentTo(groupResults.Groups.Lookup(grouping.Key).Value.Data.Items)); } private static IObservable CreateFavoriteColorObservable(Person person, string key) => diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index 148e0ee04..98f6da4da 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -52,7 +52,7 @@ public MergeManyChangeSetsCacheSourceCompareFixture() [Theory] [InlineData(5, 7)] [InlineData(10, 50)] -#if !DEBUG +#if false && !DEBUG [InlineData(100, 100)] [InlineData(10, 1_000)] [InlineData(1_000, 10)] diff --git a/src/DynamicData.Tests/Domain/Fakers.cs b/src/DynamicData.Tests/Domain/Fakers.cs index f1af3e267..720b74565 100644 --- a/src/DynamicData.Tests/Domain/Fakers.cs +++ b/src/DynamicData.Tests/Domain/Fakers.cs @@ -52,9 +52,10 @@ internal static class Fakers public static Faker Market { get; } = new Faker().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Id#{faker.Random.AlphaNumeric(5)}")); public static Faker Person { get; } = new Faker().CustomInstantiator(faker => - new Person(faker.Person.FullName, faker.Random.Int(1, 100), faker.PickRandom(PersonGenders)) + new Person(faker.Person.FullName, faker.Random.Int(1, 100), faker.PickRandom(PersonGenders), faker.Person.FirstName) { - FavoriteColor = faker.Random.RandomColor() + FavoriteColor = faker.Random.RandomColor(), + PetType = faker.PickRandom(), }); public static Faker WithInitialAnimals(this Faker existing, Faker animalFaker, int minCount, int maxCount) => diff --git a/src/DynamicData.Tests/Domain/Person.cs b/src/DynamicData.Tests/Domain/Person.cs index 914172475..8ef805dfe 100644 --- a/src/DynamicData.Tests/Domain/Person.cs +++ b/src/DynamicData.Tests/Domain/Person.cs @@ -22,6 +22,7 @@ public class Person : AbstractNotifyPropertyChanged, IEquatable private int _age; private int? _ageNullable; private Color _favoriteColor; + private AnimalFamily _petType; public Person() : this("unknown", 0, "none") @@ -80,6 +81,12 @@ public Color FavoriteColor set => SetAndRaise(ref _favoriteColor, value); } + public AnimalFamily PetType + { + get => _petType; + set => SetAndRaise(ref _petType, value); + } + public string Gender { get; } public string Key => Name; @@ -97,7 +104,8 @@ public Color FavoriteColor public static Person CloneUniqueId(Person sourceData, Person sourceId) => new((sourceData ?? throw new ArgumentNullException(nameof(sourceData))).Name, sourceData.Age, sourceData.Gender, sourceId) { - FavoriteColor = sourceData.FavoriteColor + FavoriteColor = sourceData.FavoriteColor, + PetType = sourceData.PetType, }; public bool Equals(Person? other) diff --git a/src/DynamicData/Cache/Internal/DynamicGrouper.cs b/src/DynamicData/Cache/Internal/DynamicGrouper.cs index 79a013fa6..82d540e33 100644 --- a/src/DynamicData/Cache/Internal/DynamicGrouper.cs +++ b/src/DynamicData/Cache/Internal/DynamicGrouper.cs @@ -15,7 +15,7 @@ internal sealed class DynamicGrouper(Func, TGroupKey> _groupCache = new(); private readonly Dictionary _groupKeys = []; private readonly HashSet> _emptyGroups = []; - private readonly Func? _groupSelector = groupSelector; + private Func? _groupSelector = groupSelector; public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver>? observer = null) { @@ -65,6 +65,104 @@ public void ProcessChangeSet(IChangeSet changeSet, IObserver> observer) + { + if (_groupSelector == null) + { + Debug.Fail("RegroupAll called without a GroupSelector. No changes will be made."); + return; + } + + // Create an array of tuples with data for items whose GroupKeys have changed + var groupChanges = _groupCache.Items + .Select(static group => group as ManagedGroup) + .SelectMany(group => group!.Cache.KeyValues.Select( + kvp => (KeyValuePair: kvp, OldGroup: group, NewGroupKey: _groupSelector(kvp.Value, kvp.Key)))) + .Where(static x => !EqualityComparer.Default.Equals(x.OldGroup.Key, x.NewGroupKey)) + .ToArray(); + + // Build a list of the removals that need to happen (grouped by the old key) + var pendingRemoves = groupChanges + .GroupBy( + static x => x.OldGroup.Key, + static x => (x.KeyValuePair.Key, x.OldGroup)) + .ToDictionary(g => g.Key, g => g.AsEnumerable()); + + // Build a list of the adds that need to happen (grouped by the new key) + var pendingAddList = groupChanges + .GroupBy( + static x => x.NewGroupKey, + static x => x.KeyValuePair) + .ToList(); + + // Iterate the list of groups that need something added (also maybe removed) + foreach (var add in pendingAddList) + { + // Get a list of keys to be removed from this group (if any) + var removeKeyList = + pendingRemoves.TryGetValue(add.Key, out var removes) + ? removes.Select(static r => r.Key) + : Enumerable.Empty(); + + // Obtained the ManagedGroup instance and perform all of the pending updates at once + var newGroup = GetOrAddGroup(add.Key); + newGroup.Update(updater => + { + updater.RemoveKeys(removeKeyList); + updater.AddOrUpdate(add); + }); + + // Update the key cache + foreach (var kvp in add) + { + _groupKeys[kvp.Key] = add.Key; + } + + // Remove from the pendingRemove dictionary because these removes have been handled + pendingRemoves.Remove(add.Key); + } + + // Everything left in the Dictionary represents a group that had items removed but no items added + foreach (var removeList in pendingRemoves.Values) + { + var group = removeList.First().OldGroup; + group.Update(updater => updater.RemoveKeys(removeList.Select(static kvp => kvp.Key))); + + // If it is now empty, flag it for cleanup + if (group.Count == 0) + { + _emptyGroups.Add(group); + } + } + + EmitChanges(observer); + } + + public void SetGroupSelector(Func groupSelector, IObserver> observer) + { + _groupSelector = groupSelector; + RegroupAll(observer); + } + + public void Initialize(IEnumerable> initialValues, Func groupSelector, IObserver> observer) + { + if (_groupSelector != null) + { + Debug.Fail("Initialize called when a GroupSelector is already present. No changes will be made."); + return; + } + + _groupSelector = groupSelector; + foreach (var kvp in initialValues) + { + PerformAddOrUpdate(kvp.Key, _groupSelector(kvp.Value, kvp.Key), kvp.Value); + } + + EmitChanges(observer); + } + public void EmitChanges(IObserver> observer) { // Verify logic doesn't capture any non-empty groups diff --git a/src/DynamicData/Cache/Internal/GroupOnDynamic.cs b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs new file mode 100644 index 000000000..04ec7eee2 --- /dev/null +++ b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs @@ -0,0 +1,89 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using DynamicData.Internal; +using DynamicData.Kernel; + +namespace DynamicData.Cache.Internal; + +internal sealed class GroupOnDynamic(IObservable> source, IObservable> selectGroupObservable, IObservable? regrouper = null) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull +{ + public IObservable> Run() => Observable.Create>(observer => + { + var dynamicGrouper = new DynamicGrouper(); + var notGrouped = new Cache(); + var hasSelector = false; + + // Create shared observables for the 3 inputs + var sharedSource = source.Synchronize(dynamicGrouper).Publish(); + var sharedGroupSelector = selectGroupObservable.DistinctUntilChanged().Synchronize(dynamicGrouper).Publish(); + var sharedRegrouper = (regrouper ?? Observable.Empty()).Synchronize(dynamicGrouper).Publish(); + + // The first value from the Group Selector should update the Grouper with all the values seen so far + // Then indicate a selector has been found. Subsequent values should just update the group selector. + var subGroupSelector = sharedGroupSelector + .SubscribeSafe( + onNext: groupSelector => + { + if (hasSelector) + { + dynamicGrouper.SetGroupSelector(groupSelector, observer); + } + else + { + dynamicGrouper.Initialize(notGrouped.KeyValues, groupSelector, observer); + hasSelector = true; + } + }, + onError: observer.OnError); + + // Ignore values until a selector has been provided + // Then re-evaluate all the groupings each time it fires + var subRegrouper = sharedRegrouper + .SubscribeSafe( + onNext: _ => + { + if (hasSelector) + { + dynamicGrouper.RegroupAll(observer); + } + }, + onError: observer.OnError); + + var subChanges = sharedSource + .SubscribeSafe( + onNext: changeSet => + { + if (hasSelector) + { + dynamicGrouper.ProcessChangeSet(changeSet, observer); + } + else + { + notGrouped.Clone(changeSet); + } + }, + onError: observer.OnError); + + // Create an observable that completes when all 3 inputs complete so the downstream can be completed as well + var subOnComplete = Observable.Merge(sharedSource.ToUnit(), sharedGroupSelector.ToUnit(), sharedRegrouper) + .SubscribeSafe(observer.OnError, observer.OnCompleted); + + return new CompositeDisposable( + sharedGroupSelector.Connect(), + sharedSource.Connect(), + sharedRegrouper.Connect(), + dynamicGrouper, + subChanges, + subGroupSelector, + subRegrouper, + subOnComplete); + }); +} diff --git a/src/DynamicData/Cache/Internal/GroupOnObservable.cs b/src/DynamicData/Cache/Internal/GroupOnObservable.cs index 88967d67a..62c91720f 100644 --- a/src/DynamicData/Cache/Internal/GroupOnObservable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnObservable.cs @@ -53,7 +53,7 @@ IObservable CreateGroupObservable(TObject item, TKey key) => parentUpdate = false; }, onError: observer.OnError, - onComplete: observer.OnCompleted); + onCompleted: observer.OnCompleted); return new CompositeDisposable(shared.Connect(), subMergeMany, subChanges, grouper); }); diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index d3516b89f..78d07222e 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -1973,6 +1973,62 @@ public static IObservable> Group(source, groupSelectorKey, regrouper).Run(); } + /// + /// Groups the source on the value returned by the latest value from the group selector factory observable. + /// + /// The type of the object. + /// The type of the key. + /// The type of the group key. + /// The source. + /// The group selector key observable. + /// Fires when the current Grouping Selector needs to re-evaluate all the items in the cache. + /// An observable which will emit group change sets. + /// + /// source + /// or + /// groupSelectorKey + /// or + /// groupController. + /// + public static IObservable> Group(this IObservable> source, IObservable> groupSelectorKeyObservable, IObservable? regrouper = null) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + groupSelectorKeyObservable.ThrowArgumentNullExceptionIfNull(nameof(groupSelectorKeyObservable)); + regrouper.ThrowArgumentNullExceptionIfNull(nameof(regrouper)); + + return new GroupOnDynamic(source, groupSelectorKeyObservable, regrouper).Run(); + } + + /// + /// Groups the source on the value returned by the latest value from the group selector factory observable. + /// + /// The type of the object. + /// The type of the key. + /// The type of the group key. + /// The source. + /// The group selector key observable. + /// Fires when the current Grouping Selector needs to re-evaluate all the items in the cache. + /// An observable which will emit group change sets. + /// + /// source + /// or + /// groupSelectorKey + /// or + /// groupController. + /// + public static IObservable> Group(this IObservable> source, IObservable> groupSelectorKeyObservable, IObservable? regrouper = null) + where TObject : notnull + where TKey : notnull + where TGroupKey : notnull + { + groupSelectorKeyObservable.ThrowArgumentNullExceptionIfNull(nameof(groupSelectorKeyObservable)); + + return source.Group(groupSelectorKeyObservable.Select(AdaptSelector), regrouper); + } + /// /// Groups the source by the latest value from their observable created by the given factory. /// @@ -2009,7 +2065,7 @@ public static IObservable> GroupOnObse { groupObservableSelector.ThrowArgumentNullExceptionIfNull(nameof(groupObservableSelector)); - return source.GroupOnObservable((obj, _) => groupObservableSelector(obj)); + return source.GroupOnObservable(AdaptSelector>(groupObservableSelector)); } /// @@ -6457,6 +6513,12 @@ private static IObservable> OnChangeAction AdaptSelector(Func other) + where TObject : notnull + where TKey : notnull + where TResult : notnull => (obj, _) => other(obj); + private static IObservable> OnChangeAction(this IObservable> source, ChangeReason reason, Action action) where TObject : notnull where TKey : notnull diff --git a/src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs b/src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs index 87037c4eb..26bc85f33 100644 --- a/src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs +++ b/src/DynamicData/Cache/Tests/GroupChangeSetAggregator.cs @@ -99,7 +99,6 @@ protected virtual void Dispose(bool disposing) _ = disposing; _compositeDisposable.Dispose(); _disposedValue = true; - using var cleanup = Groups.Connect().DisposeMany().Subscribe(); } } } diff --git a/src/DynamicData/Internal/ObservableEx.cs b/src/DynamicData/Internal/ObservableEx.cs index 96f499da4..53ef1ea82 100644 --- a/src/DynamicData/Internal/ObservableEx.cs +++ b/src/DynamicData/Internal/ObservableEx.cs @@ -8,14 +8,14 @@ namespace DynamicData.Internal; internal static class ObservableEx { - public static IDisposable SubscribeSafe(this IObservable observable, Action onNext, Action onError, Action onComplete) => - observable.SubscribeSafe(Observer.Create(onNext, onError, onComplete)); + public static IDisposable SubscribeSafe(this IObservable observable, Action onNext, Action onError, Action onCompleted) => + observable.SubscribeSafe(Observer.Create(onNext, onError, onCompleted)); public static IDisposable SubscribeSafe(this IObservable observable, Action onNext, Action onError) => observable.SubscribeSafe(Observer.Create(onNext, onError)); - public static IDisposable SubscribeSafe(this IObservable observable, Action onError, Action onComplete) => - observable.SubscribeSafe(Observer.Create(Stub.Ignore, onError, onComplete)); + public static IDisposable SubscribeSafe(this IObservable observable, Action onError, Action onCompleted) => + observable.SubscribeSafe(Observer.Create(Stub.Ignore, onError, onCompleted)); public static IDisposable SubscribeSafe(this IObservable observable, Action onError) => observable.SubscribeSafe(Observer.Create(Stub.Ignore, onError));