Skip to content

Commit

Permalink
Maintenance: Centralized Value for Default IScheduler instance (#862)
Browse files Browse the repository at this point in the history
* Use centralized constant for Default IScheduler value
  • Loading branch information
dwcullop authored Feb 22, 2024
1 parent 1e0e11b commit ef1e8c4
Show file tree
Hide file tree
Showing 15 changed files with 43 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/AutoRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed class AutoRefresh<TObject, TKey, TAny>(IObservable<IChangeSet<TO
{
private readonly Func<TObject, TKey, IObservable<TAny>> _reEvaluator = reEvaluator ?? throw new ArgumentNullException(nameof(reEvaluator));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/BatchIf.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed class BatchIf<TObject, TKey>(IObservable<IChangeSet<TObject, TKe
{
private readonly IObservable<bool> _pauseIfTrueSelector = pauseIfTrueSelector ?? throw new ArgumentNullException(nameof(pauseIfTrueSelector));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
3 changes: 1 addition & 2 deletions src/DynamicData/Cache/Internal/GroupOnProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;
Expand All @@ -28,7 +27,7 @@ public IObservable<IGroupChangeSet<TObject, TKey, TGroup>> Run() => _source.Publ
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;
Expand All @@ -17,7 +16,7 @@ internal sealed class GroupOnPropertyWithImmutableState<TObject, TKey, TGroup>(I
where TGroup : notnull
{
private readonly Func<TObject, TGroup> _groupSelector = groupSelectorKey.Compile();
private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
4 changes: 2 additions & 2 deletions src/DynamicData/Cache/Internal/ToObservableChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ToObservableChangeSet(
_expireAfter = expireAfter;
_keySelector = keySelector;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

_source = Observable.Create<IEnumerable<TObject>>(observer =>
{
Expand Down Expand Up @@ -57,7 +57,7 @@ public ToObservableChangeSet(
_expireAfter = expireAfter;
_keySelector = keySelector;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;
_source = source;
}

Expand Down
20 changes: 11 additions & 9 deletions src/DynamicData/Cache/ObservableCacheEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.CompilerServices;
using DynamicData;
using DynamicData.Binding;
using DynamicData.Cache;
using DynamicData.Cache.Internal;
using DynamicData.Internal;
using DynamicData.Kernel;

// ReSharper disable once CheckNamespace
Expand Down Expand Up @@ -324,7 +326,7 @@ public static IObservable<IChangeSet<TObject, TKey>> AutoRefresh<TObject, TKey>(
return t.WhenAnyPropertyChanged();
}

return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -356,7 +358,7 @@ public static IObservable<IChangeSet<TObject, TKey>> AutoRefresh<TObject, TKey,
return t.WhenPropertyChanged(propertyAccessor, false);
}

return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -416,7 +418,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Batch<TObject, TKey>(this I
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return source.Buffer(timeSpan, scheduler ?? Scheduler.Default).FlattenBufferResult();
return source.Buffer(timeSpan, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult();
}

/// <summary>
Expand Down Expand Up @@ -832,7 +834,7 @@ public static IObservable<IChangeSet<TObject, TKey>> BufferInitial<TObject, TKey
where TKey : notnull => source.DeferUntilLoaded().Publish(
shared =>
{
var initial = shared.Buffer(initialBuffer, scheduler ?? Scheduler.Default).FlattenBufferResult().Take(1);
var initial = shared.Buffer(initialBuffer, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult().Take(1);

return initial.Concat(shared);
});
Expand Down Expand Up @@ -1357,7 +1359,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Except<TObject, TKey>(this
/// </exception>
public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TimeSpan?> timeSelector)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, GlobalConfig.DefaultScheduler);

/// <summary>
/// Automatically removes items from the stream after the time specified by
Expand Down Expand Up @@ -1401,7 +1403,7 @@ public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(
/// timeSelector.</exception>
public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TimeSpan?> timeSelector, TimeSpan? pollingInterval)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, pollingInterval, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, pollingInterval, GlobalConfig.DefaultScheduler);

/// <summary>
/// Automatically removes items from the stream on the next poll after the time specified by
Expand Down Expand Up @@ -1463,7 +1465,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<
/// timeSelector.</exception>
public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<TObject, TKey>(this ISourceCache<TObject, TKey> source, Func<TObject, TimeSpan?> timeSelector, TimeSpan? interval = null)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, interval, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, interval, GlobalConfig.DefaultScheduler);

/// <summary>
/// Ensures there are no duplicated keys in the observable changeset.
Expand Down Expand Up @@ -1504,7 +1506,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<
source.ThrowArgumentNullExceptionIfNull(nameof(source));
timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector));

scheduler ??= Scheduler.Default;
scheduler ??= GlobalConfig.DefaultScheduler;

return Observable.Create<IEnumerable<KeyValuePair<TKey, TObject>>>(
observer => source.Connect().ForExpiry(timeSelector, pollingInterval, scheduler).Finally(observer.OnCompleted).Subscribe(
Expand Down Expand Up @@ -2499,7 +2501,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> LimitSizeTo<
long orderItemWasAdded = -1;
var sizeLimiter = new SizeLimiter<TObject, TKey>(sizeLimit);

return source.Connect().Finally(observer.OnCompleted).ObserveOn(scheduler ?? Scheduler.Default).Transform((t, v) => new ExpirableItem<TObject, TKey>(t, v, DateTime.Now, Interlocked.Increment(ref orderItemWasAdded))).Select(sizeLimiter.CloneAndReturnExpiredOnly).Where(expired => expired.Length != 0).Subscribe(
return source.Connect().Finally(observer.OnCompleted).ObserveOn(scheduler ?? GlobalConfig.DefaultScheduler).Transform((t, v) => new ExpirableItem<TObject, TKey>(t, v, DateTime.Now, Interlocked.Increment(ref orderItemWasAdded))).Select(sizeLimiter.CloneAndReturnExpiredOnly).Where(expired => expired.Length != 0).Subscribe(
toRemove =>
{
try
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Experimental/ExperimentalEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public static IWatcher<TObject, TKey> AsWatcher<TObject, TKey>(this IObservable<
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return new Watcher<TObject, TKey>(source, scheduler ?? Scheduler.Default);
return new Watcher<TObject, TKey>(source, scheduler ?? GlobalConfig.DefaultScheduler);
}
}
12 changes: 12 additions & 0 deletions src/DynamicData/GlobalConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Concurrency;

namespace DynamicData;

internal static class GlobalConfig
{
public static IScheduler DefaultScheduler => TaskPoolScheduler.Default;
}
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/AutoRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public IObservable<IChangeSet<TObject>> Run() => Observable.Create<IChangeSet<TO
// create a change set, either buffered or one item at the time
IObservable<IEnumerable<TObject>> itemsChanged = buffer is null ?
itemHasChanged.Select(t => new[] { t }) :
itemHasChanged.Buffer(buffer.Value, scheduler ?? Scheduler.Default).Where(list => list.Count > 0);
itemHasChanged.Buffer(buffer.Value, scheduler ?? GlobalConfig.DefaultScheduler).Where(list => list.Count > 0);

IObservable<IChangeSet<TObject>> requiresRefresh = itemsChanged.Synchronize(locker).Select(
items => // catch all the indices of items which have been refreshed
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/BufferIf.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed class BufferIf<T>(IObservable<IChangeSet<T>> source, IObservable
{
private readonly IObservable<bool> _pauseIfTrueSelector = pauseIfTrueSelector ?? throw new ArgumentNullException(nameof(pauseIfTrueSelector));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<T>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/FilterOnObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public IObservable<IChangeSet<TObject>> Run() => Observable.Create<IChangeSet<TO
// create a change set, either buffered or one item at the time
var itemsChanged = buffer is null ?
itemHasChanged.Select(t => new[] { t }) :
itemHasChanged.Buffer(buffer.Value, scheduler ?? Scheduler.Default).Where(list => list.Count > 0);
itemHasChanged.Buffer(buffer.Value, scheduler ?? GlobalConfig.DefaultScheduler).Where(list => list.Count > 0);

var requiresRefresh = itemsChanged.Synchronize(locker).Select(
items => // catch all the indices of items which have been refreshed
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/GroupOnProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public IObservable<IChangeSet<IGroup<TObject, TGroup>>> Run() => _source.Publish
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public IObservable<IChangeSet<IGrouping<TObject, TGroup>>> Run() => _source.Publ
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
4 changes: 2 additions & 2 deletions src/DynamicData/List/Internal/ToObservableChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public ToObservableChangeSet(
{
_expireAfter = expireAfter;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

_source = Observable.Create<IEnumerable<TObject>>(observer =>
{
Expand All @@ -51,7 +51,7 @@ public ToObservableChangeSet(
{
_expireAfter = expireAfter;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;
_source = source;
}

Expand Down
11 changes: 6 additions & 5 deletions src/DynamicData/List/ObservableListEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;

using DynamicData;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.Kernel;
Expand Down Expand Up @@ -184,7 +185,7 @@ public static IObservable<IChangeSet<TObject>> AutoRefresh<TObject>(this IObserv
return t.WhenAnyPropertyChanged();
}

return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -215,7 +216,7 @@ public static IObservable<IChangeSet<TObject>> AutoRefresh<TObject, TProperty>(t
return t.WhenPropertyChanged(propertyAccessor, false);
}

return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -442,7 +443,7 @@ public static IObservable<IChangeSet<TObject>> BufferInitial<TObject>(this IObse
where TObject : notnull => source.DeferUntilLoaded().Publish(
shared =>
{
var initial = shared.Buffer(initialBuffer, scheduler ?? Scheduler.Default).FlattenBufferResult().Take(1);
var initial = shared.Buffer(initialBuffer, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult().Take(1);

return initial.Concat(shared);
});
Expand Down Expand Up @@ -679,7 +680,7 @@ public static IObservable<IEnumerable<T>> ExpireAfter<T>(this ISourceList<T> sou
timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector));

var locker = new object();
var limiter = new ExpireAfter<T>(source, timeSelector, pollingInterval, scheduler ?? Scheduler.Default, locker);
var limiter = new ExpireAfter<T>(source, timeSelector, pollingInterval, scheduler ?? GlobalConfig.DefaultScheduler, locker);

return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
}
Expand Down Expand Up @@ -930,7 +931,7 @@ public static IObservable<IEnumerable<T>> LimitSizeTo<T>(this ISourceList<T> sou
}

var locker = new object();
var limiter = new LimitSizeTo<T>(source, sizeLimit, scheduler ?? Scheduler.Default, locker);
var limiter = new LimitSizeTo<T>(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, locker);

return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
}
Expand Down

0 comments on commit ef1e8c4

Please sign in to comment.