Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: MergeChangeSets for Observable Cache ChangeSets #743

Merged

Conversation

dwcullop
Copy link
Member

@dwcullop dwcullop commented Oct 23, 2023

Description

This PR creates MergeChangeSets operators for Cache Observable ChangeSets. This operator is very similar to Merge except it is ChangeSet aware and allows the consumer to specify how to handle duplicate keys.

It leverages the same changeset merging logic that is used by MergeManyChangeSets (except it doesn't require the changesets to be children of some other changeset). This same logic will also be needed for the List -> Cache version of MergeManyChangeSets.

Summary of Changes

  1. Moves ChangeSet Merge logic out of MergeManyChangeSets class and into a pair of internal helper classes so it can be re-used by other operators:
    • ChangeSetCache<TObject, TKey>
    • ChangeSetMergeTracker<TObject, TKey>
  2. Adds tracking for OnComplete to ChangeSetAggregator
  3. Adds Spy operators to the test project (very helpful in debugging anything Rx related)
    Includes special overloads for debugging observable changesets.
  4. Lots of unit tests

Examples

You can use it with a collection of changesets:

interface IItem
{
    public IObservable<IChangeSet<ISubItem, int>>  SubItemChanges { get; }
}

IEnumerable<IItem> itemCollection = GetTheItems();

// Merge all the change sets
using var mergedSubItems = itemCollection.Select(i => i.SubItemChanges).MergeChangeSets().AsObservableCache();

You can use it with Observables with ChangeSets:

IObservable<IItem> itemObservable = GetItemObservable();

// Merge all the change sets as Items come in
using var mergedSubItems = itemObservable.Select(i => i.SubItemChanges).MergeChangeSets().AsObservableCache();

You can call it with just two changesets:

IItem item1 = GetItem(1);
IItem item2 = GetItem(2);

using var bothSubItems = item1.SubItemChanges.MergeChangeSets(item2.SubItemChanges).AsObservableCache();

You can call it as an operator on ChangeSets with a collection of changesets:

IItem mainItem = GetItem(1);
IEnumerable<IItem> otherItems = GetOthers();

using var allSubItems = mainItem.SubItemChanges.MergeChangeSets(otherItems.Select(i => i.SubItemChanges)).AsObservableCache();

An Advanced Example

The problem with merging multiple changesets is what to do if two changesets add a value with the same key. All of the overloads MergeChangeSets, like MergeManyChangeSets, allow the consumer to provide a IEqualityComparer and/or a IComparer to control which of the values is emitted in the resulting merged changeset.

Different IComparer instances can be used to create different resulting ChangeSets based on the criteria. An example of this is shown below.

interface IMarketPrice
{
    Guid MarketId { get; }
    string ItemId { get; }
    decimal Price { get; } 
    DateTimeOffset TimeStamp { get; }
}

class LowestPriceComparer : IComparer<MarketPrice>
{
    public int Compare(MarketPrice x, MarketPrice y) => x.Price.CompareTo(y);
}

class LatestPriceComparer : IComparer<MarketPrice>
{
    public int Compare(MarketPrice x, MarketPrice y) => y.TimeStamp.CompareTo(x);
}

IEnumerable<IObservableCache<MarketPrice, string>> priceCaches = GetTheMarketPrices();

// Create a cache with the lowest prices from all the markets
using var lowestPrices = priceCaches.Select(m => m.Connect()).MergeChangeSets(new LowestPriceComparer());

// Create a cache with the most recent prices across all markets
using var latestPrices = priceCaches.Select(m => m.Connect()).MergeChangeSets(new LatestPriceComparer());

As prices are added/removed/updated from all of the Price Cache instances, they will be reflected in the lowestPrice or latestPrice caches, but only if they're unique or they're the best choice according to the comparer.

If a "best choice" is removed from one of the sources, the result cache will see an "Update" event for the 2nd best choice (or a "Remove" event if there are no other choices).

@dwcullop
Copy link
Member Author

@RolandPheasant Would appreciate your feedback on this one. Functionality is very close to Or operator except it is optimized for just that case (not And or Xor, etc) and optionally supports extra parameters (comparer and/or equality comparer).

Do you think this should be a new operator? Or new extra overloads for Or? Or something else?

It basically gives you the same functionality as MergeManyChangesets but doesn't require you to have child change sets. You can just have a collection of change sets (like with Or).

@RolandPheasant
Copy link
Collaborator

Functionality is very close to Or operator except it is optimized for just that case (not And or Xor, etc) and optionally supports extra parameters (comparer and/or equality comparer).
Do you think this should be a new operator? Or new extra overloads for Or? Or something else?

Whilst it's very close to Or, due to historic differences I am included to keep it as it is here. We can consider later whether Or could be overloaded.

It basically gives you the same functionality as MergeManyChangesets but doesn't require you to have child change sets. You can just have a collection of change sets (like with Or).

That's great

@dwcullop
Copy link
Member Author

dwcullop commented Oct 25, 2023

Functionality is very close to Or operator except it is optimized for just that case (not And or Xor, etc) and optionally supports extra parameters (comparer and/or equality comparer).
Do you think this should be a new operator? Or new extra overloads for Or? Or something else?

Whilst it's very close to Or, due to historic differences I am included to keep it as it is here. We can consider later whether Or could be overloaded.

That makes sense to me as well. We don't want to change the behavior to which people are accustomed. I just wanted to make sure you were okay with adding something that is similar to an existing operator.

There is a difference between this and Or which might break things for some people: MergeChangeSets fires OnCompleted when all the source changesets have fired OnCompleted and fires OnError if any of the source changesets fire OnError (I wanted the same behavior as Observable.Merge).

@dwcullop dwcullop marked this pull request as ready for review October 28, 2023 20:15
@dwcullop
Copy link
Member Author

dwcullop commented Oct 28, 2023

Hey Reactive @RolandPheasant!

I think this one is ready for your review. Let me know if you think there is some use-case that isn't covered by the existing tests or if there's an overload that you think I'm missing.

@codecov
Copy link

codecov bot commented Oct 28, 2023

Codecov Report

Merging #743 (fe16096) into main (b01fe12) will increase coverage by 0.31%.
Report is 1 commits behind head on main.
The diff coverage is 85.71%.

@@            Coverage Diff             @@
##             main     #743      +/-   ##
==========================================
+ Coverage   64.29%   64.61%   +0.31%     
==========================================
  Files         222      225       +3     
  Lines       11224    11354     +130     
  Branches     2271     2318      +47     
==========================================
+ Hits         7217     7336     +119     
- Misses       3062     3067       +5     
- Partials      945      951       +6     
Files Coverage Δ
src/DynamicData/Cache/Internal/ChangeSetCache.cs 100.00% <100.00%> (ø)
src/DynamicData/Cache/Internal/MergeChangeSets.cs 100.00% <100.00%> (ø)
...micData/Cache/Internal/MergeManyCacheChangeSets.cs 100.00% <100.00%> (+13.82%) ⬆️
src/DynamicData/Cache/Tests/ChangeSetAggregator.cs 93.10% <100.00%> (+0.24%) ⬆️
src/DynamicData/List/Internal/MergeMany.cs 93.75% <100.00%> (+13.75%) ⬆️
src/DynamicData/Cache/ObservableCacheEx.cs 34.68% <75.00%> (+1.80%) ⬆️
...ynamicData/Cache/Internal/ChangeSetMergeTracker.cs 77.92% <77.92%> (ø)

... and 1 file with indirect coverage changes

@RolandPheasant
Copy link
Collaborator

This operator is very similar to Merge except it is ChangeSet aware and allows the consumer to specify how to handle duplicate keys.

This option is inspired

@RolandPheasant
Copy link
Collaborator

There is a difference between this and Or which might break things for some people: MergeChangeSets fires OnCompleted when all the source changesets have fired OnCompleted and fires OnError if any of the source changesets fire OnError (I wanted the same behavior as Observable.Merge).

That's probably what I should have done with Or in the first place.

Copy link
Collaborator

@RolandPheasant RolandPheasant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a major contribution. Thanks


namespace DynamicData.Tests.Cache;

public sealed class MergeChangeSetsFixture : IDisposable
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very impressive level of testing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be fair, I copied over most of the tests from the MergeManyChangeSets fixture and then tweaked them. Saving all that time allowed me to just add a few more, which is how found one corner case that I had missed in MergeManyChangeSets, but now that the implementation is shared, fixing one fixed them both.

But it was a real corner-corner case that required using a custom EqualityComparer and custom Comparer together.

The issue was:

  1. Values are added (merged results sees the add)
  2. Values are updated to values that are equal according to the EqualityComparer (merge results sees nothing)
  3. Values are refreshed (merged results sees refresh events for objects it has never observed) and they still are equal to the originally added (and updated) values

I changed it so that now, at 3, it won't see the refresh events either because something shouldn't be refreshed if it hasn't been seen.

I'm pretty sure that's the right behavior. If the refresh had caused the updated value to be the new best value, then it would have observed the Update at that point. Tell me if you think that's the wrong behavior.

/// <param name="showTimestamps">Indicates whether or not timestamps should be prepended to messages.</param>
/// <returns>An IObservable{T} with the Spy events included.</returns>
/// <remarks>Adapted from https://stackoverflow.com/q/20220755/.</remarks>
public static IObservable<T> Spy<T>(this IObservable<T> source, string? infoText = null, Action<string>? logger = null,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is super cool

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is amazing. I wish I could take credit, but it's not my idea... I just fleshed it out a bit. It is game-changing in terms of debugging Rx. I did add the ChangeSet-specific overloads that allow it to peer into the individual changes though. This has saved me a ton of time and hassle and helps get you right to the root of an issue. I really needed its help for a couple of the test-cases and so I ported it over from my personal library.

I think you should consider promoting it to the main library instead of the test library, but I didn't want to be presumptuous. Also, I don't think the interface is quite ready for mainstream. It's fine for debugging, but for production, people will want to integrate their own loggers and not have to specify the logging facility each time. Not exactly sure what that would look like, but it was definitely out of scope for this PR, so I included it as-is.

@RolandPheasant RolandPheasant merged commit 0434568 into reactivemarbles:main Oct 30, 2023
3 checks passed
Copy link

This pull request has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Nov 14, 2023
@dwcullop dwcullop deleted the feature/cache-merge-changesets branch November 23, 2023 01:00
@dwcullop dwcullop self-assigned this Dec 20, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants