Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for ChangeSetMergeTracker so that it correctly works with Value Types #940

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -651,11 +651,11 @@ public void EqualityComparerAndComparerWorkTogetherForRefreshes()
// then
_marketList.Count.Should().Be(2);
results1.Data.Count.Should().Be(PricesPerMarket);
results1.Messages.Count.Should().Be(2);
results1.Messages.Count.Should().Be(3);
results1.Summary.Overall.Adds.Should().Be(PricesPerMarket);
results1.Summary.Overall.Removes.Should().Be(0);
results1.Summary.Overall.Updates.Should().Be(PricesPerMarket);
results1.Summary.Overall.Refreshes.Should().Be(0);
results1.Summary.Overall.Refreshes.Should().Be(PricesPerMarket);
results2.Messages.Count.Should().Be(4);
results2.Summary.Overall.Adds.Should().Be(PricesPerMarket);
results2.Summary.Overall.Removes.Should().Be(0);
Expand Down
20 changes: 20 additions & 0 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,26 @@ public void MergedObservableWillFailIfSourceFails()
receivedError.Should().Be(expectedError);
}

[Fact]
public void MergeManyChangeSetsWorksCorrectlyWithValueTypes()
{
// having
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
_marketCache.AddOrUpdate(markets);
markets.ForEach(m => m.SetPrices(0, PricesPerMarket, GetRandomPrice));
using var results = _marketCache.Connect()
.MergeManyChangeSets(m => m.LatestPrices.Transform(p => p.Price))
.AsAggregator();

// when
markets.ForEach(m => m.RemoveAllPrices());

// then
results.Data.Count.Should().Be(0);
results.Summary.Overall.Adds.Should().Be(PricesPerMarket);
results.Summary.Overall.Removes.Should().Be(PricesPerMarket);
}

public void Dispose()
{
_marketCacheResults.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,11 +921,11 @@ public void EqualityComparerAndChildComparerWorkTogetherForRefreshes()
resultsLow.Summary.Overall.Removes.Should().Be(0);
resultsLow.Summary.Overall.Updates.Should().Be(0);
resultsLow.Summary.Overall.Refreshes.Should().Be(0);
resultsRecent.Messages.Count.Should().Be(3);
resultsRecent.Messages.Count.Should().Be(4);
resultsRecent.Summary.Overall.Adds.Should().Be(PricesPerMarket);
resultsRecent.Summary.Overall.Removes.Should().Be(0);
resultsRecent.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2);
resultsRecent.Summary.Overall.Refreshes.Should().Be(0);
resultsRecent.Summary.Overall.Refreshes.Should().Be(PricesPerMarket);
resultsTimeStamp.Messages.Count.Should().Be(5);
resultsTimeStamp.Summary.Overall.Adds.Should().Be(PricesPerMarket);
resultsTimeStamp.Summary.Overall.Removes.Should().Be(0);
Expand Down
7 changes: 4 additions & 3 deletions src/DynamicData/Cache/Internal/ChangeSetMergeTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal sealed class ChangeSetMergeTracker<TObject, TKey>(Func<IEnumerable<Chan
where TKey : notnull
{
private readonly ChangeAwareCache<TObject, TKey> _resultCache = new();
private readonly IEqualityComparer<TObject> _equalityComparer = equalityComparer ?? EqualityComparer<TObject>.Default;
private bool _hasCompleted;

public void MarkComplete() => _hasCompleted = true;
Expand Down Expand Up @@ -201,7 +202,7 @@ private void OnItemRefreshed(ChangeSetCache<TObject, TKey>[] sources, TObject it
// In the sorting case, a refresh requires doing a full update because any change could alter what the best value is
// If we don't care about sorting OR if we do care, but re-selecting the best value didn't change anything
// AND the current value is the exact one being refreshed, then emit the refresh downstream
if (((comparer is null) || !UpdateToBestValue(sources, key, cached)) && ReferenceEquals(cached.Value, item))
if (((comparer is null) || !UpdateToBestValue(sources, key, cached)) && CheckEquality(cached.Value, item))
{
_resultCache.Refresh(key);
}
Expand Down Expand Up @@ -268,9 +269,9 @@ private Optional<TObject> LookupBestValue(ChangeSetCache<TObject, TKey>[] source
}

private bool CheckEquality(TObject left, TObject right) =>
ReferenceEquals(left, right) || (equalityComparer?.Equals(left, right) ?? false);
_equalityComparer.Equals(left, right);

// Return true if candidate should replace current as the observed downstream value
private bool ShouldReplace(TObject candidate, TObject current) =>
!ReferenceEquals(candidate, current) && (comparer?.Compare(candidate, current) < 0);
comparer?.Compare(candidate, current) < 0;
}
55 changes: 36 additions & 19 deletions src/DynamicData/Cache/Internal/ToObservableOptional.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,45 @@ internal sealed class ToObservableOptional<TObject, TKey>(IObservable<IChangeSet
where TObject : notnull
where TKey : notnull
{
private readonly IEqualityComparer<TObject> _equalityComparer = equalityComparer ?? EqualityComparer<TObject>.Default;
private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));
private readonly TKey _key = key;

public IObservable<Optional<TObject>> Run() => Observable.Create<Optional<TObject>>(observer =>
_source.Subscribe(
changes =>
changes.Where(ShouldEmitChange).ForEach(change => observer.OnNext(change switch
{
{ Reason: ChangeReason.Remove } => Optional.None<TObject>(),
_ => Optional.Some(change.Current),
})),
observer.OnError,
observer.OnCompleted));

private bool ShouldEmitChange(Change<TObject, TKey> change) => change switch
{
{ Key: { } thekey } when !thekey.Equals(_key) => false,
{ Reason: ChangeReason.Add } => true,
{ Reason: ChangeReason.Remove } => true,
{ Reason: ChangeReason.Update, Previous.HasValue: false } => true,
{ Reason: ChangeReason.Update } when equalityComparer is not null => !equalityComparer.Equals(change.Current, change.Previous.Value),
{ Reason: ChangeReason.Update } => !ReferenceEquals(change.Current, change.Previous.Value),
_ => false,
};
var lastValue = Optional.None<TObject>();

return _source.Subscribe(
changes => lastValue = EmitChanges(changes, observer, lastValue),
observer.OnError,
observer.OnCompleted);
});

private Optional<TObject> EmitChanges(IChangeSet<TObject, TKey> changes, IObserver<Optional<TObject>> observer, Optional<TObject> lastValue)
{
foreach (var change in changes.ToConcreteType())
{
// Ignore changes for different keys
if (!change.Key.Equals(_key))
{
continue;
}

// Remove is None, everything else is the current value
var emitValue = change switch
{
{ Reason: ChangeReason.Remove } => Optional.None<TObject>(),
_ => Optional.Some(change.Current),
};

// Emit the value if it has changed
if ((emitValue.HasValue != lastValue.HasValue) || (emitValue.HasValue && !_equalityComparer.Equals(lastValue.Value, emitValue.Value)))
{
observer.OnNext(emitValue);
lastValue = emitValue;
}
}

return lastValue;
}
}
Loading