Skip to content

Commit

Permalink
implement and use TryWaitForAvailableAsync
Browse files Browse the repository at this point in the history
update packages
  • Loading branch information
TYoungSL committed Apr 23, 2022
1 parent 499b9d2 commit 5be488a
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ public async Task GeneralQueueOperations()

[Test]
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
[SuppressMessage("ReSharper", "CognitiveComplexity")]
//[SuppressMessage(" Maintainability", "CA1502")] // TODO: split test
public async Task MultipleQueueConsumption()
{
using var cts = new CancellationTokenSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@

<ItemGroup>
<PackageReference Include="AutoBogus" Version="2.13.1" />
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="GitHubActionsTestLogger" Version="1.3.0">
<PackageReference Include="FluentAssertions" Version="6.6.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="1.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="JetBrains.Annotations" Version="2021.2.0" />
<PackageReference Include="JetBrains.Annotations" Version="2022.1.0" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.2.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="StirlingLabs.BigSpans" Version="21.8.1" />
<PackageReference Include="StirlingLabs.BigSpans.NUnit" Version="21.8.1" />
<PackageReference Include="StirlingLabs.BigSpans" Version="22.4.1" />
<PackageReference Include="StirlingLabs.BigSpans.NUnit" Version="22.4.1" />
<PackageReference Include="coverlet.collector" Version="3.1.2" PrivateAssets="all" />
<PackageReference Include="coverlet.msbuild" Version="3.1.2" PrivateAssets="all" />
<PackageReference Include="ReportGenerator" Version="5.1.3" PrivateAssets="all" />
<PackageReference Include="ReportGenerator" Version="5.1.4" PrivateAssets="all" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ public async ValueTask<bool> MoveNextAsync()
Current = item;
break;
}
await Collection.WaitForAvailableAsync(_cancellationToken);
if (!await Collection.TryWaitForAvailableAsync(_cancellationToken))
{
Collection.TryToComplete();
Collection.CheckDisposed();
return false;
}
} while (!Collection.IsCompletedInternal);
Collection.TryToComplete();
Collection.CheckDisposed();
Expand All @@ -145,7 +150,8 @@ public bool MoveNext()
Current = item;
break;
}
Collection.WaitForAvailable(_cancellationToken);
if (!Collection.TryWaitForAvailable(_cancellationToken))
break;
} while (!Collection.IsCompletedInternal);
Collection.TryToComplete();
Collection.CheckDisposed();
Expand Down
104 changes: 97 additions & 7 deletions StirlingLabs.Utilities.Collections/AsyncProducerConsumerCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ public AsyncProducerConsumerCollection(IProducerConsumerCollection<T> collection
}

public AsyncProducerConsumerCollection(IProducerConsumerCollection<T> collection, IEnumerable<T> items)
: this(collection)
: this(collection)
{
if (!ReferenceEquals(collection, items))
TryAddRange(items);
}

public AsyncProducerConsumerCollection(IEnumerable<T> items)
: this(items is IProducerConsumerCollection<T> pcc ? pcc : new ConcurrentQueue<T>(), items)
{ }
: this(items is IProducerConsumerCollection<T> pcc ? pcc : new ConcurrentQueue<T>(), items) { }

public AsyncProducerConsumerCollection()
: this(new ConcurrentQueue<T>()) { }
Expand Down Expand Up @@ -127,8 +126,14 @@ public async ValueTask<T> TakeAsync(bool continueOnCapturedContext, Cancellation

for (;;)
{
await WaitForAvailableAsync(continueOnCapturedContext, cancellationToken)
.ConfigureAwait(continueOnCapturedContext);
if (!await TryWaitForAvailableAsync(continueOnCapturedContext, cancellationToken)
.ConfigureAwait(continueOnCapturedContext))
{
if (TryToComplete())
throw new OperationCanceledException("The collection has completed.", _complete.Token);
cancellationToken.ThrowIfCancellationRequested();
throw new NotImplementedException();
}

if (_collection.TryTake(out var item))
{
Expand Down Expand Up @@ -165,7 +170,20 @@ public async ValueTask WaitForAvailableAsync(bool continueOnCapturedContext, Can
else
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(_addingComplete.Token, cancellationToken);
await _semaphore.WaitAsync(cts.Token).ConfigureAwait(continueOnCapturedContext);
try
{
await _semaphore.WaitAsync(cts.Token).ConfigureAwait(continueOnCapturedContext);
}
catch (OperationCanceledException oce)
{
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(oce.Message, oce, cancellationToken);
if (IsAddingCompleted)
throw new OperationCanceledException(
"The AsyncQueue completed adding, therefore there will not be any more available items.",
oce, _addingComplete.Token);
throw;
}
}
}

Expand All @@ -176,7 +194,8 @@ public void WaitForAvailable(CancellationToken cancellationToken)
if (IsCompletedInternal)
throw new InvalidOperationException("The AsyncQueue has already fully completed.");

if (!IsEmpty) return;
if (!IsEmpty)
return;

if (IsAddingCompleted)
throw new OperationCanceledException("The AsyncQueue completed adding, therefore there will not be any more available items.");
Expand All @@ -190,6 +209,77 @@ public void WaitForAvailable(CancellationToken cancellationToken)
}
}

[DebuggerStepThrough]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<bool> TryWaitForAvailableAsync(CancellationToken cancellationToken = default)
=> TryWaitForAvailableAsync(true, cancellationToken);

public async ValueTask<bool> TryWaitForAvailableAsync(bool continueOnCapturedContext, CancellationToken cancellationToken)
{
CheckDisposed();

if (IsCompletedInternal)
return false;

if (!IsEmpty)
return true;

if (IsAddingCompleted)
return false;

if (cancellationToken == default)
await _semaphore.WaitAsync(_addingComplete.Token).ConfigureAwait(continueOnCapturedContext);
else
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(_addingComplete.Token, cancellationToken);
try
{
await _semaphore.WaitAsync(cts.Token).ConfigureAwait(continueOnCapturedContext);
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested
|| IsAddingCompleted)
return false;
throw;
}
}
return true;
}

public bool TryWaitForAvailable(CancellationToken cancellationToken)
{
CheckDisposed();

if (IsCompletedInternal)
return false;

if (!IsEmpty)
return true;

if (IsAddingCompleted)
return false;

try
{
if (cancellationToken == default)
_semaphore.Wait(_addingComplete.Token);
else
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(_addingComplete.Token, cancellationToken);
_semaphore.Wait(cts.Token);
}
}
catch (OperationCanceledException oce)
{
if (oce.CancellationToken == cancellationToken
&& IsAddingCompleted)
return false;
throw;
}
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsEmptyInternal()
=> _collection switch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</ImportGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2021.3.0" PrivateAssets="all" />
<PackageReference Include="JetBrains.Annotations" Version="2022.1.0" PrivateAssets="all" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
</ItemGroup>
Expand Down

0 comments on commit 5be488a

Please sign in to comment.