Skip to content

Commit

Permalink
Fixed that .DisposeMany() was not disposing items after downstream-…
Browse files Browse the repository at this point in the history
…teardown of the stream, I.E. unsubscription. (#761)
  • Loading branch information
JakenVeina authored Nov 21, 2023
1 parent fd9083f commit 2a2757d
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 63 deletions.
17 changes: 17 additions & 0 deletions src/DynamicData.Tests/Cache/DisposeManyFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,23 @@ public void RemainingItemsAreDisposedAfterError()
_results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("items remaining in the list should be disposed");
}

[Fact]
public void RemainingItemsAreDisposedAfterUnsubscription()
{
var items = new[]
{
new DisposableObject(1),
new DisposableObject(2),
new DisposableObject(3)
};

_itemsSource.AddOrUpdate(items);

_results.Dispose();

items.All(item => item.IsDisposed).Should().BeTrue("Items remaining in the list should be disposed");
}

private class DisposableObject : IDisposable
{
public DisposableObject(int id)
Expand Down
17 changes: 17 additions & 0 deletions src/DynamicData.Tests/List/DisposeManyFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,23 @@ public void RemainingItemsAreDisposedAfterError()
_results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("Items remaining in the list should be disposed");
}

[Fact]
public void RemainingItemsAreDisposedAfterUnsubscription()
{
var items = new[]
{
new DisposableObject(1),
new DisposableObject(2),
new DisposableObject(3)
};

_itemsSource.AddRange(items);

_results.Dispose();

items.All(item => item.IsDisposed).Should().BeTrue("Items remaining in the list should be disposed");
}

private class DisposableObject : IDisposable
{
public DisposableObject(int id)
Expand Down
64 changes: 38 additions & 26 deletions src/DynamicData/Cache/Internal/DisposeMany.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for full license information.

using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace DynamicData.Cache.Internal;
Expand All @@ -16,42 +17,53 @@ internal sealed class DisposeMany<TObject, TKey>(IObservable<IChangeSet<TObject,
public IObservable<IChangeSet<TObject, TKey>> Run()
=> Observable.Create<IChangeSet<TObject, TKey>>(observer =>
{
// Will be locking on cachedItems directly, instead of using an anonymous gate object. This is acceptable, since it's a privately-held object, there's no risk of deadlock from other consumers locking on it.
var cachedItems = new Dictionary<TKey, TObject>();

return _source.SubscribeSafe(Observer.Create<IChangeSet<TObject, TKey>>(
onNext: changeSet =>
{
observer.OnNext(changeSet);

foreach (var change in changeSet.ToConcreteType())
var sourceSubscription = _source
.Synchronize(cachedItems)
.SubscribeSafe(Observer.Create<IChangeSet<TObject, TKey>>(
onNext: changeSet =>
{
switch (change.Reason)
observer.OnNext(changeSet);

foreach (var change in changeSet.ToConcreteType())
{
case ChangeReason.Update:
if (change.Previous.HasValue && !EqualityComparer<TObject>.Default.Equals(change.Current, change.Previous.Value))
(change.Previous.Value as IDisposable)?.Dispose();
break;
switch (change.Reason)
{
case ChangeReason.Update:
if (change.Previous.HasValue && !EqualityComparer<TObject>.Default.Equals(change.Current, change.Previous.Value))
(change.Previous.Value as IDisposable)?.Dispose();
break;

case ChangeReason.Remove:
(change.Current as IDisposable)?.Dispose();
break;
case ChangeReason.Remove:
(change.Current as IDisposable)?.Dispose();
break;
}
}
}

cachedItems.Clone(changeSet);
},
onError: error =>
{
observer.OnError(error);
cachedItems.Clone(changeSet);
},
onError: error =>
{
observer.OnError(error);

ProcessFinalization(cachedItems);
},
onCompleted: () =>
{
observer.OnCompleted();
ProcessFinalization(cachedItems);
},
onCompleted: () =>
{
observer.OnCompleted();

ProcessFinalization(cachedItems);
}));

return Disposable.Create(() =>
{
sourceSubscription.Dispose();

lock (cachedItems)
ProcessFinalization(cachedItems);
}));
});
});

private static void ProcessFinalization(Dictionary<TKey, TObject> cachedItems)
Expand Down
86 changes: 49 additions & 37 deletions src/DynamicData/List/Internal/DisposeMany.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for full license information.

using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace DynamicData.List.Internal;
Expand All @@ -18,52 +19,63 @@ public DisposeMany(IObservable<IChangeSet<T>> source)
public IObservable<IChangeSet<T>> Run()
=> Observable.Create<IChangeSet<T>>(observer =>
{
// Will be locking on cachedItems directly, instead of using an anonymous gate object. This is acceptable, since it's a privately-held object, there's no risk of deadlock from other consumers locking on it.
var cachedItems = new List<T>();

return _source.SubscribeSafe(Observer.Create<IChangeSet<T>>(
onNext: changeSet =>
{
observer.OnNext(changeSet);

foreach (var change in changeSet)
var sourceSubscription = _source
.Synchronize(cachedItems)
.SubscribeSafe(Observer.Create<IChangeSet<T>>(
onNext: changeSet =>
{
switch (change.Reason)
observer.OnNext(changeSet);

foreach (var change in changeSet)
{
case ListChangeReason.Clear:
foreach (var item in cachedItems)
(item as IDisposable)?.Dispose();
break;

case ListChangeReason.Remove:
(change.Item.Current as IDisposable)?.Dispose();
break;

case ListChangeReason.RemoveRange:
foreach (var item in change.Range)
(item as IDisposable)?.Dispose();
break;

case ListChangeReason.Replace:
if (change.Item.Previous.HasValue)
(change.Item.Previous.Value as IDisposable)?.Dispose();
break;
switch (change.Reason)
{
case ListChangeReason.Clear:
foreach (var item in cachedItems)
(item as IDisposable)?.Dispose();
break;

case ListChangeReason.Remove:
(change.Item.Current as IDisposable)?.Dispose();
break;

case ListChangeReason.RemoveRange:
foreach (var item in change.Range)
(item as IDisposable)?.Dispose();
break;

case ListChangeReason.Replace:
if (change.Item.Previous.HasValue)
(change.Item.Previous.Value as IDisposable)?.Dispose();
break;
}
}
}

cachedItems.Clone(changeSet);
},
onError: error =>
{
observer.OnError(error);
cachedItems.Clone(changeSet);
},
onError: error =>
{
observer.OnError(error);

ProcessFinalization(cachedItems);
},
onCompleted: () =>
{
observer.OnCompleted();
ProcessFinalization(cachedItems);
},
onCompleted: () =>
{
observer.OnCompleted();

ProcessFinalization(cachedItems);
}));

return Disposable.Create(() =>
{
sourceSubscription.Dispose();

lock (cachedItems)
ProcessFinalization(cachedItems);
}));
});
});

private static void ProcessFinalization(List<T> cachedItems)
Expand Down

0 comments on commit 2a2757d

Please sign in to comment.