Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System.Threading.Tasks.Dataflow.DataflowBlock.ReceiveAsync does not use custom scheduler #83159

Open
cretz opened this issue Mar 8, 2023 · 7 comments
Labels
area-System.Threading.Tasks documentation Documentation bug or enhancement, does not impact product or test code
Milestone

Comments

@cretz
Copy link

cretz commented Mar 8, 2023

Description

ReceiveAsync creates a ReceiveTarget at

var target = new ReceiveTarget<TOutput>();
that does not allow any configuration of a task scheduler like https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-specify-a-task-scheduler-in-a-dataflow-block would have one believe.

Reproduction Steps

Create block, e.g. var block = new BufferBlock<bool>(new() { TaskScheduler = TaskScheduler.Current }); and confirm that block.ReceiveAsync() does not use it..

Expected behavior

Should use configured task scheduler or at least be well documented that there is no configuration of task scheduling for some dataflow extension methods.

Actual behavior

Surprising behavior of using unconfigurable global default scheduler. While global default may be sensible default, should either allow customization or document the limitation.

Regression?

No response

Known Workarounds

None, have to not use the extension methods.

Configuration

No response

Other information

No response

@ghost ghost added the untriaged New issue has not been triaged by the area owner label Mar 8, 2023
@ghost
Copy link

ghost commented Mar 8, 2023

Tagging subscribers to this area: @dotnet/area-system-threading-tasks
See info in area-owners.md if you want to be subscribed.

Issue Details

Description

ReceiveAsync creates a ReceiveTarget at

var target = new ReceiveTarget<TOutput>();
that does not allow any configuration of a task scheduler like https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-specify-a-task-scheduler-in-a-dataflow-block would have one believe.

Reproduction Steps

Create block, e.g. var block = new BufferBlock<bool>(new() { TaskScheduler = TaskScheduler.Current }); and confirm that block.ReceiveAsync() does not use it..

Expected behavior

Should use configured task scheduler or at least be well documented that there is no configuration of task scheduling for some dataflow extension methods.

Actual behavior

Surprising behavior of using unconfigurable global default scheduler. While global default may be sensible default, should either allow customization or document the limitation.

Regression?

No response

Known Workarounds

None, have to not use the extension methods.

Configuration

No response

Other information

No response

Author: cretz
Assignees: -
Labels:

area-System.Threading.Tasks

Milestone: -

@stephentoub
Copy link
Member

Which tasks aren't being scheduled to the block's scheduler that you'd expect to be?

@cretz
Copy link
Author

cretz commented Mar 9, 2023

SendAsync causes it for a receive target on ReceiveAsync. I can explain my use case.

I am making a deterministic manual task scheduler for Temporal workflows at https://github.com/temporalio/sdk-dotnet. I already have a custom event source listener that checks that all tasks are created on my scheduler when running with my scheduler as the current scheduler to prevent anyone from using the default scheduler in workflows. And of course https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008 catches some (though I request they actually do use Current).

But this triggered my detector because the task for SendAsync/ReceiveAsync schedules a task on a different scheduler. https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-specify-a-task-scheduler-in-a-dataflow-block implies I can customize the scheduler for dataflow block use. Here's the stack:

