
Full execution provenance resolution #5639

Draft · wants to merge 54 commits into base: master
Conversation

pditommaso
Member

This PR implements the ability to trace the full provenance of a Nextflow pipeline, so that once a task execution completes, it reports the set of direct upstream tasks that originated one or more of its inputs.

How it works

Each output value emitted by a task or an operator is wrapped in an object instance. This makes it possible to assign each emitted value a unique identity based on the underlying Java object identity.

Each object is associated with the corresponding task or operator run (i.e. TaskRun and OperatorRun).

Once the output value is received as an input by a downstream task, the upstream task is determined by inspecting the output-run association table.
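The mechanism can be sketched roughly as follows. This is a minimal Java illustration with hypothetical names (`Msg`, `ProvenanceTable`), not the PR's actual classes: each emitted value is wrapped in a fresh object, that object's identity hash acts as a unique message id, and a table associates the id with the run that produced it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a fresh wrapper per emitted value gives each
// value a unique identity, independent of equals()/hashCode().
class Msg {
    final Object value;
    Msg(Object value) { this.value = value; }
    int id() { return System.identityHashCode(this); }
}

// Output-run association table: message id -> run that emitted it.
class ProvenanceTable {
    private final Map<Integer, Object> messages = new HashMap<>();

    void recordOutput(Msg msg, Object run) {
        messages.put(msg.id(), run);
    }

    // consulted when a downstream task receives the message as an input
    Object producerOf(Msg msg) {
        return messages.get(msg.id());
    }
}
```

Note that two values that are equal (e.g. two identical strings) still get distinct ids, which is the point of using object identity rather than value equality.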

Required changes

This approach requires enclosing each output value in a wrapper object, and "unwrapping" it once it is received by the downstream task or operator, so that the corresponding operation is not altered.

The input unwrapping can be easily automated for both tasks and operators, because they share a common message-receive interface.

However, the output wrapping requires modifying all Nextflow operators, because each of them has custom logic to produce its outputs.
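The wrap/unwrap boundary can be illustrated with a small sketch (hypothetical names, simplified to plain Java, not the PR's API): values are wrapped on emission and transparently unwrapped on receive, so the task or operator logic never sees the wrapper.

```java
// Hypothetical sketch of the wrap/unwrap boundary.
final class Wrapped {
    final Object value;
    Wrapped(Object value) { this.value = value; }
}

class MessageBoundary {
    // wrap on emission: every output gets its own wrapper instance,
    // which is what gives it a unique identity
    static Object wrap(Object value) {
        return new Wrapped(value);
    }

    // unwrap on receive: downstream code gets the plain value back,
    // so the computation itself is unchanged
    static Object unwrap(Object msg) {
        return (msg instanceof Wrapped) ? ((Wrapped) msg).value : msg;
    }
}
```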

Possible problems

The impact on the underlying Java heap of creating an object instance for each output value generated by the workflow execution should be assessed.

Similarly, keeping a heap reference for each task and operator run may cause memory pressure on large workflow graphs.

Current state and next steps

The current implementation demonstrates that this approach is viable. The solution already supports all tasks and the following operators: branch, map, flatMap, collectFile.

Tests are available for these cases.

The remaining operators should be added to fully support existing workflow applications.

Alternative solution

A simpler solution is possible using the output file paths as the identity value to track task provenance, with logic very similar to the above proposal.

However, the path approach is limited to the case in which all workflow tasks and operators produce file values. Provenance cannot be tracked for tasks having one or more non-file input/output values.
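For comparison, the path-based alternative could look like the following sketch (hypothetical names, using plain strings for paths to keep it simple): the file path itself is the identity key, so no wrapper objects are needed, but any non-file value simply cannot be traced.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the path-based alternative: identity is the
// output file path, so only file values participate in provenance.
class PathProvenance {
    private final Map<String, String> producerByPath = new HashMap<>();

    void recordOutput(String path, String taskId) {
        producerByPath.put(path, taskId);
    }

    // returns null for any non-file input (e.g. a number parsed out of
    // a CSV record), which is exactly the limitation described above
    String producerOf(Object input) {
        if (input instanceof String)
            return producerByPath.get((String) input);
        return null;
    }
}
```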

Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
@pditommaso pditommaso requested review from jorgee and bentsherman and removed request for jorgee January 5, 2025 13:06

netlify bot commented Jan 5, 2025

Deploy Preview for nextflow-docs-staging canceled.

Name Link
🔨 Latest commit 0e30a8f
🔍 Latest deploy log https://app.netlify.com/sites/nextflow-docs-staging/deploys/678ed618926b500008a64754

@pditommaso pditommaso marked this pull request as draft January 5, 2025 13:22
@bentsherman
Member

This is why we need fewer operators 😆

The splitter operators should work similarly to flatMap

@pditommaso
Member Author

I know, I know, but they exist


@bentsherman
Member

For posterity, here is an example from fetchngs that would not be captured by file-based tracking:

https://github.com/nf-core/fetchngs/blob/8ec2d934f9301c818d961b1e4fdf7fc79610bdc5/workflows/sra/main.nf#L54-L57

SRA_RUNINFO_TO_FTP outputs a csv file that is split into records using the splitCsv operator. These records are filtered and eventually passed to CUSTOM_SRATOOLSNCBISETTINGS:

https://github.com/nf-core/fetchngs/blob/8ec2d934f9301c818d961b1e4fdf7fc79610bdc5/subworkflows/nf-core/fastq_download_prefetch_fasterqdump_sratools/main.nf#L20

So fetchngs should be a good test case for identity-based tracking.

@bentsherman
Member

After a first pass, I feel good about the overall approach. Most of the changes seem to be general cleanup, which is appreciated, and most of the new behavior is isolated into new packages, which should keep it easy to evolve. I only have some minor questions that we can discuss later.

The trickiest part is clearly the operators -- linking inputs to outputs correctly, wrapping/unwrapping values correctly, especially for the scatter and gather operators. I will want to dig into this bit to see if we can simplify anything, but even the current amount of overhead looks acceptable.

It looks like the memory impact will be manageable. The Msg wrapper itself shouldn't cost much, at most a few MB. My main concern was keeping lots of intermediate objects alive, but it looks like you avoided this by keeping only the object identity of the Msg and not the reference itself. You do have to keep all task runs alive, which in truth I have not stress-tested, but nf-prov is already doing this so no change there.

I'm curious to see how you handle groupTuple... should be similar to buffer/collate, just harder 😆

@@ -55,20 +62,21 @@ class GroupTupleOp {

private sort

GroupTupleOp(Map params, DataflowReadChannel source) {
private OpContext context = new ContextRunPerThread()
Member

Why does groupTuple use the run-per-thread context? Same question for collate. I would expect both to use ContextGrouping, whereas ContextRunPerThread seems to be for combining multiple input channels. But groupTuple and collate are both single-threaded.

Member Author

ContextRunPerThread is needed when an operator uses multiple DataflowProcessors under the hood. Likely ContextSequential should also work here, because the real logic is provided by this snippet, which unwraps the values and creates a new OperatorRun with the corresponding input ids:

for( Object it : tuple ) {
    if( it instanceof ArrayBag ) {
        final bag = it
        for( int i=0; i<bag.size(); i++ ) {
            bag[i] = OpDatum.unwrap(bag[i], inputs)
        }
    }
}
return new OperatorRun(new LinkedHashSet<Integer>(inputs))

Member

I guess you could make it work with either the ContextGrouping or the ContextSequential. Still trying to understand how the OpDatum is being used but it seems like a good way to associate inputs with outputs when the mapping is not simply 1-to-1.

I figure we should avoid the ContextRunPerThread where it isn't needed, since both collate and groupTuple have only one DataflowProcessor

Member Author

OpDatum keeps together a concrete value, the OperatorRun instance that acquired it, and ultimately the set of input ids. When the operator emission is composed, a new run instance is created with all the inputs that contributed to that output. Makes sense?
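If it helps, that pattern can be written out as a simplified Java sketch (types and names reduced for illustration, not the actual nextflow.prov classes): each datum carries the input id it came from, and composing an emission unwraps the values while collecting the union of ids into one new run.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified sketch of an operator run holding its contributing input ids.
class OperatorRun {
    final Set<Integer> inputIds;
    OperatorRun(Set<Integer> inputIds) { this.inputIds = inputIds; }
}

class OpDatum {
    final Object value;
    final int inputId;
    OpDatum(Object value, int inputId) { this.value = value; this.inputId = inputId; }

    // unwrap one element, accumulating its input id into the provenance set
    static Object unwrap(Object it, Set<Integer> inputs) {
        if (it instanceof OpDatum) {
            OpDatum d = (OpDatum) it;
            inputs.add(d.inputId);
            return d.value;
        }
        return it;
    }

    // compose an emission: unwrap every element and bind a single new run
    // to the union of all input ids that contributed to the output
    static OperatorRun composeRun(List<Object> tuple, List<Object> out) {
        Set<Integer> inputs = new LinkedHashSet<>();
        for (Object it : tuple)
            out.add(unwrap(it, inputs));
        return new OperatorRun(inputs);
    }
}
```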

Member

Yes that makes sense. Perhaps the better question is, what does an "operator run" mean for an operator like groupTuple? Each input is a separate "run" in one sense, but only one of those "runs" will emit an output. So is there an OperatorRun for each input or one for each emitted group?

I will write some tests and use my mermaid diagram to test some of these operators. I suspect something is missing for buffer / collate / groupTuple

Member Author

In principle it should be an instance for each run, but in reality it's used as an object holder to associate an output message with the upstream inputs. See here:

protected Set<TaskId> findUpstreamTasks0(final int msgId, Set<TaskId> upstream) {
    final run = messages.get(msgId)
    if( run instanceof TaskRun ) {
        upstream.add(run.id)
        return upstream
    }
    if( run instanceof OperatorRun ) {
        for( Integer it : run.inputIds ) {
            if( it!=msgId ) {
                findUpstreamTasks0(it, upstream)
            }
            else {
                log.trace "Skip duplicate provenance message id=${msgId}"
            }
        }
    }
    return upstream
}

Member

Right. Maybe it would be better to rename OperatorRun to OperatorLink, and rename TrailRun to ProvenanceLink, to clarify their purpose as a link from a set of inputs to an output

@bentsherman
Member

bentsherman commented Jan 18, 2025

I ripped the code from nf-prov to generate a mermaid diagram of the task graph using your provenance method.

Your rnaseq-nf toy pipeline works fine:

[image: mermaid diagram of the rnaseq-nf task graph]

I tried to run against fetchngs, but the run hangs at the very end 😞

You should be able to reproduce it with:

make pack
./build/releases/nextflow-24.11.0-edge-dist run nf-core/fetchngs -r 1.12.0 -profile test,conda --outdir results

@pditommaso
Member Author

Well done. I'll check fetchngs asap

import groovy.util.logging.Slf4j
import nextflow.prov.OperatorRun
/**
* Implements an operator context that binds a new run to the current thread
Member

Worth commenting that a "thread" here refers to a DataflowProcessor (i.e. an input channel).

My custom tests on join and mix are working correctly, so I'm assuming that each DataflowProcessor uses the same thread for all of its runs (as long as maxForks is 1).

For operators like collate and groupTuple that use only one DP, the run-per-thread context essentially allows you to manually override how the provenance links are recorded. They are also working correctly with my tests.

Member

Even better would be to replace the thread-local with a Map<DataflowProcessor,OperatorLink>, but not a big deal if that would be too complicated
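For what it's worth, that idea could look like the following plain-Java sketch (with a stand-in for the GPars DataflowProcessor class, purely illustrative): the current run is keyed by processor rather than by thread, so correctness no longer depends on thread affinity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the GPars DataflowProcessor class, used here only
// to make the sketch self-contained.
class DataflowProcessor { }

// Illustrative sketch: track the current run per processor instead of
// using a thread-local, as suggested above.
class RunContext {
    private final Map<DataflowProcessor, Object> currentRun = new ConcurrentHashMap<>();

    // get (or lazily create) the run associated with this processor
    Object runFor(DataflowProcessor dp) {
        return currentRun.computeIfAbsent(dp, k -> new Object());
    }

    // drop the association once the run has emitted its outputs
    void complete(DataflowProcessor dp) {
        currentRun.remove(dp);
    }
}
```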
