Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintenance: Centralized Value for Default IScheduler instance #862

Merged
merged 6 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/AutoRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed class AutoRefresh<TObject, TKey, TAny>(IObservable<IChangeSet<TO
{
private readonly Func<TObject, TKey, IObservable<TAny>> _reEvaluator = reEvaluator ?? throw new ArgumentNullException(nameof(reEvaluator));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/BatchIf.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed class BatchIf<TObject, TKey>(IObservable<IChangeSet<TObject, TKe
{
private readonly IObservable<bool> _pauseIfTrueSelector = pauseIfTrueSelector ?? throw new ArgumentNullException(nameof(pauseIfTrueSelector));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
3 changes: 1 addition & 2 deletions src/DynamicData/Cache/Internal/GroupOnProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;
Expand All @@ -28,7 +27,7 @@ public IObservable<IGroupChangeSet<TObject, TKey, TGroup>> Run() => _source.Publ
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;
Expand All @@ -17,7 +16,7 @@ internal sealed class GroupOnPropertyWithImmutableState<TObject, TKey, TGroup>(I
where TGroup : notnull
{
private readonly Func<TObject, TGroup> _groupSelector = groupSelectorKey.Compile();
private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
4 changes: 2 additions & 2 deletions src/DynamicData/Cache/Internal/ToObservableChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ToObservableChangeSet(
_expireAfter = expireAfter;
_keySelector = keySelector;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

_source = Observable.Create<IEnumerable<TObject>>(observer =>
{
Expand Down Expand Up @@ -57,7 +57,7 @@ public ToObservableChangeSet(
_expireAfter = expireAfter;
_keySelector = keySelector;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;
_source = source;
}

Expand Down
20 changes: 11 additions & 9 deletions src/DynamicData/Cache/ObservableCacheEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.CompilerServices;
using DynamicData;
using DynamicData.Binding;
using DynamicData.Cache;
using DynamicData.Cache.Internal;
using DynamicData.Internal;
using DynamicData.Kernel;

// ReSharper disable once CheckNamespace
Expand Down Expand Up @@ -324,7 +326,7 @@ public static IObservable<IChangeSet<TObject, TKey>> AutoRefresh<TObject, TKey>(
return t.WhenAnyPropertyChanged();
}

return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -356,7 +358,7 @@ public static IObservable<IChangeSet<TObject, TKey>> AutoRefresh<TObject, TKey,
return t.WhenPropertyChanged(propertyAccessor, false);
}

return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -416,7 +418,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Batch<TObject, TKey>(this I
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return source.Buffer(timeSpan, scheduler ?? Scheduler.Default).FlattenBufferResult();
return source.Buffer(timeSpan, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult();
}

/// <summary>
Expand Down Expand Up @@ -832,7 +834,7 @@ public static IObservable<IChangeSet<TObject, TKey>> BufferInitial<TObject, TKey
where TKey : notnull => source.DeferUntilLoaded().Publish(
shared =>
{
var initial = shared.Buffer(initialBuffer, scheduler ?? Scheduler.Default).FlattenBufferResult().Take(1);
var initial = shared.Buffer(initialBuffer, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult().Take(1);

return initial.Concat(shared);
});
Expand Down Expand Up @@ -1357,7 +1359,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Except<TObject, TKey>(this
/// </exception>
public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TimeSpan?> timeSelector)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, GlobalConfig.DefaultScheduler);

/// <summary>
/// Automatically removes items from the stream after the time specified by
Expand Down Expand Up @@ -1401,7 +1403,7 @@ public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(
/// timeSelector.</exception>
public static IObservable<IChangeSet<TObject, TKey>> ExpireAfter<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TimeSpan?> timeSelector, TimeSpan? pollingInterval)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, pollingInterval, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, pollingInterval, GlobalConfig.DefaultScheduler);

/// <summary>
/// Automatically removes items from the stream on the next poll after the time specified by
Expand Down Expand Up @@ -1463,7 +1465,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<
/// timeSelector.</exception>
public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<TObject, TKey>(this ISourceCache<TObject, TKey> source, Func<TObject, TimeSpan?> timeSelector, TimeSpan? interval = null)
where TObject : notnull
where TKey : notnull => ExpireAfter(source, timeSelector, interval, Scheduler.Default);
where TKey : notnull => ExpireAfter(source, timeSelector, interval, GlobalConfig.DefaultScheduler);

/// <summary>
/// Ensures there are no duplicated keys in the observable changeset.
Expand Down Expand Up @@ -1504,7 +1506,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ExpireAfter<
source.ThrowArgumentNullExceptionIfNull(nameof(source));
timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector));

scheduler ??= Scheduler.Default;
scheduler ??= GlobalConfig.DefaultScheduler;

return Observable.Create<IEnumerable<KeyValuePair<TKey, TObject>>>(
observer => source.Connect().ForExpiry(timeSelector, pollingInterval, scheduler).Finally(observer.OnCompleted).Subscribe(
Expand Down Expand Up @@ -2499,7 +2501,7 @@ public static IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> LimitSizeTo<
long orderItemWasAdded = -1;
var sizeLimiter = new SizeLimiter<TObject, TKey>(sizeLimit);

return source.Connect().Finally(observer.OnCompleted).ObserveOn(scheduler ?? Scheduler.Default).Transform((t, v) => new ExpirableItem<TObject, TKey>(t, v, DateTime.Now, Interlocked.Increment(ref orderItemWasAdded))).Select(sizeLimiter.CloneAndReturnExpiredOnly).Where(expired => expired.Length != 0).Subscribe(
return source.Connect().Finally(observer.OnCompleted).ObserveOn(scheduler ?? GlobalConfig.DefaultScheduler).Transform((t, v) => new ExpirableItem<TObject, TKey>(t, v, DateTime.Now, Interlocked.Increment(ref orderItemWasAdded))).Select(sizeLimiter.CloneAndReturnExpiredOnly).Where(expired => expired.Length != 0).Subscribe(
toRemove =>
{
try
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Experimental/ExperimentalEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public static IWatcher<TObject, TKey> AsWatcher<TObject, TKey>(this IObservable<
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return new Watcher<TObject, TKey>(source, scheduler ?? Scheduler.Default);
return new Watcher<TObject, TKey>(source, scheduler ?? GlobalConfig.DefaultScheduler);
}
}
12 changes: 12 additions & 0 deletions src/DynamicData/GlobalConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Concurrency;

namespace DynamicData;

internal static class GlobalConfig
{
public static IScheduler DefaultScheduler => TaskPoolScheduler.Default;
}
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/AutoRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public IObservable<IChangeSet<TObject>> Run() => Observable.Create<IChangeSet<TO
// create a change set, either buffered or one item at the time
IObservable<IEnumerable<TObject>> itemsChanged = buffer is null ?
itemHasChanged.Select(t => new[] { t }) :
itemHasChanged.Buffer(buffer.Value, scheduler ?? Scheduler.Default).Where(list => list.Count > 0);
itemHasChanged.Buffer(buffer.Value, scheduler ?? GlobalConfig.DefaultScheduler).Where(list => list.Count > 0);

IObservable<IChangeSet<TObject>> requiresRefresh = itemsChanged.Synchronize(locker).Select(
items => // catch all the indices of items which have been refreshed
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/BufferIf.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed class BufferIf<T>(IObservable<IChangeSet<T>> source, IObservable
{
private readonly IObservable<bool> _pauseIfTrueSelector = pauseIfTrueSelector ?? throw new ArgumentNullException(nameof(pauseIfTrueSelector));

private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default;
private readonly IScheduler _scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

private readonly IObservable<IChangeSet<T>> _source = source ?? throw new ArgumentNullException(nameof(source));

Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/FilterOnObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public IObservable<IChangeSet<TObject>> Run() => Observable.Create<IChangeSet<TO
// create a change set, either buffered or one item at the time
var itemsChanged = buffer is null ?
itemHasChanged.Select(t => new[] { t }) :
itemHasChanged.Buffer(buffer.Value, scheduler ?? Scheduler.Default).Where(list => list.Count > 0);
itemHasChanged.Buffer(buffer.Value, scheduler ?? GlobalConfig.DefaultScheduler).Where(list => list.Count > 0);

var requiresRefresh = itemsChanged.Synchronize(locker).Select(
items => // catch all the indices of items which have been refreshed
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/List/Internal/GroupOnProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public IObservable<IChangeSet<IGroup<TObject, TGroup>>> Run() => _source.Publish
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public IObservable<IChangeSet<IGrouping<TObject, TGroup>>> Run() => _source.Publ
// add a throttle if specified
if (throttle is not null)
{
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? Scheduler.Default);
regrouper = regrouper.Throttle(throttle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
}

// Use property changes as a trigger to re-evaluate Grouping
Expand Down
4 changes: 2 additions & 2 deletions src/DynamicData/List/Internal/ToObservableChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public ToObservableChangeSet(
{
_expireAfter = expireAfter;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;

_source = Observable.Create<IEnumerable<TObject>>(observer =>
{
Expand All @@ -51,7 +51,7 @@ public ToObservableChangeSet(
{
_expireAfter = expireAfter;
_limitSizeTo = limitSizeTo;
_scheduler = scheduler ?? Scheduler.Default;
_scheduler = scheduler ?? GlobalConfig.DefaultScheduler;
_source = source;
}

Expand Down
11 changes: 6 additions & 5 deletions src/DynamicData/List/ObservableListEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;

using DynamicData;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.Kernel;
Expand Down Expand Up @@ -184,7 +185,7 @@ public static IObservable<IChangeSet<TObject>> AutoRefresh<TObject>(this IObserv
return t.WhenAnyPropertyChanged();
}

return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -215,7 +216,7 @@ public static IObservable<IChangeSet<TObject>> AutoRefresh<TObject, TProperty>(t
return t.WhenPropertyChanged(propertyAccessor, false);
}

return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default);
return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
},
changeSetBuffer,
scheduler);
Expand Down Expand Up @@ -442,7 +443,7 @@ public static IObservable<IChangeSet<TObject>> BufferInitial<TObject>(this IObse
where TObject : notnull => source.DeferUntilLoaded().Publish(
shared =>
{
var initial = shared.Buffer(initialBuffer, scheduler ?? Scheduler.Default).FlattenBufferResult().Take(1);
var initial = shared.Buffer(initialBuffer, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult().Take(1);

return initial.Concat(shared);
});
Expand Down Expand Up @@ -679,7 +680,7 @@ public static IObservable<IEnumerable<T>> ExpireAfter<T>(this ISourceList<T> sou
timeSelector.ThrowArgumentNullExceptionIfNull(nameof(timeSelector));

var locker = new object();
var limiter = new ExpireAfter<T>(source, timeSelector, pollingInterval, scheduler ?? Scheduler.Default, locker);
var limiter = new ExpireAfter<T>(source, timeSelector, pollingInterval, scheduler ?? GlobalConfig.DefaultScheduler, locker);

return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
}
Expand Down Expand Up @@ -930,7 +931,7 @@ public static IObservable<IEnumerable<T>> LimitSizeTo<T>(this ISourceList<T> sou
}

var locker = new object();
var limiter = new LimitSizeTo<T>(source, sizeLimit, scheduler ?? Scheduler.Default, locker);
var limiter = new LimitSizeTo<T>(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, locker);

return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
}
Expand Down