Temporalio.Exceptions.InvalidWorkflowOperationException: Task scheduled during workflow run was not scheduled on workflow scheduler
   at System.Environment.get_StackTrace()
   at Temporalio.Worker.WorkflowTaskEventListener.OnEventWritten(EventWrittenEventArgs eventData) in c:\work\tem\sdk-dotnet\temporal-sdk-dotnet\src\Temporalio\Worker\WorkflowTaskEventListener.cs:line 120
   at System.Diagnostics.Tracing.EventSource.DispatchToAllListeners(EventWrittenEventArgs eventCallbackArgs)
   at System.Diagnostics.Tracing.EventSource.WriteToAllListeners(EventWrittenEventArgs eventCallbackArgs, Int32 eventDataCount, EventData* data)
   at System.Diagnostics.Tracing.EventSource.WriteEventWithRelatedActivityIdCore(Int32 eventId, Guid* relatedActivityId, Int32 eventDataCount, EventData* data)
   at System.Threading.Tasks.TplEventSource.TaskScheduled(Int32 OriginatingTaskSchedulerID, Int32 OriginatingTaskID, Int32 TaskID, Int32 CreatingTaskID, Int32 TaskCreationOptions, Int32 appDomain)
   at System.Threading.Tasks.Task.FireTaskScheduledIfNeeded(TaskScheduler ts)
   at System.Threading.Tasks.TaskScheduler.InternalQueueTask(Task task)
   at System.Threading.Tasks.Task.ScheduleAndStart(Boolean needsProtection)
   at System.Threading.Tasks.Task.InternalStartNew(Task creatingTask, Delegate action, Object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions)
   at System.Threading.Tasks.TaskFactory.StartNew(Action`1 action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskScheduler scheduler)
   at System.Threading.Tasks.Dataflow.DataflowBlock.ReceiveTarget`1.CleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
   at System.Threading.Tasks.Dataflow.DataflowBlock.ReceiveTarget`1.System.Threading.Tasks.Dataflow.ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock`1 source, Boolean consumeToAccept)
   at System.Threading.Tasks.Dataflow.Internal.SourceCore`1.OfferToTargets(ITargetBlock`1 linkToTarget)
   at System.Threading.Tasks.Dataflow.Internal.SourceCore`1.OfferMessagesLoopCore()
   at System.Threading.Tasks.Dataflow.Internal.SourceCore`1.<>c.<OfferAsyncIfNecessary_Slow>b__44_0(Object thisSourceCore)
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__272_0(Object obj)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
   at System.Threading.Tasks.Task.ExecuteEntry()
   at System.Threading.Tasks.TaskScheduler.TryExecuteTask(Task task)

I can understand if this won't be supported, but maybe at least some docs saying dataflow extension calls cannot use custom scheduling.

@cretz
Copy link
Author

cretz commented Mar 9, 2023

Specifically

System.Threading.Tasks.Task.Factory.StartNew(static state =>
{
// Complete with the received value
var target = (ReceiveTarget<T>)state!;
try { target.TrySetResult(target._receivedValue!); }
catch (ObjectDisposedException) { /* benign race if returned task is already disposed */ }
}, this, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);

@ericstj
Copy link
Member

ericstj commented Aug 11, 2023

@stephentoub what's your thought on this one. Is there an issue here with the scheduling or should we just document this?

@ericstj ericstj removed the untriaged New issue has not been triaged by the area owner label Aug 11, 2023
@ericstj ericstj added this to the 9.0.0 milestone Aug 11, 2023
@ericstj ericstj added the question Answer questions and provide assistance, not an issue with source code or documentation. label Aug 11, 2023
@stephentoub
Copy link
Member

stephentoub commented Dec 11, 2023

@stephentoub what's your thought on this one. Is there an issue here with the scheduling or should we just document this?

This is behaving as designed, so from that perspective, it would just be further clarification as needed in the docs. Factoring in a different scheduler would require new APIs / overloads; if that's what's desired, it'd be good to go through the API proposal route.

I can explain my use case.

Thanks for the details. If you'd like to propose a new overload, please feel free to open an API issue for that.

@cretz
Copy link
Author

cretz commented Dec 11, 2023

👍 I don't have a concrete proposal, so just some docs clarity would be ideal (thanks!)

@stephentoub stephentoub added documentation Documentation bug or enhancement, does not impact product or test code and removed question Answer questions and provide assistance, not an issue with source code or documentation. labels Jun 28, 2024
@stephentoub stephentoub modified the milestones: 9.0.0, Future Jun 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-System.Threading.Tasks documentation Documentation bug or enhancement, does not impact product or test code
Projects
None yet
Development

No branches or pull requests

3 participants