Skip to content

Commit

Permalink
Updated List to have same new MergeMany behavior (#742)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwcullop committed Oct 26, 2023
1 parent b01fe12 commit b30102f
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 1 deletion.
98 changes: 98 additions & 0 deletions src/DynamicData.Tests/List/MergeManyFixture.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

using FluentAssertions;
Expand Down Expand Up @@ -72,6 +73,93 @@ public void RemovedItemWillNotCauseInvocation()
stream.Dispose();
}

/// <summary>
/// Merged stream does not complete if a child stream is still active.
/// </summary>
[Fact]
public void MergedStreamDoesNotCompleteWhileItemStreamActive()
{
var streamCompleted = false;
var sourceCompleted = false;

var item = new ObjectWithObservable(1);
_source.Add(item);

using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true)
.MergeMany(o => o.Observable).Subscribe(_ => { }, () => streamCompleted = true);

_source.Dispose();

sourceCompleted.Should().BeTrue();
streamCompleted.Should().BeFalse();
}

/// <summary>
/// Stream completes only when source and all child are complete.
/// </summary>
[Fact]
public void MergedStreamCompletesWhenSourceAndItemsComplete()
{
var streamCompleted = false;
var sourceCompleted = false;

var item = new ObjectWithObservable(1);
_source.Add(item);

using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true)
.MergeMany(o => o.Observable).Subscribe(_ => { }, () => streamCompleted = true);

_source.Dispose();
item.CompleteObservable();

sourceCompleted.Should().BeTrue();
streamCompleted.Should().BeTrue();
}

/// <summary>
/// Stream completes even if one of the children fails.
/// </summary>
[Fact]
public void MergedStreamCompletesIfLastItemFails()
{
var receivedError = default(Exception);
var streamCompleted = false;
var sourceCompleted = false;

var item = new ObjectWithObservable(1);
_source.Add(item);

using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true)
.MergeMany(o => o.Observable).Subscribe(_ => { }, err => receivedError = err, () => streamCompleted = true);

_source.Dispose();
item.FailObservable(new Exception("Test exception"));

receivedError.Should().Be(default);
sourceCompleted.Should().BeTrue();
streamCompleted.Should().BeTrue();
}

/// <summary>
/// If the source stream has an error, the merged steam should also.
/// </summary>
[Fact]
public void MergedStreamFailsWhenSourceFails()
{
var receivedError = default(Exception);
var expectedError = new Exception("Test exception");
var throwObservable = Observable.Throw<IChangeSet<ObjectWithObservable>>(expectedError);
var stream = _source.Connect().Concat(throwObservable)
.MergeMany(o => o.Observable).Subscribe(_ => { }, err => receivedError = err);

var item = new ObjectWithObservable(1);
_source.Add(item);

_source.Dispose();

receivedError.Should().Be(expectedError);
}

private class ObjectWithObservable
{
private readonly ISubject<bool> _changed = new Subject<bool>();
Expand All @@ -87,6 +175,16 @@ public ObjectWithObservable(int id)

public IObservable<bool> Observable => _changed;

public void CompleteObservable()
{
_changed.OnCompleted();
}

public void FailObservable(Exception ex)
{
_changed.OnError(ex);
}

public void InvokeObservable(bool value)
{
_value = value;
Expand Down
39 changes: 38 additions & 1 deletion src/DynamicData/List/Internal/MergeMany.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
// See the LICENSE file in the project root for full license information.

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace DynamicData.List.Internal;

Expand All @@ -25,8 +27,43 @@ public IObservable<TDestination> Run()
return Observable.Create<TDestination>(
observer =>
{
var counter = new SubscriptionCounter();
var locker = new object();
return _source.SubscribeMany(t => _observableSelector(t).Synchronize(locker).Subscribe(observer.OnNext)).Subscribe(_ => { }, observer.OnError);
var disposable = _source.Concat(counter.DeferCleanup)
.SubscribeMany(t =>
{
counter.Added();
return _observableSelector(t).Synchronize(locker).Finally(() => counter.Finally()).Subscribe(observer.OnNext, _ => { }, () => { });
})
.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
return new CompositeDisposable(disposable, counter);
});
}

private sealed class SubscriptionCounter : IDisposable
{
private readonly Subject<IChangeSet<T>> _subject = new();
private int _subscriptionCount = 1;

public IObservable<IChangeSet<T>> DeferCleanup => Observable.Defer(() =>
{
CheckCompleted();
return _subject.AsObservable();
});

public void Added() => _ = Interlocked.Increment(ref _subscriptionCount);

public void Finally() => CheckCompleted();

public void Dispose() => _subject.Dispose();

private void CheckCompleted()
{
if (Interlocked.Decrement(ref _subscriptionCount) == 0)
{
_subject.OnCompleted();
}
}
}
}

0 comments on commit b30102f

Please sign in to comment.