From 268dcac1800445f8ec81800ea6459d577971d123 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 19 Feb 2024 11:10:34 -0800 Subject: [PATCH 1/5] - Updated DynamicGrouper to leverager Deferred Updates - Added Unit Tests to verify ChangeSets are minimized --- .../Cache/GroupOnDynamicFixture.cs | 13 +- .../Cache/GroupOnObservableFixture.cs | 16 +- .../Cache/Internal/DynamicGrouper.cs | 163 +++++++++--------- .../Cache/Internal/ManagedGroup.cs | 2 + 4 files changed, 99 insertions(+), 95 deletions(-) diff --git a/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs b/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs index bb02c1048..139c63879 100644 --- a/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs +++ b/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs @@ -34,16 +34,15 @@ public class GroupOnDynamicFixture : IDisposable private readonly GroupChangeSetAggregator _groupResults; private readonly Faker _faker; private readonly Randomizer _randomizer; - private readonly Subject> _keySelectionSubject = new (); + private readonly BehaviorSubject> _keySelectionSubject = new (ParentName); private readonly Subject _regroupSubject = new (); - private Func? _groupKeySelector; public GroupOnDynamicFixture() { unchecked { _randomizer = new((int)0xc001_d00d); } _faker = Fakers.Person.Clone().WithSeed(_randomizer); _results = _cache.Connect().AsAggregator(); - _groupResults = _cache.Connect().Group(_keySelectionSubject.Do(func => _groupKeySelector = func), _regroupSubject).AsAggregator(); + _groupResults = _cache.Connect().Group(_keySelectionSubject, _regroupSubject).AsAggregator(); } [Theory] @@ -151,6 +150,7 @@ public void ResultContainsAllAddedChildren() // Assert _results.Data.Count.Should().Be(InitialCount); _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().Be(1)); VerifyGroupingResults(); } @@ -167,6 +167,7 @@ public void ResultContainsAddedValues() // Assert _results.Data.Count.Should().Be(InitialCount + AddCount); _results.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2)); VerifyGroupingResults(); } @@ -183,6 +184,7 @@ public void ResultDoesNotContainRemovedValues() // Assert _results.Data.Count.Should().Be(InitialCount - RemoveCount); _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2)); VerifyGroupingResults(); } @@ -201,6 +203,7 @@ public void ResultContainsUpdatedValues() // Assert _results.Data.Count.Should().Be(InitialCount, "Only replacements were made"); _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2)); VerifyGroupingResults(); } @@ -352,7 +355,7 @@ public void Dispose() private void InitialPopulate() => _cache.AddOrUpdate(_faker.Generate(InitialCount)); private void VerifyGroupingResults() => - VerifyGroupingResults(_cache, _results, _groupResults, _groupKeySelector); + VerifyGroupingResults(_cache, _results, _groupResults, _keySelectionSubject.Value); private static void VerifyGroupingResults(ISourceCache cache, ChangeSetAggregator cacheResults, GroupChangeSetAggregator groupResults, Func? groupKeySelector) { @@ -374,7 +377,7 @@ private static void VerifyGroupingResults(ISourceCache cache, Ch expectedGroupings.ForEach(grouping => grouping.Should().BeEquivalentTo(groupResults.Groups.Lookup(grouping.Key).Value.Data.Items)); // No groups should be empty - groupResults.Groups.Items.ForEach(group => group.Data.Count.Should().BeGreaterThan(0)); + groupResults.Groups.Items.ForEach(group => group.Data.Count.Should().BeGreaterThan(0, "Empty groups should be removed")); } private void ForceRegroup() => _regroupSubject.OnNext(Unit.Default); diff --git a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs index e913682b1..26009108a 100644 --- a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs +++ b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs @@ -1,15 +1,16 @@ using System; using System.Linq; +using System.Reactive.Linq; +using System.Threading.Tasks; + using Bogus; -using DynamicData.Tests.Domain; using DynamicData.Binding; -using System.Reactive.Linq; +using DynamicData.Kernel; +using DynamicData.Tests.Domain; using FluentAssertions; using Xunit; using Person = DynamicData.Tests.Domain.Person; -using System.Threading.Tasks; -using DynamicData.Kernel; namespace DynamicData.Tests.Cache; @@ -50,6 +51,7 @@ public void ResultContainsAllInitialChildren() // Assert _results.Data.Count.Should().Be(InitialCount); _results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().Be(1)); VerifyGroupingResults(); } @@ -65,6 +67,7 @@ public void ResultContainsAddedValues() // Assert _results.Data.Count.Should().Be(InitialCount + AddCount); _results.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2)); VerifyGroupingResults(); } @@ -80,6 +83,7 @@ public void ResultDoesNotContainRemovedValues() // Assert _results.Data.Count.Should().Be(InitialCount - RemoveCount); _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2)); VerifyGroupingResults(); } @@ -97,6 +101,7 @@ public void ResultContainsUpdatedValues() // Assert _results.Data.Count.Should().Be(InitialCount, "Only replacements were made"); _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates"); + _groupResults.Groups.Items.ForEach(group => group.Messages.Count.Should().BeLessThanOrEqualTo(2)); VerifyGroupingResults(); } @@ -329,6 +334,9 @@ private static void VerifyGroupingResults(ISourceCache cache, Ch // Check each group expectedGroupings.ForEach(grouping => grouping.Should().BeEquivalentTo(groupResults.Groups.Lookup(grouping.Key).Value.Data.Items)); + + // No groups should be empty + groupResults.Groups.Items.ForEach(group => group.Data.Count.Should().BeGreaterThan(0, "Empty groups should be removed")); } private static IObservable CreateFavoriteColorObservable(Person person, string key) => diff --git a/src/DynamicData/Cache/Internal/DynamicGrouper.cs b/src/DynamicData/Cache/Internal/DynamicGrouper.cs index 82d540e33..85bb9dce2 100644 --- a/src/DynamicData/Cache/Internal/DynamicGrouper.cs +++ b/src/DynamicData/Cache/Internal/DynamicGrouper.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for full license information. using System.Diagnostics; +using System.Reactive.Disposables; using DynamicData.Kernel; namespace DynamicData.Cache.Internal; @@ -19,6 +20,7 @@ internal sealed class DynamicGrouper(Func>? observer = null) { + // No need for Suspend Tracker. Operation could produce at most one change per group. PerformAddOrUpdate(key, groupKey, item); if (observer != null) @@ -29,33 +31,37 @@ public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver changeSet, IObserver>? observer = null) { - foreach (var change in changeSet.ToConcreteType()) { - switch (change.Reason) + // If there could be multiple changes per group, use a suspend tracker so that only a single changeset is emitted per group + using var suspendTracker = changeSet.Count > 1 ? new SuspendTracker() : null; + foreach (var change in changeSet.ToConcreteType()) { - case ChangeReason.Add when _groupSelector is not null: - PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current); - break; + switch (change.Reason) + { + case ChangeReason.Add when _groupSelector is not null: + PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); + break; - case ChangeReason.Remove: - PerformRemove(change.Key); - break; + case ChangeReason.Remove: + PerformRemove(change.Key, suspendTracker); + break; - case ChangeReason.Update when _groupSelector is not null: - PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current); - break; + case ChangeReason.Update when _groupSelector is not null: + PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); + break; - case ChangeReason.Update: - PerformUpdate(change.Key); - break; + case ChangeReason.Update: + PerformUpdate(change.Key, suspendTracker); + break; - case ChangeReason.Refresh when _groupSelector is not null: - PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current); - break; + case ChangeReason.Refresh when _groupSelector is not null: + PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); + break; - case ChangeReason.Refresh: - PerformRefresh(change.Key); - break; + case ChangeReason.Refresh: + PerformRefresh(change.Key, suspendTracker); + break; + } } } @@ -75,65 +81,29 @@ public void RegroupAll(IObserver> obse return; } - // Create an array of tuples with data for items whose GroupKeys have changed + // Create tuples with data for items whose GroupKeys have changed var groupChanges = _groupCache.Items - .Select(static group => group as ManagedGroup) - .SelectMany(group => group!.Cache.KeyValues.Select( - kvp => (KeyValuePair: kvp, OldGroup: group, NewGroupKey: _groupSelector(kvp.Value, kvp.Key)))) + .SelectMany(group => (group as ManagedGroup)!.Cache.KeyValues.Select( + kvp => (KeyValuePair: kvp, OldGroup: (group as ManagedGroup)!, NewGroupKey: _groupSelector(kvp.Value, kvp.Key)))) .Where(static x => !EqualityComparer.Default.Equals(x.OldGroup.Key, x.NewGroupKey)) .ToArray(); - // Build a list of the removals that need to happen (grouped by the old key) - var pendingRemoves = groupChanges - .GroupBy( - static x => x.OldGroup.Key, - static x => (x.KeyValuePair.Key, x.OldGroup)) - .ToDictionary(g => g.Key, g => g.AsEnumerable()); - - // Build a list of the adds that need to happen (grouped by the new key) - var pendingAddList = groupChanges - .GroupBy( - static x => x.NewGroupKey, - static x => x.KeyValuePair) - .ToList(); - - // Iterate the list of groups that need something added (also maybe removed) - foreach (var add in pendingAddList) - { - // Get a list of keys to be removed from this group (if any) - var removeKeyList = - pendingRemoves.TryGetValue(add.Key, out var removes) - ? removes.Select(static r => r.Key) - : Enumerable.Empty(); - - // Obtained the ManagedGroup instance and perform all of the pending updates at once - var newGroup = GetOrAddGroup(add.Key); - newGroup.Update(updater => - { - updater.RemoveKeys(removeKeyList); - updater.AddOrUpdate(add); - }); - - // Update the key cache - foreach (var kvp in add) - { - _groupKeys[kvp.Key] = add.Key; - } - - // Remove from the pendingRemove dictionary because these removes have been handled - pendingRemoves.Remove(add.Key); - } - - // Everything left in the Dictionary represents a group that had items removed but no items added - foreach (var removeList in pendingRemoves.Values) + // Make all of the group changes with a SuspendTracker so only one changeset is emitted for each group { - var group = removeList.First().OldGroup; - group.Update(updater => updater.RemoveKeys(removeList.Select(static kvp => kvp.Key))); + using var suspendTracker = new SuspendTracker(); - // If it is now empty, flag it for cleanup - if (group.Count == 0) + foreach (var change in groupChanges) { - _emptyGroups.Add(group); + PerformGroupAddOrUpdate(change.KeyValuePair.Key, change.NewGroupKey, change.KeyValuePair.Value, suspendTracker); + suspendTracker.Add(change.OldGroup); + change.OldGroup.Update(updater => + { + updater.Remove(change.KeyValuePair.Key); + if (updater.Count == 0) + { + _emptyGroups.Add(change.OldGroup); + } + }); } } @@ -154,6 +124,7 @@ public void Initialize(IEnumerable> initialValues, F return; } + // No need for Suspend Tracker. There can't be any subscribers to the Group Caches yet so they won't be emitting any changesets. _groupSelector = groupSelector; foreach (var kvp in initialValues) { @@ -193,10 +164,11 @@ public void EmitChanges(IObserver> obs public void Dispose() => _groupCache.Items.ForEach(group => (group as ManagedGroup)?.Dispose()); - private static void PerformGroupRefresh(TKey key, in Optional> optionalGroup) + private static void PerformGroupRefresh(TKey key, in Optional> optionalGroup, SuspendTracker? suspendTracker = null) { if (optionalGroup.HasValue) { + suspendTracker?.Add(optionalGroup.Value); optionalGroup.Value.Update(updater => updater.Refresh(key)); } else @@ -219,7 +191,7 @@ private ManagedGroup GetOrAddGroup(TGroupKey groupKey) return newGroup; }); - private void PerformAddOrUpdate(TKey key, TGroupKey groupKey, TObject item) + private void PerformAddOrUpdate(TKey key, TGroupKey groupKey, TObject item, SuspendTracker? suspendTracker = null) { // See if this item already has been grouped if (_groupKeys.TryGetValue(key, out var currentGroupKey)) @@ -231,6 +203,7 @@ private void PerformAddOrUpdate(TKey key, TGroupKey groupKey, TObject item) var optionalGroup = LookupGroup(currentGroupKey); if (optionalGroup.HasValue) { + suspendTracker?.Add(optionalGroup.Value); optionalGroup.Value.Update(updater => updater.AddOrUpdate(item, key)); return; } @@ -240,17 +213,18 @@ private void PerformAddOrUpdate(TKey key, TGroupKey groupKey, TObject item) else { // GroupKey changed, so remove from old and allow to be added below - PerformRemove(key, currentGroupKey); + PerformRemove(key, currentGroupKey, suspendTracker); } } // Find the right group and add the item - PerformGroupAddOrUpdate(key, groupKey, item); + PerformGroupAddOrUpdate(key, groupKey, item, suspendTracker); } - private void PerformGroupAddOrUpdate(TKey key, TGroupKey groupKey, TObject item) + private void PerformGroupAddOrUpdate(TKey key, TGroupKey groupKey, TObject item, SuspendTracker? suspendTracker = null) { var group = GetOrAddGroup(groupKey); + suspendTracker?.Add(group); group.Update(updater => updater.AddOrUpdate(item, key)); _groupKeys[key] = groupKey; @@ -258,10 +232,10 @@ private void PerformGroupAddOrUpdate(TKey key, TGroupKey groupKey, TObject item) _emptyGroups.Remove(group); } - private void PerformRefresh(TKey key) => PerformGroupRefresh(key, LookupGroup(key)); + private void PerformRefresh(TKey key, SuspendTracker? suspendTracker = null) => PerformGroupRefresh(key, LookupGroup(key), suspendTracker); // When the GroupKey is available, check then and move the group if it changed - private void PerformRefresh(TKey key, TGroupKey newGroupKey, TObject item) + private void PerformRefresh(TKey key, TGroupKey newGroupKey, TObject item, SuspendTracker? suspendTracker = null) { if (_groupKeys.TryGetValue(key, out var groupKey)) { @@ -269,13 +243,13 @@ private void PerformRefresh(TKey key, TGroupKey newGroupKey, TObject item) if (EqualityComparer.Default.Equals(newGroupKey, groupKey)) { // GroupKey did not change, so just refresh the value in the group - PerformGroupRefresh(key, LookupGroup(groupKey)); + PerformGroupRefresh(key, LookupGroup(groupKey), suspendTracker); } else { // GroupKey changed, so remove from old and add to new - PerformRemove(key, groupKey); - PerformGroupAddOrUpdate(key, newGroupKey, item); + PerformRemove(key, groupKey, suspendTracker); + PerformGroupAddOrUpdate(key, newGroupKey, item, suspendTracker); } } else @@ -284,11 +258,11 @@ private void PerformRefresh(TKey key, TGroupKey newGroupKey, TObject item) } } - private void PerformRemove(TKey key) + private void PerformRemove(TKey key, SuspendTracker? suspendTracker = null) { if (_groupKeys.TryGetValue(key, out var groupKey)) { - PerformRemove(key, groupKey); + PerformRemove(key, groupKey, suspendTracker); _groupKeys.Remove(key); } else @@ -297,12 +271,13 @@ private void PerformRemove(TKey key) } } - private void PerformRemove(TKey key, TGroupKey groupKey) + private void PerformRemove(TKey key, TGroupKey groupKey, SuspendTracker? suspendTracker = null) { var optionalGroup = LookupGroup(groupKey); if (optionalGroup.HasValue) { var currentGroup = optionalGroup.Value; + suspendTracker?.Add(currentGroup); currentGroup.Update(updater => { updater.Remove(key); @@ -320,5 +295,21 @@ private void PerformRemove(TKey key, TGroupKey groupKey) // Without the new group key, all that can be done is remove the old value // Consumer of the Grouper is resonsible for Adding the New Value. - private void PerformUpdate(TKey key) => PerformRemove(key); + private void PerformUpdate(TKey key, SuspendTracker? suspendTracker = null) => PerformRemove(key, suspendTracker); + + private sealed class SuspendTracker : IDisposable + { + private readonly HashSet _trackedKeys = []; + private readonly CompositeDisposable _disposables = []; + + public void Add(ManagedGroup managedGroup) + { + if (_trackedKeys.Add(managedGroup.Key)) + { + _disposables.Add(managedGroup.SuspendNotifications()); + } + } + + public void Dispose() => _disposables.Dispose(); + } } diff --git a/src/DynamicData/Cache/Internal/ManagedGroup.cs b/src/DynamicData/Cache/Internal/ManagedGroup.cs index f2c10ae84..85836d252 100644 --- a/src/DynamicData/Cache/Internal/ManagedGroup.cs +++ b/src/DynamicData/Cache/Internal/ManagedGroup.cs @@ -18,6 +18,8 @@ internal sealed class ManagedGroup(TGroupKey groupKey) public void Dispose() => _cache.Dispose(); + public IDisposable SuspendNotifications() => _cache.SuspendNotifications(); + /// /// Determines whether the specified is equal to the current . /// From db781a5fd606f1c8933099527c5e2af3268597e6 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 19 Feb 2024 11:42:39 -0800 Subject: [PATCH 2/5] Fixed Unit Tests --- src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs b/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs index 139c63879..ab5fd0bd4 100644 --- a/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs +++ b/src/DynamicData.Tests/Cache/GroupOnDynamicFixture.cs @@ -34,7 +34,7 @@ public class GroupOnDynamicFixture : IDisposable private readonly GroupChangeSetAggregator _groupResults; private readonly Faker _faker; private readonly Randomizer _randomizer; - private readonly BehaviorSubject> _keySelectionSubject = new (ParentName); + private readonly BehaviorSubject?> _keySelectionSubject = new (null); private readonly Subject _regroupSubject = new (); public GroupOnDynamicFixture() @@ -42,7 +42,7 @@ public GroupOnDynamicFixture() unchecked { _randomizer = new((int)0xc001_d00d); } _faker = Fakers.Person.Clone().WithSeed(_randomizer); _results = _cache.Connect().AsAggregator(); - _groupResults = _cache.Connect().Group(_keySelectionSubject, _regroupSubject).AsAggregator(); + _groupResults = _cache.Connect().Group(KeySelectionObservable, _regroupSubject).AsAggregator(); } [Theory] @@ -306,7 +306,7 @@ public void ResultFailsIfSourceFails() InitialPopulate(); var expectedError = new Exception("Expected"); var throwObservable = Observable.Throw>(expectedError); - using var results = _cache.Connect().Concat(throwObservable).Group(_keySelectionSubject, _regroupSubject).AsAggregator(); + using var results = _cache.Connect().Concat(throwObservable).Group(KeySelectionObservable, _regroupSubject).AsAggregator(); // Act _cache.Dispose(); @@ -352,6 +352,8 @@ public void Dispose() _regroupSubject.Dispose(); } + private IObservable> KeySelectionObservable => _keySelectionSubject.Where(v => v is not null).Select(v => v!); + private void InitialPopulate() => _cache.AddOrUpdate(_faker.Generate(InitialCount)); private void VerifyGroupingResults() => From 48650429d214175b095c813745403273e912ede0 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 19 Feb 2024 12:53:34 -0800 Subject: [PATCH 3/5] Leverage Deferrals in GroupOn operator --- src/DynamicData/Cache/Internal/GroupOn.cs | 26 +++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/DynamicData/Cache/Internal/GroupOn.cs b/src/DynamicData/Cache/Internal/GroupOn.cs index 95ca6bd12..b33734e6c 100644 --- a/src/DynamicData/Cache/Internal/GroupOn.cs +++ b/src/DynamicData/Cache/Internal/GroupOn.cs @@ -80,6 +80,8 @@ private GroupChangeSet HandleUpdates(IEnumerable new ChangeWithGroup(u, groupSelectorKey)).GroupBy(c => c.GroupKey); + using var suspendTracker = new SuspendTracker(); + // 1. iterate and maintain child caches (_groupCache) // 2. maintain which group each item belongs to (_itemCache) grouped.ForEach(group => @@ -90,6 +92,11 @@ private GroupChangeSet HandleUpdates(IEnumerable, TGroupKey>(ChangeReason.Add, group.Key, groupCache)); } + else + { + // It wasn't created, so there could be subscribers, so suspend updates until the end + suspendTracker.Add(groupCache); + } groupCache.Update( groupUpdater => @@ -111,6 +118,7 @@ private GroupChangeSet HandleUpdates(IEnumerable { + suspendTracker.Add(g); g.Update(u => u.Remove(current.Key)); if (g.Count != 0) { @@ -141,6 +149,7 @@ private GroupChangeSet HandleUpdates(IEnumerable { + suspendTracker.Add(g); g.Update(u => u.Remove(current.Key)); if (g.Count != 0) { @@ -180,6 +189,7 @@ private GroupChangeSet HandleUpdates(IEnumerable { + suspendTracker.Add(g); g.Update(u => u.Remove(current.Key)); if (g.Count != 0) { @@ -236,5 +246,21 @@ private readonly struct ChangeWithGroup(Change change, Func $"Key: {Key}, GroupKey: {GroupKey}, Item: {Item}"; } + + private sealed class SuspendTracker : IDisposable + { + private readonly HashSet _trackedKeys = []; + private readonly CompositeDisposable _disposables = []; + + public void Add(ManagedGroup managedGroup) + { + if (_trackedKeys.Add(managedGroup.Key)) + { + _disposables.Add(managedGroup.SuspendNotifications()); + } + } + + public void Dispose() => _disposables.Dispose(); + } } } From e2db7cbea8e93da20263ee33dd51af0a79ea41d6 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 19 Feb 2024 15:29:01 -0800 Subject: [PATCH 4/5] Fixed all the GroupOnObservable unit tests --- .../Cache/GroupOnObservableFixture.cs | 3 + .../Cache/Internal/DynamicGrouper.cs | 100 ++++++++++-------- .../Cache/Internal/GroupOnObservable.cs | 4 +- 3 files changed, 61 insertions(+), 46 deletions(-) diff --git a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs index 26009108a..9c2fea749 100644 --- a/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs +++ b/src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs @@ -120,6 +120,7 @@ public void GroupRemovedWhenEmpty() // Assert _cache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount - 1); _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + _groupResults.Data.Count.Should().Be(colorCount - 1, "{0} colors were used and then all of the {1} were removed", colorCount, removeColor); _groupResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); _groupResults.Summary.Overall.Adds.Should().Be(colorCount); _groupResults.Summary.Overall.Removes.Should().Be(1); @@ -147,9 +148,11 @@ public void GroupNotRemovedIfAddedBackImmediately() // Assert _cache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount); _results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Other Added Value"); + _groupResults.Data.Count.Should().Be(colorCount); _groupResults.Messages.Count.Should().Be(1, "Shouldn't be removed/re-added"); _groupResults.Summary.Overall.Adds.Should().Be(colorCount); _groupResults.Summary.Overall.Removes.Should().Be(0); + _groupResults.Groups.Lookup(removeColor).Value.Data.Count.Should().Be(1, "All the {0} were removed and then 1 was added back", removeColor); VerifyGroupingResults(); } diff --git a/src/DynamicData/Cache/Internal/DynamicGrouper.cs b/src/DynamicData/Cache/Internal/DynamicGrouper.cs index 85bb9dce2..cbb71acd5 100644 --- a/src/DynamicData/Cache/Internal/DynamicGrouper.cs +++ b/src/DynamicData/Cache/Internal/DynamicGrouper.cs @@ -16,12 +16,12 @@ internal sealed class DynamicGrouper(Func, TGroupKey> _groupCache = new(); private readonly Dictionary _groupKeys = []; private readonly HashSet> _emptyGroups = []; + private readonly SuspendTracker _suspendTracker = new(); private Func? _groupSelector = groupSelector; - public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver>? observer = null) + public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver>? observer = null, bool suspendUpdates = true) { - // No need for Suspend Tracker. Operation could produce at most one change per group. - PerformAddOrUpdate(key, groupKey, item); + PerformAddOrUpdate(key, groupKey, item, suspendUpdates ? _suspendTracker : null); if (observer != null) { @@ -29,39 +29,38 @@ public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver changeSet, IObserver>? observer = null) + public void ProcessChangeSet(IChangeSet changeSet, IObserver>? observer = null, bool? suspendUpdates = null) { + // If caller didn't specify whether to suspendUpdates, use the size of the ChangeSet to decide + // If there is only one change in the changeset, then it isn't worth suspending the updates + var suspendTracker = (suspendUpdates ?? changeSet.Count > 1) ? _suspendTracker : null; + foreach (var change in changeSet.ToConcreteType()) { - // If there could be multiple changes per group, use a suspend tracker so that only a single changeset is emitted per group - using var suspendTracker = changeSet.Count > 1 ? new SuspendTracker() : null; - foreach (var change in changeSet.ToConcreteType()) + switch (change.Reason) { - switch (change.Reason) - { - case ChangeReason.Add when _groupSelector is not null: - PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); - break; + case ChangeReason.Add when _groupSelector is not null: + PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); + break; - case ChangeReason.Remove: - PerformRemove(change.Key, suspendTracker); - break; + case ChangeReason.Remove: + PerformRemove(change.Key, suspendTracker); + break; - case ChangeReason.Update when _groupSelector is not null: - PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); - break; + case ChangeReason.Update when _groupSelector is not null: + PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); + break; - case ChangeReason.Update: - PerformUpdate(change.Key, suspendTracker); - break; + case ChangeReason.Update: + PerformUpdate(change.Key, suspendTracker); + break; - case ChangeReason.Refresh when _groupSelector is not null: - PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); - break; + case ChangeReason.Refresh when _groupSelector is not null: + PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker); + break; - case ChangeReason.Refresh: - PerformRefresh(change.Key, suspendTracker); - break; - } + case ChangeReason.Refresh: + PerformRefresh(change.Key, suspendTracker); + break; } } @@ -88,23 +87,18 @@ public void RegroupAll(IObserver> obse .Where(static x => !EqualityComparer.Default.Equals(x.OldGroup.Key, x.NewGroupKey)) .ToArray(); - // Make all of the group changes with a SuspendTracker so only one changeset is emitted for each group + foreach (var change in groupChanges) { - using var suspendTracker = new SuspendTracker(); - - foreach (var change in groupChanges) + PerformGroupAddOrUpdate(change.KeyValuePair.Key, change.NewGroupKey, change.KeyValuePair.Value, _suspendTracker); + _suspendTracker.Add(change.OldGroup); + change.OldGroup.Update(updater => { - PerformGroupAddOrUpdate(change.KeyValuePair.Key, change.NewGroupKey, change.KeyValuePair.Value, suspendTracker); - suspendTracker.Add(change.OldGroup); - change.OldGroup.Update(updater => + updater.Remove(change.KeyValuePair.Key); + if (updater.Count == 0) { - updater.Remove(change.KeyValuePair.Key); - if (updater.Count == 0) - { - _emptyGroups.Add(change.OldGroup); - } - }); - } + _emptyGroups.Add(change.OldGroup); + } + }); } EmitChanges(observer); @@ -154,6 +148,8 @@ public void EmitChanges(IObserver> obs // Make sure no empty ones were missed Debug.Assert(!_groupCache.Items.Any(static group => group.Cache.Count == 0), "Not all empty Groups were removed"); + _suspendTracker.Reset(); + // Emit any pending changes var changeSet = _groupCache.CaptureChanges(); if (changeSet.Count != 0) @@ -162,7 +158,11 @@ public void EmitChanges(IObserver> obs } } - public void Dispose() => _groupCache.Items.ForEach(group => (group as ManagedGroup)?.Dispose()); + public void Dispose() + { + _suspendTracker.Dispose(); + _groupCache.Items.ForEach(group => (group as ManagedGroup)?.Dispose()); + } private static void PerformGroupRefresh(TKey key, in Optional> optionalGroup, SuspendTracker? suspendTracker = null) { @@ -300,7 +300,9 @@ private void PerformRemove(TKey key, TGroupKey groupKey, SuspendTracker? suspend private sealed class SuspendTracker : IDisposable { private readonly HashSet _trackedKeys = []; - private readonly CompositeDisposable _disposables = []; + private CompositeDisposable _disposables = []; + + public bool HasItems => _disposables.Count > 0; public void Add(ManagedGroup managedGroup) { @@ -310,6 +312,16 @@ public void Add(ManagedGroup managedGroup) } } + public void Reset() + { + if (_disposables.Count > 0) + { + _disposables.Dispose(); + _disposables = []; + _trackedKeys.Clear(); + } + } + public void Dispose() => _disposables.Dispose(); } } diff --git a/src/DynamicData/Cache/Internal/GroupOnObservable.cs b/src/DynamicData/Cache/Internal/GroupOnObservable.cs index 62c91720f..bdf75c9d5 100644 --- a/src/DynamicData/Cache/Internal/GroupOnObservable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnObservable.cs @@ -23,8 +23,8 @@ IObservable CreateGroupObservable(TObject item, TKey key) => selectGroup(item, key) .DistinctUntilChanged() .Synchronize(locker!) - .Do( - onNext: groupKey => grouper!.AddOrUpdate(key, groupKey, item, !parentUpdate ? observer : null), + .Do(// Only suspend updates if inside of a parentUpdate. Otherwise, it will only generate at most one change per group, so there's no value in suspending. + onNext: groupKey => grouper!.AddOrUpdate(key, groupKey, item, !parentUpdate ? observer : null, suspendUpdates: parentUpdate), onError: observer.OnError); // Create a shared connection to the source From 8746ae826dfb3dd6f17578d17dee07d493f9ad31 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 20 Feb 2024 09:41:20 -0800 Subject: [PATCH 5/5] Minor improvements --- .../Cache/Internal/DynamicGrouper.cs | 21 +++++++++++++------ .../Cache/Internal/GroupOnObservable.cs | 4 ++-- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/DynamicData/Cache/Internal/DynamicGrouper.cs b/src/DynamicData/Cache/Internal/DynamicGrouper.cs index cbb71acd5..98671a60f 100644 --- a/src/DynamicData/Cache/Internal/DynamicGrouper.cs +++ b/src/DynamicData/Cache/Internal/DynamicGrouper.cs @@ -19,9 +19,11 @@ internal sealed class DynamicGrouper(Func? _groupSelector = groupSelector; - public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver>? observer = null, bool suspendUpdates = true) + public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver>? observer = null) { - PerformAddOrUpdate(key, groupKey, item, suspendUpdates ? _suspendTracker : null); + // If not emitting the changes, then suspend the notifications + // If changes will be emitted, then there is no need because it will generate at most one change per group + PerformAddOrUpdate(key, groupKey, item, observer == null ? _suspendTracker : null); if (observer != null) { @@ -29,11 +31,18 @@ public void AddOrUpdate(TKey key, TGroupKey groupKey, TObject item, IObserver changeSet, IObserver>? observer = null, bool? suspendUpdates = null) + public void ProcessChangeSet(IChangeSet changeSet, IObserver>? observer = null) { - // If caller didn't specify whether to suspendUpdates, use the size of the ChangeSet to decide - // If there is only one change in the changeset, then it isn't worth suspending the updates - var suspendTracker = (suspendUpdates ?? changeSet.Count > 1) ? _suspendTracker : null; + var suspendTracker = (observer, changeSet.Count) switch + { + // If emitting the changeset and there is only one change, then there will be at most one change per group downstream + // So there's no value in suspending the notifications + (not null, 1) => null, + + // Otherwise, use the tracker so they get suspended + _ => _suspendTracker, + }; + foreach (var change in changeSet.ToConcreteType()) { switch (change.Reason) diff --git a/src/DynamicData/Cache/Internal/GroupOnObservable.cs b/src/DynamicData/Cache/Internal/GroupOnObservable.cs index bdf75c9d5..62c91720f 100644 --- a/src/DynamicData/Cache/Internal/GroupOnObservable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnObservable.cs @@ -23,8 +23,8 @@ IObservable CreateGroupObservable(TObject item, TKey key) => selectGroup(item, key) .DistinctUntilChanged() .Synchronize(locker!) - .Do(// Only suspend updates if inside of a parentUpdate. Otherwise, it will only generate at most one change per group, so there's no value in suspending. - onNext: groupKey => grouper!.AddOrUpdate(key, groupKey, item, !parentUpdate ? observer : null, suspendUpdates: parentUpdate), + .Do( + onNext: groupKey => grouper!.AddOrUpdate(key, groupKey, item, !parentUpdate ? observer : null), onError: observer.OnError); // Create a shared connection to the source