Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty change set notifications #494

Merged
merged 7 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/DynamicData.Tests/Cache/FilterControllerFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,5 +299,19 @@ public void UpdateNotMatched()
_results.Messages.Count.Should().Be(0, "Should be no updates");
_results.Data.Count.Should().Be(0, "Should nothing cached");
}

[Fact]
public void EmptyChanges()
{
IChangeSet<Person, string>? change = null;

//need to also apply overload on connect as that will also need to provide and empty notification
using var subscription = _source.Connect(suppressEmptyChangeSets: false)
.Filter(_filter, false)
.Subscribe(c => change = c);

change.Should().NotBeNull();
change!.Count.Should().Be(0);
}
}
}
15 changes: 15 additions & 0 deletions src/DynamicData.Tests/Cache/FilterFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,5 +212,20 @@ public void UpdateNotMatched()
_results.Messages.Count.Should().Be(0, "Should be no updates");
_results.Data.Count.Should().Be(0, "Should nothing cached");
}

[Fact]
public void EmptyChanges()
{
IChangeSet<Person, string>? change = null;

//need to also apply overload on connect as that will also need to provide and empty notification
// [alternatively _source.Connect(x=> x.Age == 20, suppressEmptyChangeSets: false)] instead
using var subscription = _source.Connect(suppressEmptyChangeSets: false)
.Filter(x=> x.Age == 20, false)
.Subscribe(c => change = c);

change.Should().NotBeNull();
change!.Count.Should().Be(0);
}
}
}
25 changes: 25 additions & 0 deletions src/DynamicData.Tests/Cache/SourceCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,30 @@ public void SubscribesDisposesCorrectly()
called.Should().BeTrue();
completed.Should().BeTrue();
}

[Fact]
public void EmptyChanges()
{
IChangeSet<Person, string>? change = null;

using var subscription = _source.Connect(suppressEmptyChangeSets: false)
.Subscribe(c=> change = c);

change.Should().NotBeNull();
change!.Count.Should().Be(0);

}

[Fact]
public void EmptyChangesWithFilter()
{
IChangeSet<Person, string>? change = null;

using var subscription = _source.Connect(p=>p.Age == 20, suppressEmptyChangeSets: false)
.Subscribe(c => change = c);

change.Should().NotBeNull();
change!.Count.Should().Be(0);
}
}
}
4 changes: 2 additions & 2 deletions src/DynamicData.Tests/Cache/TransformSafeFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void AddWithError()
_source.AddOrUpdate(person);

_errors.Count.Should().Be(1, "Should be 1 error reported");
_results.Messages.Count.Should().Be(0, "Should be no messages");
_results.Messages.Count.Should().Be(1, "Should be 1 message");
}

[Fact]
Expand Down Expand Up @@ -118,7 +118,7 @@ public void UpdateSucessively()
_source.AddOrUpdate(update3);

_errors.Count.Should().Be(1, "Should be 1 error reported");
_results.Messages.Count.Should().Be(2, "Should be 2 messages");
_results.Messages.Count.Should().Be(3, "Should be 3 messages");

_results.Data.Count.Should().Be(1, "Should 1 item in the cache");
_results.Data.Items.First().Should().Be(_transformFactory(update2), "Change 2 shoud be the only item cached");
Expand Down
4 changes: 2 additions & 2 deletions src/DynamicData.Tests/Cache/TransformSafeParallelFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void AddWithError()
_source.AddOrUpdate(person);

_errors.Count.Should().Be(1, "Should be 1 error reported");
_results.Messages.Count.Should().Be(0, "Should be no messages");
_results.Messages.Count.Should().Be(1, "Should be 1 messages");
}

[Fact]
Expand Down Expand Up @@ -118,7 +118,7 @@ public void UpdateSucessively()
_source.AddOrUpdate(update3);

_errors.Count.Should().Be(1, "Should be 1 error reported");
_results.Messages.Count.Should().Be(2, "Should be 2 messages");
_results.Messages.Count.Should().Be(3, "Should be 3 messages");

_results.Data.Count.Should().Be(1, "Should 1 item in the cache");
_results.Data.Items.First().Should().Be(_transformFactory(update2), "Change 2 shoud be the only item cached");
Expand Down
47 changes: 46 additions & 1 deletion src/DynamicData.Tests/List/SortFixture.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;

using System.Reactive.Subjects;
using DynamicData.Binding;
using DynamicData.Tests.Domain;

