Skip to content

Commit

Permalink
make TryWaitForAvailableAsync part of the IAsyncConsumer interface
Browse files Browse the repository at this point in the history
make use of TryWaitForAvailableAsync in FairAsyncConsumerIMux
  • Loading branch information
TYoungSL committed Apr 23, 2022
1 parent 0837a12 commit f0204c1
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
2 changes: 2 additions & 0 deletions StirlingLabs.Utilities.Collections/AsyncConsumerIMux.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using JetBrains.Annotations;

Expand All @@ -19,6 +20,7 @@ public static AsyncConsumerIMux<T> Fair<T>(params IAsyncConsumer<T>[] collection
=> new FairAsyncConsumerIMux<T>(collections);
}

[SuppressMessage("Design", "CA1063", Justification = "Not necessary")]
public abstract class AsyncConsumerIMux<T>
: IAsyncEnumerable<T>, IDisposable
{
Expand Down
3 changes: 3 additions & 0 deletions StirlingLabs.Utilities.Collections/EmptyAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,8 @@ public bool TryMoveNext(out T? item)
public ValueTask WaitForAvailableAsync(bool continueOnCapturedContext, CancellationToken cancellationToken)
=> default;

public ValueTask<bool> TryWaitForAvailableAsync(bool continueOnCapturedContext, CancellationToken cancellationToken)
=> default;

public void Dispose() { }
}
32 changes: 21 additions & 11 deletions StirlingLabs.Utilities.Collections/FairAsyncConsumerIMux.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,20 @@ public void WithLock([InstantHandle] Action<FairAsyncConsumerIMux<T>> action)
if (action is null) throw new ArgumentNullException(nameof(action));
lock (_lock) action(this);
}
public TResult WithLock<TResult>([InstantHandle]Func<FairAsyncConsumerIMux<T>, TResult> fn)

public TResult WithLock<TResult>([InstantHandle] Func<FairAsyncConsumerIMux<T>, TResult> fn)
{
if (fn is null) throw new ArgumentNullException(nameof(fn));
lock (_lock) return fn(this);
}

public void WithLock([InstantHandle]Action<FairAsyncConsumerIMux<T>, object> action)
public void WithLock([InstantHandle] Action<FairAsyncConsumerIMux<T>, object> action)
{
if (action is null) throw new ArgumentNullException(nameof(action));
lock (_lock) action(this, _lock);
}
public TResult WithLock<TResult>([InstantHandle]Func<FairAsyncConsumerIMux<T>, object, TResult> fn)

public TResult WithLock<TResult>([InstantHandle] Func<FairAsyncConsumerIMux<T>, object, TResult> fn)
{
if (fn is null) throw new ArgumentNullException(nameof(fn));
lock (_lock) return fn(this, _lock);
Expand Down Expand Up @@ -236,21 +236,31 @@ public async ValueTask<bool> MoveNextAsync(bool continueOnCapturedContext)
{
var remainingConsumers = _consumerIMux._consumers
.Where(c => !c.IsCompleted)
.Select(c => c.WaitForAvailableAsync(continueOnCapturedContext, _cancellationToken).AsTask())
.ToArray();
.Select(c => c.TryWaitForAvailableAsync(continueOnCapturedContext, _cancellationToken).AsTask())
.ToList();

if (remainingConsumers.Length == 0)
if (remainingConsumers.Count == 0)
return false;

await Task.WhenAny(remainingConsumers)
.ConfigureAwait(continueOnCapturedContext);
for (;;)
{
var available = await Task.WhenAny(remainingConsumers)
.ConfigureAwait(continueOnCapturedContext);

#pragma warning disable CA2007
if (await available) break;
#pragma warning restore CA2007

remainingConsumers.Remove(available);
}

}
catch (OperationCanceledException) { }
catch (InvalidOperationException) { }
}
return true;
}

private bool TryConsume(FairAsyncConsumerIMux<T> c, object l)
{
var consumers = c._consumers;
Expand Down
2 changes: 2 additions & 0 deletions StirlingLabs.Utilities.Collections/IAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public interface IAsyncConsumer<T> : IAsyncEnumerable<T>, IEnumerable<T>, IAsync
bool TryMoveNext(out T? item);

ValueTask WaitForAvailableAsync(bool continueOnCapturedContext, CancellationToken cancellationToken);

ValueTask<bool> TryWaitForAvailableAsync(bool continueOnCapturedContext, CancellationToken cancellationToken);
}

0 comments on commit f0204c1

Please sign in to comment.