Skip to content

Commit

Permalink
Build Threading.Tasks.DataFlow and ComponentModel.Annotations for Net…
Browse files Browse the repository at this point in the history
…CoreAppCurrent

Fix #48662
  • Loading branch information
eerhardt committed Feb 23, 2021
1 parent 464208f commit e803a1b
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.1</TargetFrameworks>
<TargetFrameworks>$(NetCoreAppCurrent);netstandard2.1</TargetFrameworks>
<ExcludeCurrentNetCoreAppFromPackage>true</ExcludeCurrentNetCoreAppFromPackage>
<Nullable>enable</Nullable>
<!--
Since many resource strings in this library are shown to an end-user,
Expand Down Expand Up @@ -59,4 +60,16 @@
<Compile Include="$(CommonPath)System\NotImplemented.cs"
Link="Common\System\NotImplemented.cs" />
</ItemGroup>
<ItemGroup Condition="$([MSBuild]::GetTargetFrameworkIdentifier('$(TargetFramework)')) == '.NETCoreApp'">
<Reference Include="System.Collections" />
<Reference Include="System.Collections.Concurrent" />
<Reference Include="System.ComponentModel" />
<Reference Include="System.ComponentModel.Primitives" />
<Reference Include="System.ComponentModel.TypeConverter" />
<Reference Include="System.Linq" />
<Reference Include="System.Runtime" />
<Reference Include="System.Resources.ResourceManager" />
<Reference Include="System.Text.RegularExpressions" />
<Reference Include="System.Threading" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private bool RunPredicate(T item)

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
#pragma warning disable 8617
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
#pragma warning restore 8617
{
// Validate arguments. Some targets may have a null source, but FilteredLinkPropagator
Expand Down Expand Up @@ -939,7 +939,7 @@ public static TOutput Receive<TOutput>(

// Do fast path checks for both cancellation and data already existing.
cancellationToken.ThrowIfCancellationRequested();
TOutput fastCheckedItem;
TOutput? fastCheckedItem;
var receivableSource = source as IReceivableSourceBlock<TOutput>;
if (receivableSource != null && receivableSource.TryReceive(null, out fastCheckedItem))
{
Expand Down Expand Up @@ -995,7 +995,7 @@ private static Task<TOutput> ReceiveCore<TOutput>(
{
try
{
TOutput fastCheckedItem;
TOutput? fastCheckedItem;
if (receivableSource.TryReceive(null, out fastCheckedItem))
{
return Task.FromResult<TOutput>(fastCheckedItem);
Expand Down Expand Up @@ -1091,7 +1091,7 @@ private static Task<TOutput> ReceiveCoreByLinking<TOutput>(ISourceBlock<TOutput>
// So we are racing to dispose of the unlinker.
if (Volatile.Read(ref target._cleanupReserved))
{
IDisposable disposableUnlink = Interlocked.CompareExchange(ref target._unlink, null, unlink);
IDisposable? disposableUnlink = Interlocked.CompareExchange<IDisposable?>(ref target._unlink, null, unlink);
if (disposableUnlink != null) disposableUnlink.Dispose();
}
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
{
// Accept the message if possible and complete this task with the message's value.
bool consumed = true;
T acceptedValue = consumeToAccept ? source!.ConsumeMessage(messageHeader, this, out consumed) : messageValue;
T? acceptedValue = consumeToAccept ? source!.ConsumeMessage(messageHeader, this, out consumed) : messageValue;
if (consumed)
{
status = DataflowMessageStatus.Accepted;
Expand Down Expand Up @@ -1962,7 +1962,7 @@ private static bool TryChooseFromSource<T>(
Debug.Assert(scheduler != null, "Expected a non-null scheduler");

// Try to receive from the source. If we can't, bail.
T result;
T? result;
var receivableSource = source as IReceivableSourceBlock<T>;
if (receivableSource == null || !receivableSource.TryReceive(out result))
{
Expand Down Expand Up @@ -2198,7 +2198,7 @@ public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T
if (consumeToAccept)
{
bool consumed;
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed);
messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down Expand Up @@ -1004,7 +1004,7 @@ private void ConsumeReservedMessagesNonGreedy()
KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
bool consumed;
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
if (!consumed)
{
// The protocol broke down, so throw an exception, as this is fatal. Before doing so, though,
Expand Down Expand Up @@ -1056,7 +1056,7 @@ private void ConsumeReservedMessagesGreedyBounded()
KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>> sourceAndMessage = reserved[i];
reserved[i] = default(KeyValuePair<ISourceBlock<T>, KeyValuePair<DataflowMessageHeader, T>>); // in case of exception from ConsumeMessage
bool consumed;
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
if (consumed)
{
var consumedMessage = new KeyValuePair<DataflowMessageHeader, T>(sourceAndMessage.Value.Key, consumedValue!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBloc
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
{
if (_target1.Count > 0 || _target2.Count > 0)
if (_target1!.Count > 0 || _target2!.Count > 0)
{
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages()));
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2!.GetAndEmptyMessages()));
}
};

Expand Down Expand Up @@ -329,9 +329,9 @@ public BatchedJoinBlock(int batchSize, GroupingDataflowBlockOptions dataflowBloc
// messages, and thus when there may be a few stragglers we need to make a batch out of.
Action createBatchAction = () =>
{
if (_target1.Count > 0 || _target2.Count > 0 || _target3.Count > 0)
if (_target1!.Count > 0 || _target2!.Count > 0 || _target3!.Count > 0)
{
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2.GetAndEmptyMessages(), _target3.GetAndEmptyMessages()));
_source.AddMessage(Tuple.Create(_target1.GetAndEmptyMessages(), _target2!.GetAndEmptyMessages(), _target3!.GetAndEmptyMessages()));
}
};

Expand Down Expand Up @@ -598,7 +598,7 @@ public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
_messages.Add(messageValue!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down Expand Up @@ -352,7 +352,7 @@ private bool ConsumeAndStoreOneMessageIfAvailable()
bool consumed = false;
try
{
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
if (consumed)
{
_source.AddMessage(consumedValue!);
Expand Down Expand Up @@ -577,7 +577,7 @@ internal bool TryReceive(Predicate<TOutput>? filter, [MaybeNullWhen(false)] out
// synchronizing with other activities on the block.
// We don't want to execute the user-provided cloning delegate
// while holding the lock.
TOutput message;
TOutput? message;
bool isValid;
lock (OutgoingLock)
{
Expand Down Expand Up @@ -607,7 +607,7 @@ internal bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput>? items)
{
// Try to receive the one item this block may have.
// If we can, give back an array of one item. Otherwise, give back null.
TOutput item;
TOutput? item;
if (TryReceive(null, out item))
{
items = new TOutput[] { item };
Expand Down Expand Up @@ -683,7 +683,7 @@ private void OfferCurrentMessageToNewTarget(ITargetBlock<TOutput> target)
Common.ContractAssertMonitorStatus(ValueLock, held: false);

// Get the current message if there is one
TOutput currentMessage;
TOutput? currentMessage;
bool isValid;
lock (ValueLock)
{
Expand Down Expand Up @@ -725,7 +725,7 @@ private bool OfferToTargets()
Common.ContractAssertMonitorStatus(ValueLock, held: false);

DataflowMessageHeader header = default(DataflowMessageHeader);
TOutput message = default(TOutput);
TOutput? message = default(TOutput);
int numDequeuedMessages = 0;
lock (ValueLock)
{
Expand Down Expand Up @@ -1053,7 +1053,7 @@ internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions li
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (target == null) throw new ArgumentNullException(nameof(target));

TOutput valueToClone;
TOutput? valueToClone;
lock (OutgoingLock) // We may currently be calling out under this lock to the target; requires it to be reentrant
{
lock (ValueLock)
Expand Down Expand Up @@ -1125,7 +1125,7 @@ internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlo
// If someone else holds the reservation, bail.
if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);

TOutput messageToReoffer;
TOutput? messageToReoffer;
lock (ValueLock)
{
// If this is not the message at the head of the queue, bail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down Expand Up @@ -354,7 +354,7 @@ private bool ConsumeAndStoreOneMessageIfAvailable()
bool consumed = false;
try
{
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
T? consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
if (consumed)
{
_source.AddMessage(consumedValue!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public JoinBlock(GroupingDataflowBlockOptions dataflowBlockOptions)
_sharedResources = new JoinBlockTargetSharedResources(this, targets,
() =>
{
_source.AddMessage(Tuple.Create(_target1.GetOneMessage(), _target2.GetOneMessage()));
_source.AddMessage(Tuple.Create(_target1!.GetOneMessage(), _target2!.GetOneMessage()));
},
exception =>
{
Volatile.Write(ref _sharedResources._hasExceptions, true);
Volatile.Write(ref _sharedResources!._hasExceptions, true);
_source.AddException(exception);
},
dataflowBlockOptions);
Expand Down Expand Up @@ -297,10 +297,10 @@ public JoinBlock(GroupingDataflowBlockOptions dataflowBlockOptions)
// Configure the targets
var targets = new JoinBlockTargetBase[3];
_sharedResources = new JoinBlockTargetSharedResources(this, targets,
() => _source.AddMessage(Tuple.Create(_target1.GetOneMessage(), _target2.GetOneMessage(), _target3.GetOneMessage())),
() => _source.AddMessage(Tuple.Create(_target1!.GetOneMessage(), _target2!.GetOneMessage(), _target3!.GetOneMessage())),
exception =>
{
Volatile.Write(ref _sharedResources._hasExceptions, true);
Volatile.Write(ref _sharedResources!._hasExceptions, true);
_source.AddException(exception);
},
dataflowBlockOptions);
Expand Down Expand Up @@ -660,7 +660,7 @@ internal override bool ConsumeReservedMessage()
Debug.Assert(_nonGreedy!.ReservedMessage.Key != null, "This target must have a reserved message");

bool consumed;
T consumedValue = _nonGreedy.ReservedMessage.Key.ConsumeMessage(_nonGreedy.ReservedMessage.Value, this, out consumed);
T? consumedValue = _nonGreedy.ReservedMessage.Key.ConsumeMessage(_nonGreedy.ReservedMessage.Value, this, out consumed);

// Null out our reservation
_nonGreedy.ReservedMessage = default(KeyValuePair<ISourceBlock<T>, DataflowMessageHeader>);
Expand Down Expand Up @@ -721,7 +721,7 @@ internal override bool ConsumeOnePostponedMessage()

// Try to consume the popped message
bool consumed;
T consumedValue = next.Key.ConsumeMessage(next.Value, this, out consumed);
T? consumedValue = next.Key.ConsumeMessage(next.Value, this, out consumed);
if (consumed)
{
lock (_sharedResources.IncomingLock)
Expand Down Expand Up @@ -861,7 +861,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
if (_sharedResources._boundingState != null && HasTheHighestNumberOfMessagesAvailable) _sharedResources._boundingState.CurrentCount += 1; // track this new item against our bound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private TransformBlock(Func<TInput, TOutput>? transformSync, Func<TInput, Task<T
private void ProcessMessage(Func<TInput, TOutput> transform, KeyValuePair<TInput, long> messageWithId)
{
// Process the input message to get the output message
TOutput outputItem = default(TOutput);
TOutput? outputItem = default(TOutput);
bool itemIsValid = false;
try
{
Expand Down Expand Up @@ -272,7 +272,7 @@ private void AsyncCompleteProcessMessageWithTask(Task<TOutput> completed, KeyVal

bool isBounded = _target.IsBounded;
bool gotOutputItem = false;
TOutput outputItem = default(TOutput);
TOutput? outputItem = default(TOutput);

switch (completed.Status)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ bool IReceivableSourceBlock<T>.TryReceiveAll([NotNullWhen(true)] out IList<T>? i
// Try to receive the one item this block may have.
// If we can, give back an array of one item. Otherwise,
// give back null.
T item;
T? item;
if (TryReceive(null, out item))
{
items = new T[] { item };
Expand Down Expand Up @@ -357,7 +357,7 @@ DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader message
if (consumeToAccept)
{
bool consumed;
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed);
messageValue = source!.ConsumeMessage(messageHeader, this, out consumed)!;
if (!consumed) return DataflowMessageStatus.NotAvailable;
}

Expand Down
Loading

0 comments on commit e803a1b

Please sign in to comment.