Expand All @@ -11,6 +12,50 @@

namespace DynamicData.Tests.List
{
public class SortChangedFixture
{
private static readonly IComparer<ListItem> DefaultComparer = SortExpressionComparer<ListItem>.Ascending(x => x.Number);


/// <summary>
/// See https://github.com/reactivemarbles/DynamicData/issues/473
/// </summary>
[Fact]
public void SortsWithoutError()
{
var source = new SourceList<ListItem>();
var sorter = new Subject<IComparer<ListItem>>();

source.AddRange(Enumerable.Range(1, 10).Select(i => new ListItem(i)));

source.Connect()
.Sort(sorter)
.Bind(out var bound)
.Subscribe();

bound.Select(x => x.Number).Should().BeInAscendingOrder();

sorter.OnNext(SortExpressionComparer<ListItem>.Descending(x => x.Number));


bound.Select(x => x.Number).Should().BeInDescendingOrder();
}


private class ListItem : IComparable<ListItem>
{
public int Number { get; }

public ListItem(int number)
{
Number = number;
}

public int CompareTo([AllowNull] ListItem other) => DefaultComparer.Compare(this, other);

}
}

public class SortFixture : IDisposable
{
private readonly IComparer<Person> _comparer = SortExpressionComparer<Person>.Ascending(p => p.Name).ThenByAscending(p => p.Age);
Expand Down
3 changes: 2 additions & 1 deletion src/DynamicData/Cache/IConnectableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public interface IConnectableCache<TObject, TKey>
/// Returns a filtered stream of cache changes preceded with the initial filtered state.
/// </summary>
/// <param name="predicate">The result will be filtered using the specified predicate.</param>
/// <param name="suppressEmptyChangeSets">By default, empty change sets are not emitted. Set this value to false to emit empty change sets.</param>
/// <returns>An observable that emits the change set.</returns>
IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predicate = null);
IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predicate = null, bool suppressEmptyChangeSets = true);

/// <summary>
/// Returns a filtered stream of cache changes.
Expand Down
96 changes: 18 additions & 78 deletions src/DynamicData/Cache/IntermediateCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,101 +44,41 @@ public IntermediateCache()
_innerCache = new ObservableCache<TObject, TKey>();
}

/// <summary>
/// Gets the total count of cached items.
/// </summary>
/// <inheritdoc />
public int Count => _innerCache.Count;

/// <summary>
/// Gets a count changed observable starting with the current count.
/// </summary>
/// <inheritdoc />
public IObservable<int> CountChanged => _innerCache.CountChanged;

/// <summary>
/// Gets the Items.
/// </summary>
/// <inheritdoc />
public IEnumerable<TObject> Items => _innerCache.Items;

/// <summary>
/// Gets the keys.
/// </summary>
/// <inheritdoc />
public IEnumerable<TKey> Keys => _innerCache.Keys;

/// <summary>
/// Gets the key value pairs.
/// </summary>
/// <inheritdoc />
public IEnumerable<KeyValuePair<TKey, TObject>> KeyValues => _innerCache.KeyValues;

/// <summary>
/// Returns a filtered change set of cache changes preceded with the initial state.
/// </summary>
/// <param name="predicate">The predicate.</param>
/// <returns>An observable which will emit change sets.</returns>
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predicate)
{
return _innerCache.Connect(predicate);
}

/// <summary>
/// Returns a observable of cache changes preceded with the initial cache state.
/// </summary>
/// <returns>An observable which will emit change sets.</returns>
public IObservable<IChangeSet<TObject, TKey>> Connect()
{
return _innerCache.Connect();
}
/// <inheritdoc />
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predicate = null, bool suppressEmptyChangeSets = true)
=> _innerCache.Connect(predicate, suppressEmptyChangeSets);

/// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
public void Dispose()
{
_innerCache.Dispose();
}
/// <inheritdoc />
public void Dispose() => _innerCache.Dispose();

/// <summary>
/// Action to apply a batch update to a cache. Multiple update methods can be invoked within a single batch operation.
/// These operations are invoked within the cache's lock and is therefore thread safe.
/// The result of the action will produce a single change set.
/// </summary>
/// <param name="updateAction">The update action.</param>
public void Edit(Action<ICacheUpdater<TObject, TKey>> updateAction)
{
_innerCache.UpdateFromIntermediate(updateAction);
}
/// <inheritdoc />
public void Edit(Action<ICacheUpdater<TObject, TKey>> updateAction) => _innerCache.UpdateFromIntermediate(updateAction);

