-
-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: MergeChangeSets for Observable Cache ChangeSets #743
Merged
RolandPheasant
merged 12 commits into
reactivemarbles:main
from
dwcullop:feature/cache-merge-changesets
Oct 30, 2023
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
63a50ce
Updated List to have same new MergeMany behavior
dwcullop 00fd129
Added List -> Cache Merge ChangeSets and MergeChangeSets for Cache
dwcullop 380790b
All unit tests pass except for 2
dwcullop 6306f90
More tests
dwcullop 5dc2026
Revert changes that will go in another PR
dwcullop 228a417
Merge branch 'main' into feature/cache-merge-changesets
dwcullop 84f6fad
More Progress
dwcullop 04597a9
Established all the prototypes for MergeChangeSets
dwcullop 1b457d4
Code cleanup and more unit tests
dwcullop 5c34180
Finished more unit tests
dwcullop 58c25b3
Add Fail on Child ChangeSet Failure
dwcullop fe16096
Merge branch 'main' into feature/cache-merge-changesets
RolandPheasant File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
1,056 changes: 1,056 additions & 0 deletions
1,056
src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
using System; | ||
using System.Diagnostics; | ||
using System.Linq; | ||
using System.Reactive.Disposables; | ||
using System.Reactive.Linq; | ||
using System.Threading; | ||
|
||
namespace DynamicData.Tests.Utilities; | ||
|
||
internal static class ObservableSpy | ||
{ | ||
private static readonly string ChangeSetEntrySpacing = Environment.NewLine + "\t"; | ||
|
||
/// <summary> | ||
/// Spys on the given IObservable{T} by emitting logging information that is tagged with the current ThreadId for all related | ||
/// events including every invocation on the Observer, subscriptions, disposes, and exceptions. | ||
/// </summary> | ||
/// <typeparam name="T">The type of the Observable.</typeparam> | ||
/// <param name="source">The source IObservable to be Spied on.</param> | ||
/// <param name="logger">The logger instance to use for logging.</param> | ||
/// <param name="infoText">Optional text to include with each log message.</param> | ||
/// <param name="formatter">Optional Value transformer to control how the observed values are logged.</param> | ||
/// <param name="showSubs">Indicates whether or not subscription related messages should be emitted.</param> | ||
/// <param name="showTimestamps">Indicates whether or not timestamps should be prepended to messages.</param> | ||
/// <returns>An IObservable{T} with the Spy events included.</returns> | ||
/// <remarks>Adapted from https://stackoverflow.com/q/20220755/.</remarks> | ||
public static IObservable<T> Spy<T>(this IObservable<T> source, string? infoText = null, Action<string>? logger = null, | ||
Func<T, string?>? formatter = null, bool showSubs = true, | ||
bool showTimestamps = true) | ||
{ | ||
static string NoTimestamp() => string.Empty; | ||
static string HighResTimestamp() => DateTimeOffset.UtcNow.ToString("HH:mm:ss.fffffff") + " "; | ||
|
||
formatter ??= (t => t?.ToString() ?? "{Null}"); | ||
logger = CreateLogger(logger ?? Console.WriteLine, showTimestamps ? HighResTimestamp : NoTimestamp, infoText ?? $"IObservable<{typeof(T).Name}>"); | ||
|
||
logger("Creating Observable"); | ||
|
||
int subscriptionCounter = 0; | ||
return Observable.Create<T>(obs => | ||
{ | ||
var valueCounter = 0; | ||
bool? completedSuccessfully = null; | ||
|
||
if (showSubs) | ||
{ | ||
logger("Creating Subscription"); | ||
} | ||
try | ||
{ | ||
var subscription = source | ||
.Do(x => logger($"OnNext() (#{Interlocked.Increment(ref valueCounter)}): {formatter(x)}"), | ||
ex => { logger($"OnError() ({valueCounter} Values) [Exception: {ex}]"); completedSuccessfully = false; }, | ||
() => { logger($"OnCompleted() ({valueCounter} Values)"); completedSuccessfully = true; }) | ||
.Subscribe(t => | ||
{ | ||
try | ||
{ | ||
obs.OnNext(t); | ||
} | ||
catch (Exception ex) | ||
{ | ||
logger($"Downstream exception ({ex})"); | ||
throw; | ||
} | ||
}, obs.OnError, obs.OnCompleted); | ||
|
||
return Disposable.Create(() => | ||
{ | ||
if (showSubs) | ||
{ | ||
switch (completedSuccessfully) | ||
{ | ||
case true: logger("Disposing because Observable Sequence Completed Successfully"); break; | ||
case false: logger("Disposing due to Failed Observable Sequence"); break; | ||
case null: logger("Disposing due to Unsubscribe"); break; | ||
} | ||
} | ||
subscription?.Dispose(); | ||
int count = Interlocked.Decrement(ref subscriptionCounter); | ||
if (showSubs) | ||
{ | ||
logger($"Dispose Completed! ({count} Active Subscriptions)"); | ||
} | ||
}); | ||
} | ||
finally | ||
{ | ||
int count = Interlocked.Increment(ref subscriptionCounter); | ||
if (showSubs) | ||
{ | ||
logger($"Subscription Created! ({count} Active Subscriptions)"); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
public static IObservable<IChangeSet<T, TKey>> Spy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, | ||
string? opName = null, Action<string>? logger = null, | ||
Func<T, string?>? formatter = null, bool showSubs = true, | ||
bool showTimestamps = true) | ||
where T : notnull | ||
where TKey : notnull | ||
{ | ||
formatter = formatter ?? (t => t?.ToString() ?? "{Null}"); | ||
return Spy(source, opName, logger, cs => "[Cache Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, | ||
cs.Select((change, n) => $"#{n} [{change.Reason}] {change.Key}: {FormatChange(formatter!, change)}")), showSubs, showTimestamps); | ||
} | ||
|
||
public static IObservable<IChangeSet<T>> Spy<T>(this IObservable<IChangeSet<T>> source, | ||
string? opName = null, Action<string>? logger = null, | ||
Func<T, string?>? formatter = null, bool showSubs = true, | ||
bool showTimestamps = true) | ||
where T : notnull | ||
{ | ||
formatter = formatter ?? (t => t?.ToString() ?? "{Null}"); | ||
return Spy(source, opName, logger, cs => "[List Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, | ||
cs.Select(change => $"[{change.Reason}] {FormatChange(formatter!, change)}")), showSubs, showTimestamps); | ||
} | ||
|
||
public static IObservable<T> DebugSpy<T>(this IObservable<T> source, string? opName = null, | ||
Func<T, string?>? formatter = null, bool showSubs = true, | ||
bool showTimestamps = true) | ||
{ | ||
#if DEBUG | ||
return source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps); | ||
#else | ||
return source; | ||
#endif | ||
} | ||
|
||
public static IObservable<IChangeSet<T, TKey>> DebugSpy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, | ||
string? opName = null, | ||
Func<T, string?>? formatter = null, bool showSubs = true, | ||
bool showTimestamps = true) | ||
where T : notnull | ||
where TKey : notnull | ||
{ | ||
#if DEBUG | ||
return source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps); | ||
#else | ||
return source; | ||
#endif | ||
} | ||
|
||
public static IObservable<IChangeSet<T>> DebugSpy<T>(this IObservable<IChangeSet<T>> source, | ||
string? opName = null, | ||
Func<T, string?>? formatter = null, bool showSubs = true, | ||
bool showTimestamps = true) | ||
where T : notnull | ||
{ | ||
#if DEBUG | ||
return source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps); | ||
#else | ||
return source; | ||
#endif | ||
} | ||
|
||
private static string FormatChange<T, TKey>(Func<T, string> formatter, Change<T, TKey> change) | ||
where T : notnull | ||
where TKey : notnull => | ||
change.Reason switch | ||
{ | ||
ChangeReason.Update => $"{formatter(change.Current)} [Previous: {formatter(change.Previous.Value)}]", | ||
_ => formatter(change.Current), | ||
}; | ||
|
||
private static string FormatChange<T>(Func<T, string> formatter, Change<T> change) | ||
where T : notnull => | ||
change.Reason switch | ||
{ | ||
ListChangeReason.AddRange => string.Join(", ", change.Range.Select(n => formatter(n))), | ||
ListChangeReason.RemoveRange => string.Join(", ", change.Range.Select(n => formatter(n))), | ||
_ => formatter(change.Item.Current), | ||
}; | ||
|
||
private static Action<string> CreateLogger(Action<string> baseLogger, Func<string> timeStamper, string opName) => | ||
msg => baseLogger($"{timeStamper()}[{Thread.CurrentThread.ManagedThreadId:X2}] |{opName}| {msg}"); | ||
|
||
#if DEBUG | ||
static void DebugLogger(string str) => Debug.WriteLine(str); | ||
#endif | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. | ||
// Roland Pheasant licenses this file to you under the MIT license. | ||
// See the LICENSE file in the project root for full license information. | ||
|
||
using System.Reactive.Linq; | ||
|
||
namespace DynamicData.Cache.Internal; | ||
|
||
/// <summary> | ||
/// Wraps an Observable ChangeSet while maintaining a copy of the aggregated changes. | ||
/// </summary> | ||
/// <typeparam name="TObject">ChangeSet Object Type.</typeparam> | ||
/// <typeparam name="TKey">ChangeSet Key Type.</typeparam> | ||
internal class ChangeSetCache<TObject, TKey> | ||
where TObject : notnull | ||
where TKey : notnull | ||
{ | ||
public ChangeSetCache(IObservable<IChangeSet<TObject, TKey>> source) | ||
{ | ||
Source = source.IgnoreSameReferenceUpdate().Do(Cache.Clone); | ||
} | ||
|
||
public Cache<TObject, TKey> Cache { get; } = new(); | ||
|
||
public IObservable<IChangeSet<TObject, TKey>> Source { get; } | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is super cool
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is amazing. I wish I could take credit, but it's not my idea... I just fleshed it out a bit. It is game-changing in terms of debugging Rx. I did add the ChangeSet-specific overloads that allow it to peer into the individual changes though. This has saved me a ton of time and hassle and helps get you right to the root of an issue. I really needed its help for a couple of the test-cases and so I ported it over from my personal library.
I think you should consider promoting it to the main library instead of the test library, but I didn't want to be presumptuous. Also, I don't think the interface is quite ready for mainstream. It's fine for debugging, but for production, people will want to integrate their own loggers and not have to specify the logging facility each time. Not exactly sure what that would look like, but it was definitely out of scope for this PR, so I included it as-is.