Skip to content

Commit

Permalink
Fix filter in cache connect Fixes #400 (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandPheasant authored May 1, 2021
1 parent ec02c3b commit 811b32e
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 10 deletions.
43 changes: 43 additions & 0 deletions src/DynamicData.Tests/Cache/FilterOnConnectFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Linq;
using FluentAssertions;
using Xunit;

namespace DynamicData.Tests.Cache
{
/// <summary>
/// See https://github.com/reactivemarbles/DynamicData/issues/400
/// </summary>
public class FilterOnConnectFixture
{
[Fact]
public void ClearingSourceCacheWithPredicateShouldClearTheData()
{
// having
var source = new SourceCache<int, int>(it => it);
source.AddOrUpdate(1);
var results = source.Connect(it => true).AsAggregator();

// when
source.Clear();

// then
results.Data.Count.Should().Be(0, "Should be 0");
}

[Fact]
public void UpdatesExistedBeforeConnectWithoutPredicateShouldBeVisibleAsPreviousWhenNewUpdatesTriggered()
{
// having
var source = new SourceCache<int, int>(it => it);
source.AddOrUpdate(1);
var results = source.Connect().AsAggregator();

// when
source.AddOrUpdate(1);

// then
results.Messages.Count.Should().Be(2, "Should be 2 updates");
results.Messages[1].First().Previous.HasValue.Should().Be(true, "Should have previous value");
}
}
}
2 changes: 1 addition & 1 deletion src/DynamicData.Tests/Cache/SizeLimitFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void AddMoreThanLimitInBatched()
_source.AddOrUpdate(_generator.Take(10).ToArray());
_source.AddOrUpdate(_generator.Take(10).ToArray());

_scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100).Ticks);
_scheduler.Start();

_results.Messages.Count.Should().Be(3, "Should be 3 updates");
_results.Messages[0].Adds.Should().Be(10, "Should be 10 adds in the first update");
Expand Down
26 changes: 18 additions & 8 deletions src/DynamicData/Cache/ObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,30 @@ public ObservableCache(Func<TObject, TKey>? keySelector = null)
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predicate = null) =>
Observable.Create<IChangeSet<TObject, TKey>>(
observer =>
{
var initial = InternalEx.Return(() =>
{
// lock getting initial changes and rely on a combination of Concat
// + _changes being synchronized to produce thread safety (I hope!)
lock (_locker)
{
var initial = GetInitialUpdates(predicate);
if (initial.Count != 0)
{
observer.OnNext(initial);
}

var updateSource = (predicate is null ? _changes : _changes.Filter(predicate)).NotEmpty();
return updateSource.SubscribeSafe(observer);
return (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate);
}
});

var changes = Observable.Defer(() => initial).Concat(_changes);
if (predicate != null)
{
changes = changes.Filter(predicate);
}
else
{
changes = changes.NotEmpty();
}

return changes.SubscribeSafe(observer);
});

public void Dispose() => _cleanUp.Dispose();

public Optional<TObject> Lookup(TKey key) => _readerWriter.Lookup(key);
Expand Down
11 changes: 11 additions & 0 deletions src/DynamicData/Kernel/InternalEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,16 @@ internal static IObservable<Unit> ToUnit<T>(this IObservable<T> source)
{
return source.Select(_ => Unit.Default);
}

/// <summary>
/// Observable.Return without the memory leak.
/// </summary>
internal static IObservable<T> Return<T>(Func<T> source) =>
Observable.Create<T>(o =>
{
o.OnNext(source());
o.OnCompleted();
return () => { };
});
}
}
2 changes: 1 addition & 1 deletion src/DynamicData/List/ChangeAwareList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void RemoveAt(int index)
{
if (index > _innerList.Count)
{
throw new ArgumentOutOfRangeException($"{nameof(index)} cannot be greater than the size of the collection");
throw new ArgumentOutOfRangeException(nameof(index), $"{nameof(index)} cannot be greater than the size of the collection");
}

RemoveItem(index);
Expand Down

0 comments on commit 811b32e

Please sign in to comment.