From 56eda4522ff2b498514133cfe6dd470066aedd88 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Thu, 21 Dec 2023 13:40:08 -0800 Subject: [PATCH] Feature: MergeChangeSets for List ChangeSets (#805) * Initial drop of List MergeChangeSets * Finished unit tests * Made API file changes * PR Feedback * Unit test fixes and code cleanup --- ...ts.DynamicDataTests.DotNet6_0.verified.txt | 12 + ...ts.DynamicDataTests.DotNet7_0.verified.txt | 12 + ...ts.DynamicDataTests.DotNet8_0.verified.txt | 12 + .../List/MergeChangeSetsFixture.cs | 442 ++++++++++++++++++ .../Utilities/ObservableExtensions.cs | 12 + .../Cache/Internal/MergeChangeSets.cs | 2 +- .../List/Internal/MergeChangeSets.cs | 69 +++ src/DynamicData/List/ObservableListEx.cs | 110 ++++- 8 files changed, 668 insertions(+), 3 deletions(-) create mode 100644 src/DynamicData.Tests/List/MergeChangeSetsFixture.cs create mode 100644 src/DynamicData/List/Internal/MergeChangeSets.cs diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt index 1ca750edc..4612db9d9 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt @@ -2122,6 +2122,18 @@ namespace DynamicData where TGroupKey : notnull { } public static System.IObservable> LimitSizeTo(this DynamicData.ISourceList source, int sizeLimit, System.Reactive.Concurrency.IScheduler? scheduler = null) where T : notnull { } + public static System.IObservable> MergeChangeSets(this DynamicData.IObservableList>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable>>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.Collections.Generic.IEnumerable>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable> source, System.Collections.Generic.IEnumerable>> others, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable> source, System.IObservable> other, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } public static System.IObservable> MergeChangeSets(this DynamicData.IObservableList>> source, System.Collections.Generic.IComparer comparer) where TObject : notnull where TKey : notnull { } diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt index d3136718b..10b8258c4 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt @@ -2122,6 +2122,18 @@ namespace DynamicData where TGroupKey : notnull { } public static System.IObservable> LimitSizeTo(this DynamicData.ISourceList source, int sizeLimit, System.Reactive.Concurrency.IScheduler? scheduler = null) where T : notnull { } + public static System.IObservable> MergeChangeSets(this DynamicData.IObservableList>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable>>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.Collections.Generic.IEnumerable>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable> source, System.Collections.Generic.IEnumerable>> others, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable> source, System.IObservable> other, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } public static System.IObservable> MergeChangeSets(this DynamicData.IObservableList>> source, System.Collections.Generic.IComparer comparer) where TObject : notnull where TKey : notnull { } diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt index 448927480..492b4274f 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt @@ -2122,6 +2122,18 @@ namespace DynamicData where TGroupKey : notnull { } public static System.IObservable> LimitSizeTo(this DynamicData.ISourceList source, int sizeLimit, System.Reactive.Concurrency.IScheduler? scheduler = null) where T : notnull { } + public static System.IObservable> MergeChangeSets(this DynamicData.IObservableList>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable>>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.Collections.Generic.IEnumerable>> source, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable> source, System.Collections.Generic.IEnumerable>> others, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } + public static System.IObservable> MergeChangeSets(this System.IObservable> source, System.IObservable> other, System.Collections.Generic.IEqualityComparer? equalityComparer = null, System.Reactive.Concurrency.IScheduler? scheduler = null, bool completable = true) + where TObject : notnull { } public static System.IObservable> MergeChangeSets(this DynamicData.IObservableList>> source, System.Collections.Generic.IComparer comparer) where TObject : notnull where TKey : notnull { } diff --git a/src/DynamicData.Tests/List/MergeChangeSetsFixture.cs b/src/DynamicData.Tests/List/MergeChangeSetsFixture.cs new file mode 100644 index 000000000..9f30052d1 --- /dev/null +++ b/src/DynamicData.Tests/List/MergeChangeSetsFixture.cs @@ -0,0 +1,442 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive; +using System.Reactive.Linq; +using System.Threading.Tasks; +using Bogus; +using DynamicData.Kernel; +using DynamicData.Tests.Domain; +using DynamicData.Tests.Utilities; +using FluentAssertions; +using Microsoft.Reactive.Testing; +using Xunit; +using System.Collections.Concurrent; + +namespace DynamicData.Tests.List; + +public sealed class MergeChangeSetsFixture : IDisposable +{ +#if DEBUG + const int InitialOwnerCount = 7; + const int AddRangeSize = 5; +#else + const int InitialOwnerCount = 103; + const int AddRangeSize = 53; +#endif + + private readonly IList _animalOwners = new List(); + private readonly Faker _animalOwnerFaker; + private readonly Faker _animalFaker; + private readonly Randomizer _randomizer; + + public MergeChangeSetsFixture() + { + _randomizer = new Randomizer(0x10131948); + _animalFaker = Fakers.Animal.Clone().WithSeed(_randomizer); + _animalOwnerFaker = Fakers.AnimalOwner.Clone().WithSeed(_randomizer).WithInitialAnimals(_animalFaker, AddRangeSize, AddRangeSize); + _animalOwners.Add(_animalOwnerFaker.Generate(InitialOwnerCount)); + } + + [Theory] + [InlineData(5, 7)] + [InlineData(10, 50)] + [InlineData(10, 100)] + [InlineData(200, 50)] + [InlineData(100, 10)] + public async Task MultiThreadedStressTest(int ownerCount, int animalCount) + { + var MaxAddTime = TimeSpan.FromSeconds(0.250); + var MaxRemoveTime = TimeSpan.FromSeconds(0.100); + + TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null; + + IObservable>> CreateStressObservable(int ownerCount, int animalCount, int parallel, ConcurrentBag added, IScheduler scheduler) => + Observable.Create>>(observer => + { + var shared = _animalOwnerFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler) + .Parallelize(ownerCount, parallel) + .Merge(_animalOwners.ToObservable()) + .Do(owner => added.Add(owner)) + .Publish(); + + var addAnimalsSub = shared.SelectMany(owner => AddRemoveAnimals(owner, animalCount, parallel, scheduler)) + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError, + onCompleted: observer.OnCompleted); + + var changeSetSub = shared.Select(owner => owner.Animals.Connect()) + .Subscribe( + onNext: observer.OnNext, + onError: observer.OnError); + + return new CompositeDisposable(addAnimalsSub, changeSetSub, shared.Connect()); + }); + + IObservable AddRemoveAnimals(AnimalOwner owner, int animalCount, int parallel, IScheduler scheduler) => + _animalFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler) + .Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler)) + .Finally(owner.Animals.Dispose); + + var addedOwners = new ConcurrentBag(); + var addingAnimals = true; + var observableObservable = CreateStressObservable(ownerCount, animalCount, Environment.ProcessorCount, addedOwners, TaskPoolScheduler.Default) + .Finally(() => addingAnimals = false) + .Publish() + .RefCount(); + var mergedObservable = observableObservable.MergeChangeSets(); + + // Start asynchrononously modifying the parent list and the child lists + using var results = mergedObservable.AsAggregator(); + + // Subscribe / unsubscribe over and over while the collections are being modified + do + { + // Ensure items are being added asynchronously before subscribing to the animal changes + await Task.Yield(); + + { + // Subscribe + var mergedSub = mergedObservable.Subscribe(); + + // Let other threads run + await Task.Yield(); + + // Unsubscribe + mergedSub.Dispose(); + } + } + while (addingAnimals); + + // Verify the results + CheckResultContents(addedOwners.ToList(), results); + } + + [Fact] + public void NullChecks() + { + // Arrange + var nullChangeSetObs = (IObservable>>)null!; + + // Act + var checkParam1 = () => nullChangeSetObs.MergeChangeSets(); + + // Assert + nullChangeSetObs.Should().BeNull(); + + checkParam1.Should().Throw(); + } + + [Fact] + public void ResultContainsAllInitialChildrenObsObs() + { + // Arrange + var obs = GetObservableObservable(); + + // Act + using var results = obs.MergeChangeSets().AsAggregator(); + + // Assert + CheckResultContents(_animalOwners, results); + } + + [Fact] + public void ResultContainsAllInitialChildrenEnum() + { + // Arrange + var obs = GetEnumerableObservable(); + + // Act + using var results = obs.MergeChangeSets().AsAggregator(); + + // Assert + CheckResultContents(_animalOwners, results); + } + + [Fact] + public void ResultEmptyIfSourceIsClearedObs() + { + // Arrange + var obs = GetObservableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + _animalOwners.ForEach(owner => owner.Animals.Clear()); + + // Assert + results.Data.Count.Should().Be(0); + } + + [Fact] + public void ResultEmptyIfSourceIsClearedEnum() + { + // Arrange + var obs = GetEnumerableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + _animalOwners.ForEach(owner => owner.Animals.Clear()); + + // Assert + results.Data.Count.Should().Be(0); + } + + [Fact] + public async Task ResultContainsChildrenAddedWithAddRangeObs() + { + // Arrange + var obs = GetObservableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + var added = (await ForOwnersAsync(UseAddRange)).SelectMany(list => list).ToList(); + + // Assert + added.Should().BeSubsetOf(results.Data.Items); + CheckResultContents(_animalOwners, results); + } + + [Fact] + public async Task ResultContainsChildrenAddedWithAddRangeEnum() + { + // Arrange + var obs = GetEnumerableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + var added = (await ForOwnersAsync(UseAddRange)).SelectMany(list => list).ToList(); + + // Assert + added.Should().BeSubsetOf(results.Data.Items); + CheckResultContents(_animalOwners, results); + } + + [Fact] + public async Task ResultContainsChildrenAddedWithAddObs() + { + // Arrange + var obs = GetObservableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + var added = await ForOwnersAsync(UseAdd); + + // Assert + added.Should().BeSubsetOf(results.Data.Items); + CheckResultContents(_animalOwners, results); + } + + [Fact] + public async Task ResultContainsChildrenAddedWithAddEnum() + { + // Arrange + var obs = GetEnumerableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + await ForOwnersAsync(owner => owner.Animals.Add(_animalFaker.Generate())); + + // Assert + CheckResultContents(_animalOwners, results); + } + [Fact] + public async Task ResultContainsChildrenAddedWithInsertObs() + { + // Arrange + var obs = GetObservableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + var added = await ForOwnersAsync(UseInsert); + + // Assert + added.Should().BeSubsetOf(results.Data.Items); + CheckResultContents(_animalOwners, results); + } + + [Fact] + public async Task ResultContainsChildrenAddedWithInsertEnum() + { + // Arrange + var obs = GetEnumerableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + var added = await ForOwnersAsync(UseInsert); + + // Assert + added.Should().BeSubsetOf(results.Data.Items); + CheckResultContents(_animalOwners, results); + } + + + [Fact] + public async Task ResultContainsCorrectItemsAfterChildReplacementObs() + { + // Arrange + var obs = GetObservableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + var replacements = await ForOwnersAsync(ReplaceAnimal); + + // Assert + replacements.Select(r => r.New).Should().BeSubsetOf(results.Data.Items); + replacements.Select(r => r.Old).ForEach(old => results.Data.Items.Should().NotContain(old)); + CheckResultContents(_animalOwners, results); + } + + [Fact] + public async Task ResultContainsCorrectItemsAfterChildReplacementEnum() + { + // Arrange + var obs = GetEnumerableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + var replacements = await ForOwnersAsync(ReplaceAnimal); + + // Assert + replacements.Select(r => r.New).Should().BeSubsetOf(results.Data.Items); + replacements.Select(r => r.Old).ForEach(old => results.Data.Items.Should().NotContain(old)); + CheckResultContents(_animalOwners, results); + } + + [Fact] + public void ResultFailsIfSourceFails() + { + // Arrange + var expectedError = new Exception("Expected"); + var throwObservable = Observable.Throw>>(expectedError); + var obs = GetObservableObservable(); + + // Act + using var results = obs.Concat(throwObservable).MergeChangeSets().AsAggregator(); + + // Assert + results.Exception.Should().Be(expectedError); + } + + [Fact] + public void ResultFailsIfAnyChildChangeSetFails() + { + // Arrange + var expectedError = new Exception("Test exception"); + var throwObservable = Observable.Throw>(expectedError); + var obs = GetEnumerableObservable().Append(throwObservable); + + // Act + using var results = obs.MergeChangeSets().AsAggregator(); + + // Assert + results.Exception.Should().Be(expectedError); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public void ResultCompletesOnlyWhenSourceAndAllChildrenComplete(bool completeAll) + { + // Arrange + var obs = GetObservableObservable(); + using var results = obs.MergeChangeSets().AsAggregator(); + + // Act + _animalOwners.Skip(completeAll ? 0 : 1).ForEach(owner => owner.Animals.Dispose()); + + // Assert + results.IsCompleted.Should().Be(completeAll); + } + + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void MergedObservableRespectsCompletableFlag(bool completeSource, bool completeChildren) + { + // Arrange + var obs = GetEnumerableObservable(); + using var results = obs.MergeChangeSets(completable: completeSource).AsAggregator(); + + // Act + _animalOwners.Skip(completeChildren ? 0 : 1).ForEach(owner => owner.Animals.Dispose()); + + // Assert + results.IsCompleted.Should().Be(completeSource && completeChildren); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public void EnumObservableUsesTheScheduler(bool advance) + { + // Arrange + var scheduler = new TestScheduler(); + var obs = GetEnumerableObservable(); + using var results = obs.MergeChangeSets(scheduler: scheduler).AsAggregator(); + + // Act + if (advance) + { + scheduler.AdvanceBy(1); + } + + // Assert + if (advance) + { + CheckResultContents(_animalOwners, results); + } + else + { + results.Data.Count.Should().Be(0); + results.Messages.Count.Should().Be(0); + } + } + + public void Dispose() + { + _animalOwners.ForEach(owner => owner.Dispose()); + } + + private static void CheckResultContents(IList expectedOwners, ChangeSetAggregator animalResults) + { + var expectedAnimals = expectedOwners.SelectMany(owner => owner.Animals.Items).ToList(); + + // These should be subsets of each other, so check one subset and the size + expectedAnimals.Should().BeSubsetOf(animalResults.Data.Items); + animalResults.Data.Items.Count().Should().Be(expectedAnimals.Count); + } + + Task ForOwnersAsync(Action action) => Task.WhenAll(_animalOwners.Select(owner => Task.Run(() => action(owner)))); + + Task ForOwnersAsync(Func func) => Task.WhenAll(_animalOwners.Select(owner => Task.Run(() => func(owner)))); + + private Animal UseAdd(AnimalOwner owner) => + _animalFaker.Generate().With(animal => owner.Animals.Add(animal)); + + private List UseAddRange(AnimalOwner owner) => + _animalFaker.Generate(_randomizer.Number(AddRangeSize)).With(animals => owner.Animals.AddRange(animals)); + + private (Animal Old, Animal New) ReplaceAnimal(AnimalOwner owner) + { + var replaceThis = _randomizer.ListItem(owner.Animals.Items.ToList()); + var withThis = _animalFaker.Generate(); + owner.Animals.Replace(replaceThis, withThis); + return (replaceThis, withThis); + } + + private Animal UseInsert(AnimalOwner owner) + { + var newAnimal = _animalFaker.Generate(); + owner.Animals.Insert(_randomizer.Number(owner.Animals.Count), newAnimal); + return newAnimal; + } + + private IEnumerable>> GetEnumerableObservable() => _animalOwners.Select(owner => owner.Animals.Connect()); + private IObservable>> GetObservableObservable() => GetEnumerableObservable().ToObservable(); +} diff --git a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs index b216cca32..b392f163a 100644 --- a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs +++ b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs @@ -36,6 +36,18 @@ e is not null public static IObservable Parallelize(this IObservable source, int count, int parallel, Func, IObservable> fnAttachParallelWork) => Observable.Merge(Distribute(count, parallel).Select(n => fnAttachParallelWork(source.Take(n)))); + /// + /// Creates an observable that parallelizes some given work by taking the source observable, creates multiple subscriptions, limiting each to a certain number of values, and + /// merging them back together. + /// + /// Observable type. + /// Source Observable. + /// Total number of values to process. + /// Total number of subscriptions to create. + /// An Observable that contains the values resulting from the merged sequences. + public static IObservable Parallelize(this IObservable source, int count, int parallel) => + Observable.Merge(Distribute(count, parallel).Select(n => source.Take(n))); + public static IObservable ValidateSynchronization(this IObservable source) // Using Raw observable and observer classes to bypass normal RX safeguards, which prevent out-of-sequence notifications. // This allows the operator to be combined with TestableObserver, for correctness-testing of operators. diff --git a/src/DynamicData/Cache/Internal/MergeChangeSets.cs b/src/DynamicData/Cache/Internal/MergeChangeSets.cs index aa51a74ce..ff6220830 100644 --- a/src/DynamicData/Cache/Internal/MergeChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeChangeSets.cs @@ -50,7 +50,7 @@ public IObservable> Run() => Observable.Create(() => localCache.Items, _comparer, _equalityComparer); // Merge all of the changeset streams together and Process them with the change tracker which will emit the results - var subscription = localCache.Connect().MergeMany(mc => mc.Source.Do(_ => { }, observer.OnError)) + var subscription = localCache.Connect().MergeMany(mc => mc.Source.Do(static _ => { }, observer.OnError)) .Synchronize(locker) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), diff --git a/src/DynamicData/List/Internal/MergeChangeSets.cs b/src/DynamicData/List/Internal/MergeChangeSets.cs new file mode 100644 index 000000000..f92b00e93 --- /dev/null +++ b/src/DynamicData/List/Internal/MergeChangeSets.cs @@ -0,0 +1,69 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +namespace DynamicData.List.Internal; + +/// +/// Operator that is similiar to Merge but intelligently handles List ChangeSets. +/// +internal sealed class MergeChangeSets + where TObject : notnull +{ + private readonly IObservable>> _source; + + private readonly IEqualityComparer? _equalityComparer; + + public MergeChangeSets(IEnumerable>> source, IEqualityComparer? equalityComparer, bool completable, IScheduler? scheduler = null) + { + _equalityComparer = equalityComparer; + _source = CreateClonedListObservable(source, completable, scheduler); + } + + public MergeChangeSets(IObservable>> source, IEqualityComparer? equalityComparer) + { + _equalityComparer = equalityComparer; + _source = CreateClonedListObservable(source); + } + + public IObservable> Run() => Observable.Create>( + observer => + { + // This is manages all of the changes + var changeTracker = new ChangeSetMergeTracker(); + + // Merge all of the changeset streams together and Process them with the change tracker which will emit the results + return _source.MergeMany(clonedList => clonedList.Source.RemoveIndex().Do(static _ => { }, observer.OnError)) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); + }); + + // Can optimize for the Add case because that's the only one that applies + private Change> CreateChange(IObservable> source) => + new(ListChangeReason.Add, new ClonedListChangeSet(source, _equalityComparer)); + + // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable + private IObservable>> CreateClonedListObservable(IObservable>> source) => + source.Select(src => new ChangeSet>(new[] { CreateChange(src) })); + + // Create a ChangeSet Observable with a single event that adds all the values in the enum (and then completes, maybe) + private IObservable>> CreateClonedListObservable(IEnumerable>> source, bool completable, IScheduler? scheduler = null) + { + // Create a changeset that has a change for each changeset in the enumerable + var changeSet = new ChangeSet>(source.Select(CreateChange)); + + // Create an observable that returns it (using the scheduler if provided) + var observable = + scheduler is IScheduler sch + ? Observable.Return>>(changeSet, sch) + : Observable.Return(changeSet); + + // Block completion if it isn't supposed to complete + return completable ? observable : observable.Concat(Observable.Never>>()); + } +} diff --git a/src/DynamicData/List/ObservableListEx.cs b/src/DynamicData/List/ObservableListEx.cs index 69434bd68..6dd83671e 100644 --- a/src/DynamicData/List/ObservableListEx.cs +++ b/src/DynamicData/List/ObservableListEx.cs @@ -957,6 +957,112 @@ public static IObservable MergeMany(this IObserva return new MergeMany(source, observableSelector).Run(); } + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events. + /// + /// The type of the object. + /// The Source Observable ChangeSet. + /// instance to determine if two elements are the same. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable>> source, IEqualityComparer? equalityComparer = null) + where TObject : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + + return new MergeChangeSets(source, equalityComparer).Run(); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges both observable changesets into a single stream of ChangeSet events. + /// + /// The type of the object. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSet. + /// instance to determine if two elements are the same. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IObservable> other, IEqualityComparer? equalityComparer = null, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + other.ThrowArgumentNullExceptionIfNull(nameof(other)); + + return new[] { source, other }.MergeChangeSets(equalityComparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. Merges the source changeset and the collection of other changesets together into a single stream of ChangeSet events. + /// + /// The type of the object. + /// The Source Observable ChangeSet. + /// The Other Observable ChangeSets. + /// instance to determine if two elements are the same. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable> source, IEnumerable>> others, IEqualityComparer? equalityComparer = null, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + others.ThrowArgumentNullExceptionIfNull(nameof(others)); + + return source.EnumerateOne().Concat(others).MergeChangeSets(equalityComparer, scheduler, completable); + } + + /// + /// Operator similiar to Merge except it is ChangeSet aware. All of the observable changesets are merged together into a single stream of ChangeSet events. + /// + /// The type of the object. + /// The Source Observable ChangeSet. + /// instance to determine if two elements are the same. + /// (Optional) instance to use when enumerating the collection. + /// Whether or not the result Observable should complete if all the changesets complete. + /// The result from merging the changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IEnumerable>> source, IEqualityComparer? equalityComparer = null, IScheduler? scheduler = null, bool completable = true) + where TObject : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + + return new MergeChangeSets(source, equalityComparer, completable, scheduler).Run(); + } + + /// + /// Merges all of the Cache Observable ChangeSets into a single ChangeSets that correctly handles removal of the parent items. + /// + /// The type of the object. + /// The SourceList of Observable Cache ChangeSets. + /// Optional instance to determine if two elements are the same. + /// The result from merging the child changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservableList>> source, IEqualityComparer? equalityComparer = null) + where TObject : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + + return source.Connect().MergeChangeSets(equalityComparer); + } + + /// + /// Merges each Observable ChangeSet in the ObservableList into a single stream of ChangeSets that correctly handles removal of the parent items. + /// + /// The type of the object. + /// The List Observable ChangeSet of Cache Observable ChangeSets. + /// Optional instance to determine if two elements are the same. + /// The result from merging the child changesets together. + /// Parameter was null. + public static IObservable> MergeChangeSets(this IObservable>>> source, IEqualityComparer? equalityComparer = null) + where TObject : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + + return source.MergeManyChangeSets(static src => src, equalityComparer); + } + /// /// Merges each Observable ChangeSet in the ObservableList into a single stream of ChangeSets that correctly handles multiple Keys and removal of the parent items. /// @@ -972,7 +1078,7 @@ public static IObservable> MergeChangeSets @@ -1009,7 +1115,7 @@ public static IObservable> MergeChangeSets