diff --git a/src/DynamicData.Tests/List/MergeManyFixture.cs b/src/DynamicData.Tests/List/MergeManyFixture.cs index 03296d88a..bda676261 100644 --- a/src/DynamicData.Tests/List/MergeManyFixture.cs +++ b/src/DynamicData.Tests/List/MergeManyFixture.cs @@ -1,4 +1,5 @@ using System; +using System.Reactive.Linq; using System.Reactive.Subjects; using FluentAssertions; @@ -72,6 +73,93 @@ public void RemovedItemWillNotCauseInvocation() stream.Dispose(); } + /// + /// Merged stream does not complete if a child stream is still active. + /// + [Fact] + public void MergedStreamDoesNotCompleteWhileItemStreamActive() + { + var streamCompleted = false; + var sourceCompleted = false; + + var item = new ObjectWithObservable(1); + _source.Add(item); + + using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true) + .MergeMany(o => o.Observable).Subscribe(_ => { }, () => streamCompleted = true); + + _source.Dispose(); + + sourceCompleted.Should().BeTrue(); + streamCompleted.Should().BeFalse(); + } + + /// + /// Stream completes only when source and all child are complete. + /// + [Fact] + public void MergedStreamCompletesWhenSourceAndItemsComplete() + { + var streamCompleted = false; + var sourceCompleted = false; + + var item = new ObjectWithObservable(1); + _source.Add(item); + + using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true) + .MergeMany(o => o.Observable).Subscribe(_ => { }, () => streamCompleted = true); + + _source.Dispose(); + item.CompleteObservable(); + + sourceCompleted.Should().BeTrue(); + streamCompleted.Should().BeTrue(); + } + + /// + /// Stream completes even if one of the children fails. + /// + [Fact] + public void MergedStreamCompletesIfLastItemFails() + { + var receivedError = default(Exception); + var streamCompleted = false; + var sourceCompleted = false; + + var item = new ObjectWithObservable(1); + _source.Add(item); + + using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true) + .MergeMany(o => o.Observable).Subscribe(_ => { }, err => receivedError = err, () => streamCompleted = true); + + _source.Dispose(); + item.FailObservable(new Exception("Test exception")); + + receivedError.Should().Be(default); + sourceCompleted.Should().BeTrue(); + streamCompleted.Should().BeTrue(); + } + + /// + /// If the source stream has an error, the merged steam should also. + /// + [Fact] + public void MergedStreamFailsWhenSourceFails() + { + var receivedError = default(Exception); + var expectedError = new Exception("Test exception"); + var throwObservable = Observable.Throw>(expectedError); + var stream = _source.Connect().Concat(throwObservable) + .MergeMany(o => o.Observable).Subscribe(_ => { }, err => receivedError = err); + + var item = new ObjectWithObservable(1); + _source.Add(item); + + _source.Dispose(); + + receivedError.Should().Be(expectedError); + } + private class ObjectWithObservable { private readonly ISubject _changed = new Subject(); @@ -87,6 +175,16 @@ public ObjectWithObservable(int id) public IObservable Observable => _changed; + public void CompleteObservable() + { + _changed.OnCompleted(); + } + + public void FailObservable(Exception ex) + { + _changed.OnError(ex); + } + public void InvokeObservable(bool value) { _value = value; diff --git a/src/DynamicData/List/Internal/MergeMany.cs b/src/DynamicData/List/Internal/MergeMany.cs index 157713577..2232b9c49 100644 --- a/src/DynamicData/List/Internal/MergeMany.cs +++ b/src/DynamicData/List/Internal/MergeMany.cs @@ -3,7 +3,9 @@ // See the LICENSE file in the project root for full license information. using System; +using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; namespace DynamicData.List.Internal; @@ -25,8 +27,43 @@ public IObservable Run() return Observable.Create( observer => { + var counter = new SubscriptionCounter(); var locker = new object(); - return _source.SubscribeMany(t => _observableSelector(t).Synchronize(locker).Subscribe(observer.OnNext)).Subscribe(_ => { }, observer.OnError); + var disposable = _source.Concat(counter.DeferCleanup) + .SubscribeMany(t => + { + counter.Added(); + return _observableSelector(t).Synchronize(locker).Finally(() => counter.Finally()).Subscribe(observer.OnNext, _ => { }, () => { }); + }) + .Subscribe(_ => { }, observer.OnError, observer.OnCompleted); + + return new CompositeDisposable(disposable, counter); }); } + + private sealed class SubscriptionCounter : IDisposable + { + private readonly Subject> _subject = new(); + private int _subscriptionCount = 1; + + public IObservable> DeferCleanup => Observable.Defer(() => + { + CheckCompleted(); + return _subject.AsObservable(); + }); + + public void Added() => _ = Interlocked.Increment(ref _subscriptionCount); + + public void Finally() => CheckCompleted(); + + public void Dispose() => _subject.Dispose(); + + private void CheckCompleted() + { + if (Interlocked.Decrement(ref _subscriptionCount) == 0) + { + _subject.OnCompleted(); + } + } + } }