/// <summary>
/// Lookup a single item using the specified key.
/// </summary>
/// <remarks>
/// Fast indexed lookup.
/// </remarks>
/// <param name="key">The key.</param>
/// <returns>A optional value.</returns>
public Optional<TObject> Lookup(TKey key)
{
return _innerCache.Lookup(key);
}
/// <inheritdoc />
public Optional<TObject> Lookup(TKey key) => _innerCache.Lookup(key);

/// <inheritdoc />
public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool>? predicate = null)
{
return _innerCache.Preview(predicate);
}
=> _innerCache.Preview(predicate);

/// <summary>
/// Returns an observable of any changes which match the specified key. The sequence starts with the initial item in the cache (if there is one).
/// </summary>
/// <param name="key">The key.</param>
/// <returns>An observable which emits changes.</returns>
public IObservable<Change<TObject, TKey>> Watch(TKey key)
{
return _innerCache.Watch(key);
}
/// <inheritdoc />
public IObservable<Change<TObject, TKey>> Watch(TKey key) => _innerCache.Watch(key);

internal IChangeSet<TObject, TKey> GetInitialUpdates(Func<TObject, bool>? filter = null)
{
return _innerCache.GetInitialUpdates(filter);
}
internal IChangeSet<TObject, TKey> GetInitialUpdates(Func<TObject, bool>? filter = null) => _innerCache.GetInitialUpdates(filter);
}
}
26 changes: 6 additions & 20 deletions src/DynamicData/Cache/Internal/AnonymousObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,15 @@ public AnonymousObservableCache(IObservableCache<TObject, TKey> cache)

public IEnumerable<KeyValuePair<TKey, TObject>> KeyValues => _cache.KeyValues;

public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predicate = null)
{
return _cache.Connect(predicate);
}
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predicate = null, bool suppressEmptyChangeSets = true)
=> _cache.Connect(predicate, suppressEmptyChangeSets);

public void Dispose()
{
_cache.Dispose();
}
public void Dispose() => _cache.Dispose();

public Optional<TObject> Lookup(TKey key)
{
return _cache.Lookup(key);
}
public Optional<TObject> Lookup(TKey key) => _cache.Lookup(key);

public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool>? predicate = null)
{
return _cache.Preview(predicate);
}
public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool>? predicate = null) => _cache.Preview(predicate);

public IObservable<Change<TObject, TKey>> Watch(TKey key)
{
return _cache.Watch(key);
}
public IObservable<Change<TObject, TKey>> Watch(TKey key) => _cache.Watch(key);
}
}
10 changes: 8 additions & 2 deletions src/DynamicData/Cache/Internal/DynamicFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ internal class DynamicFilter<TObject, TKey>
private readonly IObservable<Func<TObject, bool>> _predicateChanged;

private readonly IObservable<Unit>? _refilterObservable;
private readonly bool _suppressEmptyChangeSets;

private readonly IObservable<IChangeSet<TObject, TKey>> _source;

public DynamicFilter(IObservable<IChangeSet<TObject, TKey>> source, IObservable<Func<TObject, bool>> predicateChanged, IObservable<Unit>? refilterObservable = null)
public DynamicFilter(IObservable<IChangeSet<TObject, TKey>> source, IObservable<Func<TObject, bool>> predicateChanged, IObservable<Unit>? refilterObservable = null, bool suppressEmptyChangeSets = true)
{
_source = source ?? throw new ArgumentNullException(nameof(source));
_predicateChanged = predicateChanged ?? throw new ArgumentNullException(nameof(predicateChanged));
_refilterObservable = refilterObservable;
_suppressEmptyChangeSets = suppressEmptyChangeSets;
}

public IObservable<IChangeSet<TObject, TKey>> Run()
Expand Down Expand Up @@ -59,7 +61,11 @@ public IObservable<IChangeSet<TObject, TKey>> Run()
return filteredData.CaptureChanges();
});

return refresher.Merge(dataChanged).NotEmpty().SubscribeSafe(observer);
var source = refresher.Merge(dataChanged);
if (_suppressEmptyChangeSets)
source = source.NotEmpty();

return source.SubscribeSafe(observer);
});
}

Expand Down
Loading