[Bug]: TrueForAny emitting prematurely and multiple times #922

Closed
kierenkumulia opened this issue Jul 26, 2024 · 6 comments · Fixed by #923

@kierenkumulia

Describe the bug 🐞

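When subscribing to the observable returned by 'TrueForAny', which evaluates the inner observables of each item, a result is emitted prematurely if the first inner value is false. The outcome is then emitted a second time once the remaining values arrive.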

Current

Plugin A has value false
Outcome: False
Plugin A has value false
Plugin B has value true
Outcome: True

Steps to reproduce

The code below reproduces the behaviour above.

using DynamicData;
using System.Reactive.Linq;
using System.Reactive.Disposables;
using DynamicData.Kernel;
using System.Reactive.Subjects;

namespace TestDynamicData
{
    internal class Plugin
    {
        private readonly ISubject<bool> _busy = new Subject<bool>();
        private bool? _value;

        public string Id { get; }

        public IObservable<bool> IsBusy => _value.HasValue ? _busy.StartWith(_value.Value) : _busy;

        public Plugin(string id)
        {
            Id = id;
        }

        public void SetBusy(bool isBusy)
        {
            _value = isBusy;
            _busy.OnNext(isBusy);
        }
    }

    public class Program
    {
        static void Main(string[] args)
        {
            var pluginCache = new SourceCache<Plugin, string>(x => x.Id);

            var pluginA = new Plugin("A");
            var pluginB = new Plugin("B");

            pluginCache.AddOrUpdate(pluginA);
            pluginCache.AddOrUpdate(pluginB);

            pluginA.SetBusy(false);
            pluginB.SetBusy(true);

            var busyObservable = pluginCache.Connect().TrueForAny(x => x.IsBusy, (p, x) =>
            {
                Console.WriteLine($"Plugin {p.Id} has value {x}");
                return x;
            });

            busyObservable.Subscribe(outcome =>
            {
                Console.WriteLine($"Outcome: {outcome}");
            });
        }
    }
}

Reproduction repository

See code in issue

Expected behavior

Only one value should be emitted. See the modified source code below, which produces that result.

Expected

Plugin A has value false
Plugin B has value true
Outcome: True

using DynamicData;
using System.Reactive.Linq;
using System.Reactive.Disposables;
using DynamicData.Kernel;
using System.Reactive.Subjects;

namespace TestDynamicData
{
    public static class DynamicDataExtensions
    {
        public static IObservable<bool> TrueForAny<TObject, TKey, TValue>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Func<TObject, IObservable<TValue>> observableSelector,
            Func<TObject, TValue, bool> equalityCondition)
            where TObject : notnull
            where TKey : notnull
            where TValue : notnull
            => source.TrueFor(
                observableSelector,
                items => items.Any(o => o.LatestValue.HasValue && equalityCondition(o.Item, o.LatestValue.Value)));

        private static IObservable<bool> TrueFor<TObject, TKey, TValue>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Func<TObject, IObservable<TValue>> observableSelector,
            Func<IEnumerable<ObservableWithValue<TObject, TValue>>, bool> collectionMatcher)
            where TObject : notnull
            where TKey : notnull
            where TValue : notnull
            => new TrueFor<TObject, TKey, TValue>(source, observableSelector, collectionMatcher).Run();
    }

    internal sealed class TrueFor<TObject, TKey, TValue>(
        IObservable<IChangeSet<TObject, TKey>> source,
        Func<TObject, IObservable<TValue>> observableSelector,
        Func<IEnumerable<ObservableWithValue<TObject, TValue>>, bool> collectionMatcher)
        where TObject : notnull
        where TKey : notnull
        where TValue : notnull
    {
        private readonly Func<IEnumerable<ObservableWithValue<TObject, TValue>>, bool> _collectionMatcher = collectionMatcher ?? throw new ArgumentNullException(nameof(collectionMatcher));

        private readonly Func<TObject, IObservable<TValue>> _observableSelector = observableSelector ?? throw new ArgumentNullException(nameof(observableSelector));

        private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

        public IObservable<bool> Run() => Observable.Create<bool>(
                observer =>
                {
                    var transformed = _source.Transform(t => new ObservableWithValue<TObject, TValue>(t, _observableSelector(t))).Publish();
                    var inlineChanges = transformed.MergeMany(t => t.Observable);
                    var queried = transformed.ToCollection();

                    // nb: we do not care about the inline change because we are only monitoring it to cause a re-evaluation of all items

                    // Modified Code
                    var publisher = inlineChanges.CombineLatest(queried, (_, items) => _collectionMatcher(items)).DistinctUntilChanged().SubscribeSafe(observer);

                    // Current code
                    //var publisher = queried.CombineLatest(inlineChanges, (items, _) => _collectionMatcher(items)).DistinctUntilChanged().SubscribeSafe(observer);

                    return new CompositeDisposable(publisher, transformed.Connect());
                });
    }

    internal sealed class ObservableWithValue<TObject, TValue>
        where TValue : notnull
    {
        public ObservableWithValue(TObject item, IObservable<TValue> source)
        {
            Item = item;
            Observable = source.Do(value => LatestValue = value);
        }

        public TObject Item { get; }

        public Optional<TValue> LatestValue { get; private set; } = Optional<TValue>.None;

        public IObservable<TValue> Observable { get; }
    }

    internal class Plugin
    {
        private readonly ISubject<bool> _busy = new Subject<bool>();
        private bool? _value;

        public string Id { get; }

        public IObservable<bool> IsBusy => _value.HasValue ? _busy.StartWith(_value.Value) : _busy;

        public Plugin(string id)
        {
            Id = id;
        }

        public void SetBusy(bool isBusy)
        {
            _value = isBusy;
            _busy.OnNext(isBusy);
        }
    }

    public class Program
    {
        static void Main(string[] args)
        {
            var pluginCache = new SourceCache<Plugin, string>(x => x.Id);

            var pluginA = new Plugin("A");
            var pluginB = new Plugin("B");

            pluginCache.AddOrUpdate(pluginA);
            pluginCache.AddOrUpdate(pluginB);

            pluginA.SetBusy(false);
            pluginB.SetBusy(true);

            var busyObservable = pluginCache.Connect().TrueForAny(x => x.IsBusy, (p, x) =>
            {
                Console.WriteLine($"Plugin {p.Id} has value {x}");
                return x;
            });

            busyObservable.Subscribe(outcome =>
            {
                Console.WriteLine($"Outcome: {outcome}");
            });
        }
    }
}

Screenshots 🖼️

No response

IDE

Visual Studio 2022

Operating system

Windows 10

Version

No response

Device

No response

DynamicData Version

9.0.1

Additional information ℹ️

.NET 8

@JakenVeina
Collaborator

JakenVeina commented Jul 27, 2024

Yeah, this is definitely an implementation flaw. The operator uses emissions from any of the item value streams to trigger a re-evaluation of the whole collection, but it only begins subscribing to the value streams after the initial collection has been built (with each item's value marked as uninitialized, and thus evaluating to false). Luckily, we can just swap the order of subscriptions here, which guarantees that all value streams are subscribed before the initial collection is built. Also luckily, the only other operators this fix affects are the 3 other TrueFor operators.

In the meantime, the following query will get you the same thing, without the premature emission. I'll bet it's actually more efficient, too.

var busyObservable = pluginCache.Connect()
    .FilterOnObservable(plugin => plugin.IsBusy)
    .Count()
    .Select(busyCount => busyCount is not 0);
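
For readers who want to see the ordering effect in isolation, here is a minimal sketch that uses only System.Reactive and no DynamicData types. It is not the library's implementation. The names Run, trigger, latest, queried, inlineChanges and anyTrue are hypothetical stand-ins: trigger plays the role of the published change set, queried the collection snapshot, and inlineChanges the merged item value streams. It assumes that a Subject notifies its observers in subscription order and that CombineLatest subscribes to its first source before its second.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical model of the operator's wiring; assumptions are described in the text above.
static List<bool> Run(bool valuesFirst)
{
    var outcomes = new List<bool>();
    var latest = new bool?[2];            // null means "no value received yet"
    var trigger = new Subject<int>();     // stands in for the published change set

    // Snapshot of the (mutable) collection, emitted once per trigger.
    var queried = trigger.Select(_ => latest);

    // One notification per item; each records its value before notifying downstream.
    var inlineChanges = trigger
        .SelectMany(_ => new[] { (Index: 0, Value: false), (Index: 1, Value: true) })
        .Do(x => latest[x.Index] = x.Value);

    Func<bool?[], bool> anyTrue = items => items.Any(v => v == true);

    // The only difference between the two variants is which source is subscribed first.
    var combined = valuesFirst
        ? inlineChanges.CombineLatest(queried, (_, items) => anyTrue(items))
        : queried.CombineLatest(inlineChanges, (items, _) => anyTrue(items));

    using var subscription = combined.DistinctUntilChanged().Subscribe(o => outcomes.Add(o));
    trigger.OnNext(0);
    return outcomes;
}

Console.WriteLine(string.Join(", ", Run(valuesFirst: false)));
Console.WriteLine(string.Join(", ", Run(valuesFirst: true)));
```

Under those assumptions, the first call should report a premature False followed by True, because the snapshot arrives before any item value has been recorded and the matcher then runs after each value. The second call should report a single True, because both values are recorded before the snapshot arrives.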

@kierenkumulia
Author

Thanks @JakenVeina for the prompt reply and fix. When are you all targeting a new release?

@RolandPheasant
Collaborator

RolandPheasant commented Jul 28, 2024

@JakenVeina do you have permission to release? I can't properly check because I am in the Alps trekking right now.

@JakenVeina
Collaborator

I'm not sure. I'll take a look, later today.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Aug 12, 2024
@JakenVeina
Collaborator

Version 9.0.3 should show up on NuGet shortly, with this fix.
