Skip to content

Commit

Permalink
Bug Fix: Synchronization Issues in MergeManyChangeSet operators (#808)
Browse files Browse the repository at this point in the history
* Fixed Synchronization Issues
* Standardized / Streamlined implementation of all 7 operators
  • Loading branch information
dwcullop authored Dec 22, 2023
1 parent 4ffeec0 commit 42d0bda
Show file tree
Hide file tree
Showing 14 changed files with 232 additions and 288 deletions.
2 changes: 1 addition & 1 deletion src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ public void EnumObservableUsesTheSchedulerAndEmitsAll()
using var results = pricesCache.Connect().AsAggregator();

// when
scheduler.AdvanceBy(1);
scheduler.AdvanceBy(MarketCount);

// then
_marketList.Count.Should().Be(MarketCount);
Expand Down
30 changes: 14 additions & 16 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ public MergeManyChangeSetsCacheFixture()
[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(5, 100)]
#if !DEBUG
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(100, 5)]
[InlineData(1_000, 10)]
#endif
public async Task MultiThreadedStressTest(int marketCount, int priceCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
Expand All @@ -65,23 +67,19 @@ public async Task MultiThreadedStressTest(int marketCount, int priceCount)

IObservable<Unit> AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
(
AddRemoveMarkets(marketCount, parallel, scheduler)
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex)),
.Subscribe(
onNext: static _ => { },
onError: observer.OnError),

_marketCache.Connect()
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: static _ => { },
onError: observer.OnError,
onCompleted: observer.OnCompleted)
));

IObservable<IMarket> AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) =>
_marketFaker.IntervalGenerate(MaxAddTime, scheduler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ public MergeManyChangeSetsCacheSourceCompareFixture()
}

[Theory]
#if DEBUG
[InlineData(5, 7)]
[InlineData(10, 50)]
#else
#if false && !DEBUG
[InlineData(10, 1_000)]
[InlineData(100, 100)]
[InlineData(1_000, 10)]
Expand Down Expand Up @@ -91,7 +90,7 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
.Finally(market.PricesCache.Dispose);

var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare);
var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true);
var adding = true;
using var priceResults = merged.AsAggregator();

Expand Down
24 changes: 11 additions & 13 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ public MergeManyChangeSetsListFixture()
[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
#if !DEBUG
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(1_000, 10)]
#endif
public async Task MultiThreadedStressTest(int ownerCount, int animalCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
Expand All @@ -63,23 +65,19 @@ public async Task MultiThreadedStressTest(int ownerCount, int animalCount)

IObservable<Unit> AddRemoveAnimalsStress(int ownerCount, int animalCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveOwners(ownerCount, parallel, scheduler)
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex)),
(
AddRemoveOwners(ownerCount, parallel, scheduler)
.Subscribe(
onNext: static _ => { },
onError: observer.OnError),

_animalOwners.Connect()
_animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, animalCount, parallel, scheduler))
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});
onError: observer.OnError,
onCompleted: observer.OnCompleted)
));

IObservable<AnimalOwner> AddRemoveOwners(int ownerCount, int parallel, IScheduler scheduler) =>
_animalOwnerFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler)
Expand Down
10 changes: 6 additions & 4 deletions src/DynamicData.Tests/List/MergeChangeSetsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public MergeChangeSetsFixture()
[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(10, 100)]
[InlineData(200, 50)]
[InlineData(100, 10)]
#if !DEBUG
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(1_000, 10)]
#endif
public async Task MultiThreadedStressTest(int ownerCount, int animalCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
Expand Down Expand Up @@ -383,7 +385,7 @@ public void EnumObservableUsesTheScheduler(bool advance)
// Act
if (advance)
{
scheduler.AdvanceBy(1);
scheduler.AdvanceBy(InitialOwnerCount);
}

// Assert
Expand Down
32 changes: 15 additions & 17 deletions src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ public MergeManyChangeSetsCacheFixture()
[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(5, 100)]
#if !DEBUG
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(100, 5)]
[InlineData(1_000, 10)]
#endif
public async Task MultiThreadedStressTest(int marketCount, int priceCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
Expand All @@ -64,23 +66,19 @@ public async Task MultiThreadedStressTest(int marketCount, int priceCount)

IObservable<Unit> AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
(
AddRemoveMarkets(marketCount, parallel, scheduler)
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex)),
.Subscribe(
onNext: static _ => { },
onError: observer.OnError),

_marketList.Connect()
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: static _ => { },
onError: observer.OnError,
onCompleted: observer.OnCompleted)
));

IObservable<IMarket> AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) =>
_marketFaker.IntervalGenerate(MaxAddTime, scheduler)
Expand Down Expand Up @@ -521,7 +519,7 @@ public void ComparerOnlyAddsBetterValuesOnSourceReplace()
using var lowPriceResults = _marketList.Connect().DebugSpy("List").MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).DebugSpy("MergedLow").AsAggregator();
var marketOriginal = new Market(0);
var marketLow = new Market(1);
var marketLowLow = new Market(marketLow);
var marketLowLow = new Market(2);
marketOriginal.SetPrices(0, PricesPerMarket, GetRandomPrice);
marketLow.SetPrices(0, PricesPerMarket, LowestPrice);
marketLowLow.SetPrices(0, PricesPerMarket, LowestPrice - 1);
Expand Down
24 changes: 11 additions & 13 deletions src/DynamicData.Tests/List/MergeManyChangeSetsListFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ public MergeManyChangeSetsListFixture()
[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
#if !DEBUG
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(1_000, 10)]
#endif
public async Task MultiThreadedStressTest(int ownerCount, int animalCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
Expand All @@ -60,23 +62,19 @@ public async Task MultiThreadedStressTest(int ownerCount, int animalCount)

IObservable<Unit> AddRemoveAnimalsStress(int ownerCount, int animalCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveOwners(ownerCount, parallel, scheduler)
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex)),
(
AddRemoveOwners(ownerCount, parallel, scheduler)
.Subscribe(
onNext: static _ => { },
onError: observer.OnError),

_animalOwners.Connect()
_animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, animalCount, parallel, scheduler))
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});
onError: observer.OnError,
onCompleted: observer.OnCompleted)
));

IObservable<AnimalOwner> AddRemoveOwners(int ownerCount, int parallel, IScheduler scheduler) =>
_animalOwnerFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler)
Expand Down
97 changes: 33 additions & 64 deletions src/DynamicData/Cache/Internal/MergeChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,91 +3,60 @@
// See the LICENSE file in the project root for full license information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace DynamicData.Cache.Internal;

/// <summary>
/// Operator that is similiar to Merge but intelligently handles Cache ChangeSets.
/// </summary>
internal sealed class MergeChangeSets<TObject, TKey>
internal sealed class MergeChangeSets<TObject, TKey>(IObservable<IObservable<IChangeSet<TObject, TKey>>> source, IEqualityComparer<TObject>? equalityComparer, IComparer<TObject>? comparer)
where TObject : notnull
where TKey : notnull
{
private readonly IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> _source;

private readonly IComparer<TObject>? _comparer;

private readonly IEqualityComparer<TObject>? _equalityComparer;

public MergeChangeSets(IEnumerable<IObservable<IChangeSet<TObject, TKey>>> source, IEqualityComparer<TObject>? equalityComparer, IComparer<TObject>? comparer, bool completable, IScheduler? scheduler = null)
: this(CreateContainerObservable(source, completable, scheduler), equalityComparer, comparer)
{
}

public MergeChangeSets(IObservable<IObservable<IChangeSet<TObject, TKey>>> source, IEqualityComparer<TObject>? equalityComparer, IComparer<TObject>? comparer)
: this(CreateContainerObservable(source), equalityComparer, comparer)
: this(CreateObservable(source, completable, scheduler), equalityComparer, comparer)
{
}

private MergeChangeSets(IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> source, IEqualityComparer<TObject>? equalityComparer, IComparer<TObject>? comparer)
{
_source = source;
_comparer = comparer;
_equalityComparer = equalityComparer;
}

public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChangeSet<TObject, TKey>>(
observer =>
{
var locker = new object();

// Create a local cache of Merge Containers
var localCache = _source.Synchronize(locker).AsObservableCache();

// Set up the change tracker
var changeTracker = new ChangeSetMergeTracker<TObject, TKey>(() => localCache.Items, _comparer, _equalityComparer);

// Merge all of the changeset streams together and Process them with the change tracker which will emit the results
var subscription = localCache.Connect().MergeMany(mc => mc.Source.Do(static _ => { }, observer.OnError))
.Synchronize(locker)
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);

return new CompositeDisposable(localCache, subscription);
});
observer =>
{
var locker = new object();
var cache = new Cache<ChangeSetCache<TObject, TKey>, int>();

// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TObject, TKey>(() => cache.Items, comparer, equalityComparer);

// Create a ChangeSet of Caches, synchronize, update the local copy, and merge the sub-observables together.
return CreateContainerObservable(source, locker)
.Synchronize(locker)
.Do(cache.Clone)
.MergeMany(mc => mc.Source.Do(static _ => { }, observer.OnError))
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);
});

// Can optimize for the Add case because that's the only one that applies
private static Change<ChangeSetCache<TObject, TKey>, int> CreateChange(IObservable<IChangeSet<TObject, TKey>> source, int index) =>
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source));
private static Change<ChangeSetCache<TObject, TKey>, int> CreateChange(IObservable<IChangeSet<TObject, TKey>> source, int index, object locker) =>
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.Synchronize(locker)));

// Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable
private static IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> CreateContainerObservable(IObservable<IObservable<IChangeSet<TObject, TKey>>> source) =>
source.Select((src, index) => new ChangeSet<ChangeSetCache<TObject, TKey>, int>(new[] { CreateChange(src, index) }));
private static IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> CreateContainerObservable(IObservable<IObservable<IChangeSet<TObject, TKey>>> source, object locker) =>
source.Select((src, index) => new ChangeSet<ChangeSetCache<TObject, TKey>, int>(new[] { CreateChange(src, index, locker) }));

// Create a ChangeSet Observable with a single event that adds all the values in the enum (and then completes, maybe)
private static IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> CreateContainerObservable(IEnumerable<IObservable<IChangeSet<TObject, TKey>>> source, bool completable, IScheduler? scheduler = null) =>
Observable.Create<IChangeSet<ChangeSetCache<TObject, TKey>, int>>(observer =>
{
void EmitChanges()
{
observer.OnNext(new ChangeSet<ChangeSetCache<TObject, TKey>, int>(source.Select(CreateChange)));

if (completable)
{
observer.OnCompleted();
}
}
private static IObservable<IObservable<IChangeSet<TObject, TKey>>> CreateObservable(IEnumerable<IObservable<IChangeSet<TObject, TKey>>> source, bool completable, IScheduler? scheduler = null)
{
var obs = (scheduler != null) ? source.ToObservable(scheduler) : source.ToObservable();

if (scheduler is not null)
{
return scheduler.Schedule(EmitChanges);
}
if (!completable)
{
obs = obs.Concat(Observable.Never<IObservable<IChangeSet<TObject, TKey>>>());
}

EmitChanges();
return Disposable.Empty;
});
return obs;
}
}
Loading

0 comments on commit 42d0bda

Please sign in to comment.