Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintenance: Centralized Value for Default IScheduler instance #862

Merged
merged 6 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/DynamicData/Cache/Internal/AutoRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;

namespace DynamicData.Cache.Internal;

Expand All @@ -14,7 +15,7 @@ internal sealed class AutoRefresh<TObject, TKey, TAny>(IObservable<IChangeSet<TO
{
private readonly Func<TObject, TKey, IObservable<TAny>> _reEvaluator = reEvaluator ?? throw new ArgumentNullException(nameof(reEvaluator));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? Defaults.Scheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
3 changes: 2 additions & 1 deletion src/DynamicData/Cache/Internal/BatchIf.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;

namespace DynamicData.Cache.Internal;

Expand All @@ -15,7 +16,7 @@ internal sealed class BatchIf<TObject, TKey>(IObservable<IChangeSet<TObject, TKe
{
private readonly IObservable<bool> _pauseIfTrueSelector = pauseIfTrueSelector ?? throw new ArgumentNullException(nameof(pauseIfTrueSelector));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? Defaults.Scheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
4 changes: 2 additions & 2 deletions src/DynamicData/Cache/Internal/GroupOnProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;
Expand All @@ -28,7 +28,7 @@ public IObservable<IGroupChangeSet<TObject, TKey, TGroup>> Run() => _source.Publ
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Defaults.Scheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;
Expand All @@ -17,7 +17,7 @@ internal sealed class GroupOnPropertyWithImmutableState<TObject, TKey, TGroup>(I
where TGroup : notnull
{
private readonly Func<TObject, TGroup> _groupSelector = groupSelectorKey.Compile();
private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? Defaults.Scheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
5 changes: 3 additions & 2 deletions src/DynamicData/Cache/Internal/ToObservableChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;

namespace DynamicData.Cache.Internal;

Expand All @@ -29,7 +30,7 @@ public ToObservableChangeSet(
_expireAfter = expireAfter;
_keySelector = keySelector;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? Defaults.Scheduler;

_source = Observable.Create<IEnumerable<TObject>>(observer =>
{
Expand Down Expand Up @@ -57,7 +58,7 @@ public ToObservableChangeSet(
_expireAfter = expireAfter;
_keySelector = keySelector;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? Defaults.Scheduler;
_source = source;
}

Expand Down
19 changes: 10 additions & 9 deletions src/DynamicData/Cache/ObservableCacheEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using DynamicData.Binding;
using DynamicData.Cache;
using DynamicData.Cache.Internal;
using DynamicData.Internal;
using DynamicData.Kernel;

// ReSharper disable once CheckNamespace
Expand Down Expand Up @@ -324,7 +325,7 @@ public static IObservable<IChangeSet<TObject, TKey>> AutoRefresh<TObject, TKey>(
return t.WhenAnyPropertyChanged();
}

return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Defaults.Scheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -356,7 +357,7 @@ public static IObservable<IChangeSet<TObject, TKey>> AutoRefresh<TObject, TKey,
return t.WhenPropertyChanged(propertyAccessor, false);
}

return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Defaults.Scheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -416,7 +417,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Batch<TObject, TKey>(this I
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return source.Buffer(timeSpan, scheduler ?? Scheduler.Default).FlattenBufferResult();
return source.Buffer(timeSpan, scheduler ?? Defaults.Scheduler).FlattenBufferResult();
}

/// <summary>
Expand Down Expand Up @@ -832,7 +833,7 @@ public static IObservable<IChangeSet<TObject, TKey>> BufferInitial<TObject, TKey
where TKey : notnull => source.DeferUntilLoaded().Publish(
shared =>
{
var initial = shared.Buffer(initialBuffer, scheduler ?? Scheduler.Default).FlattenBufferResult().Take(1);
var initial = shared.Buffer(initialBuffer, scheduler ?? Defaults.Scheduler).FlattenBufferResult().Take(1);

return initial.Concat(shared);
});
Expand Down Expand Up @@ -1357,7 +1358,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Except<TObject, TKey>(this
/// </exception>
public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TimeSpan?> timeSelector)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, Defaults.Scheduler);

/// <summary>
/// Automatically removes items from the stream after the time specified by
Expand Down Expand Up @@ -1401,7 +1402,7 @@ public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(
/// timeSelector.</exception>
public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TimeSpan?> timeSelector, TimeSpan? pollingInterval)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, pollingInterval, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, pollingInterval, Defaults.Scheduler);

/// <summary>
/// Automatically removes items from the stream on the next poll after the time specified by
Expand Down Expand Up @@ -1463,7 +1464,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<
/// timeSelector.</exception>
public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<TObject, TKey>(this ISourceCache<TObject, TKey> source, Func<TObject, TimeSpan?> timeSelector, TimeSpan? interval = null)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, interval, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, interval, Defaults.Scheduler);

/// <summary>
/// Ensures there are no duplicated keys in the observable changeset.
Expand Down Expand Up @@ -1504,7 +1505,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<
source.ThrowArgumentNullExceptionIfNull(nameof(source));
timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector));

scheduler ??= Scheduler.Default;
scheduler ??= Defaults.Scheduler;

return Observable.Create<IEnumerable<KeyValuePair<TKey, TObject>>>(
observer => source.Connect().ForExpiry(timeSelector, pollingInterval, scheduler).Finally(observer.OnCompleted).Subscribe(
Expand Down Expand Up @@ -2499,7 +2500,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> LimitSizeTo<
long orderItemWasAdded = -1;
var sizeLimiter = new SizeLimiter<TObject, TKey>(sizeLimit);

return source.Connect().Finally(observer.OnCompleted).ObserveOn(scheduler ?? Scheduler.Default).Transform((t, v) => new ExpirableItem<TObject, TKey>(t, v, DateTime.Now, Interlocked.Increment(ref orderItemWasAdded))).Select(sizeLimiter.CloneAndReturnExpiredOnly).Where(expired => expired.Length != 0).Subscribe(
return source.Connect().Finally(observer.OnCompleted).ObserveOn(scheduler ?? Defaults.Scheduler).Transform((t, v) => new ExpirableItem<TObject, TKey>(t, v, DateTime.Now, Interlocked.Increment(ref orderItemWasAdded))).Select(sizeLimiter.CloneAndReturnExpiredOnly).Where(expired => expired.Length != 0).Subscribe(
toRemove =>
{
try
Expand Down
3 changes: 2 additions & 1 deletion src/DynamicData/Experimental/ExperimentalEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for full license information.

using System.Reactive.Concurrency;
using DynamicData.Internal;

namespace DynamicData.Experimental;

Expand All @@ -26,6 +27,6 @@ public static IWatcher<TObject, TKey> AsWatcher<TObject, TKey>(this IObservable<
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return new Watcher<TObject, TKey>(source, scheduler ?? Scheduler.Default);
return new Watcher<TObject, TKey>(source, scheduler ?? Defaults.Scheduler);
}
}
12 changes: 12 additions & 0 deletions src/DynamicData/Internal/Defaults.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Concurrency;

namespace DynamicData.Internal;

internal static class Defaults
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RolandPheasant Do you think this should be a public thing so that consumers can change the value if they want to? We could call it DynamicData.GlobalConfig with this as a property or something.

It does sound like a foot-gun though. Maybe it should just be some enum that allows us to ensure it only gets set to schedulers that will work. Then again, maybe they have a reason for doing everything on the Intermediate Scheduler... Who can say?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect in this case, making it internal would probably be sufficient and it allows up to change our mind and give the consumers the best choice out of the box.

Also when I added the binding options I accidentally added the global options class in the binding namespace. That should be moved to the root namespace. It would technically be a breaking change, but I'd suggest we should not increase the bug version as it would be correcting a mistake for a bit of code for which there's probably only one or two users.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funny you should ask... I just moved it to the main namespace.

Are you asking me to move the BindingOptions as part of this PR? They can't go in the same class if one is public and the other is internal.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved it to the main namespace and gave it a better name but kept it as internal. If you want, I can move the DynamicDataOptions as well (but it might be better as another PR).

Then later we can discuss adding a configurable Default Scheduler value to it...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RolandPheasant See #864 for those changes.

{
public static IScheduler Scheduler { get; } = TaskPoolScheduler.Default;
}
4 changes: 2 additions & 2 deletions src/DynamicData/List/Internal/AutoRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.List.Internal;
Expand All @@ -32,7 +32,7 @@ public IObservable<IChangeSet<TObject>> Run() => Observable.Create<IChangeSet<TO
// create a change set, either buffered or one item at the time
IObservable<IEnumerable<TObject>> itemsChanged = buffer is null ?
itemHasChanged.Select(t => new[] { t }) :
itemHasChanged.Buffer(buffer.Value, scheduler ?? Scheduler.Default).Where(list => list.Count > 0);
itemHasChanged.Buffer(buffer.Value, scheduler ?? Defaults.Scheduler).Where(list => list.Count > 0);

IObservable<IChangeSet<TObject>> requiresRefresh = itemsChanged.Synchronize(locker).Select(
items => // catch all the indices of items which have been refreshed
Expand Down
3 changes: 2 additions & 1 deletion src/DynamicData/List/Internal/BufferIf.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using DynamicData.Internal;

namespace DynamicData.List.Internal;

Expand All @@ -14,7 +15,7 @@ internal sealed class BufferIf<T>(IObservable<IChangeSet<T>> source, IObservable
{
private readonly IObservable<bool> _pauseIfTrueSelector = pauseIfTrueSelector ?? throw new ArgumentNullException(nameof(pauseIfTrueSelector));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? Defaults.Scheduler;

private readonly IObservable<IChangeSet<T>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
3 changes: 2 additions & 1 deletion src/DynamicData/List/Internal/FilterOnObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;

namespace DynamicData.List.Internal;

Expand All @@ -31,7 +32,7 @@ public IObservable<IChangeSet<TObject>> Run() => Observable.Create<IChangeSet<TO
// create a change set, either buffered or one item at the time
var itemsChanged = buffer is null ?
itemHasChanged.Select(t => new[] { t }) :
itemHasChanged.Buffer(buffer.Value, scheduler ?? Scheduler.Default).Where(list => list.Count > 0);
itemHasChanged.Buffer(buffer.Value, scheduler ?? Defaults.Scheduler).Where(list => list.Count > 0);

var requiresRefresh = itemsChanged.Synchronize(locker).Select(
items => // catch all the indices of items which have been refreshed
Expand Down
4 changes: 2 additions & 2 deletions src/DynamicData/List/Internal/GroupOnProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.List.Internal;
Expand All @@ -27,7 +27,7 @@ public IObservable<IChangeSet<IGroup<TObject, TGroup>>> Run() => _source.Publish
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Defaults.Scheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.List.Internal;
Expand All @@ -27,7 +27,7 @@ public IObservable<IChangeSet<IGrouping<TObject, TGroup>>> Run() => _source.Publ
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Defaults.Scheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
5 changes: 3 additions & 2 deletions src/DynamicData/List/Internal/ToObservableChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;

namespace DynamicData.List.Internal;

Expand All @@ -25,7 +26,7 @@ public ToObservableChangeSet(
{
_expireAfter = expireAfter;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? Defaults.Scheduler;

_source = Observable.Create<IEnumerable<TObject>>(observer =>
{
Expand All @@ -51,7 +52,7 @@ public ToObservableChangeSet(
{
_expireAfter = expireAfter;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? Defaults.Scheduler;
_source = source;
}

Expand Down
11 changes: 6 additions & 5 deletions src/DynamicData/List/ObservableListEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.Internal;
using DynamicData.Kernel;
using DynamicData.List.Internal;
using DynamicData.List.Linq;
Expand Down Expand Up @@ -184,7 +185,7 @@ public static IObservable<IChangeSet<TObject>> AutoRefresh<TObject>(this IObserv
return t.WhenAnyPropertyChanged();
}

return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Defaults.Scheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -215,7 +216,7 @@ public static IObservable<IChangeSet<TObject>> AutoRefresh<TObject, TProperty>(t
return t.WhenPropertyChanged(propertyAccessor, false);
}

return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Defaults.Scheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -442,7 +443,7 @@ public static IObservable<IChangeSet<TObject>> BufferInitial<TObject>(this IObse
where TObject : notnull => source.DeferUntilLoaded().Publish(
shared =>
{
var initial = shared.Buffer(initialBuffer, scheduler ?? Scheduler.Default).FlattenBufferResult().Take(1);
var initial = shared.Buffer(initialBuffer, scheduler ?? Defaults.Scheduler).FlattenBufferResult().Take(1);

return initial.Concat(shared);
});
Expand Down Expand Up @@ -679,7 +680,7 @@ public static IObservable<IEnumerable<T>> ExpireAfter<T>(this ISourceList<T> sou
timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector));

var locker = new object();
var limiter = new ExpireAfter<T>(source, timeSelector, pollingInterval, scheduler ?? Scheduler.Default, locker);
var limiter = new ExpireAfter<T>(source, timeSelector, pollingInterval, scheduler ?? Defaults.Scheduler, locker);

return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
}
Expand Down Expand Up @@ -930,7 +931,7 @@ public static IObservable<IEnumerable<T>> LimitSizeTo<T>(this ISourceList<T> sou
}

var locker = new object();
var limiter = new LimitSizeTo<T>(source, sizeLimit, scheduler ?? Scheduler.Default, locker);
var limiter = new LimitSizeTo<T>(source, sizeLimit, scheduler ?? Defaults.Scheduler, locker);

return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
}
Expand Down