Full execution provenance resolution #5639
base: master
Conversation
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Force-pushed from 3e21298 to 2b4c5b3
This is why we need fewer operators 😆 The splitter operators should work similarly to flatMap.
I know, I know, but they exist.
Force-pushed from ad6fa38 to 4a77f54
Force-pushed from 4a77f54 to ff74318
For posterity, here is an example from fetchngs that would not be captured by file-based tracking:
So fetchngs should be a good test case for identity-based tracking.
After a first pass, I feel good about the overall approach. Most of the changes seem to be general cleanup, which is appreciated, and most of the new behavior is isolated into new packages, which should keep it easy to evolve. I only have some minor questions that we can discuss later.

The trickiest part is clearly the operators: linking inputs to outputs correctly, and wrapping/unwrapping values correctly, especially for the scatter and gather operators. I will want to dig into this bit to see if we can simplify anything, but even the current amount of overhead looks acceptable, and it looks like the memory impact will be manageable.

I'm curious to see how you handle groupTuple... should be similar to buffer/collate, just harder 😆
Force-pushed from 50bb038 to 6b926f9
@@ -55,20 +62,21 @@ class GroupTupleOp {

    private sort

    GroupTupleOp(Map params, DataflowReadChannel source) {

    private OpContext context = new ContextRunPerThread()
Why does groupTuple use the run-per-thread context? Same question for collate. I would expect both to use ContextGrouping, whereas ContextRunPerThread seems to be for combining multiple input channels. But groupTuple and collate are both single-threaded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ContextRunPerThread is needed when an operator uses multiple DataflowProcessors under the hood. Likely ContextSequential should also work here, because the real logic is provided by this snippet, which unwraps the values and creates a new OperatorRun instance with the corresponding input IDs:
nextflow/modules/nextflow/src/main/groovy/nextflow/extension/GroupTupleOp.groovy
Lines 168 to 177 in 61c153c
for( Object it : tuple ) {
    if( it instanceof ArrayBag ) {
        final bag = it
        for( int i=0; i<bag.size(); i++ ) {
            bag[i] = OpDatum.unwrap(bag[i], inputs)
        }
    }
}
return new OperatorRun(new LinkedHashSet<Integer>(inputs))
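To make the unwrapping step above concrete, here is a minimal, self-contained sketch of the idea in plain Java. The class name OpDatumSketch and its fields are illustrative stand-ins for Nextflow's OpDatum, not the actual implementation: a holder pairs a value with the IDs of the inputs that produced it, and unwrapping returns the raw value while accumulating those IDs.

```java
import java.util.List;

// Hypothetical sketch of the OpDatum idea: pair a value with the ids of
// the inputs that produced it; unwrap() recovers the raw value and
// accumulates the ids into the caller's list.
public class OpDatumSketch {
    public final Object value;
    public final List<Integer> inputIds;

    public OpDatumSketch(Object value, List<Integer> inputIds) {
        this.value = value;
        this.inputIds = inputIds;
    }

    // Unwrap a possibly-wrapped value, collecting its input ids as a side effect
    public static Object unwrap(Object obj, List<Integer> inputs) {
        if (obj instanceof OpDatumSketch d) {
            inputs.addAll(d.inputIds);
            return d.value;
        }
        return obj; // plain values pass through untouched
    }
}
```

Under this model, composing a group emission is just a matter of unwrapping every element and handing the accumulated ID list to a fresh run instance.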
I guess you could make it work with either ContextGrouping or ContextSequential. I'm still trying to understand how OpDatum is being used, but it seems like a good way to associate inputs with outputs when the mapping is not simply 1-to-1.
I figure we should avoid ContextRunPerThread where it isn't needed, since both collate and groupTuple have only one DataflowProcessor.
OpDatum keeps a concrete value together with the OperatorRun instance that acquired it, and ultimately the set of input IDs. When the operator emission is composed, a new run instance is created with all the inputs that contributed to that output. Makes sense?
Yes, that makes sense. Perhaps the better question is: what does an "operator run" mean for an operator like groupTuple? Each input is a separate "run" in one sense, but only one of those "runs" will emit an output. So is there an OperatorRun for each input, or one for each emitted group?
I will write some tests and use my mermaid diagram to test some of these operators. I suspect something is missing for buffer / collate / groupTuple.
In principle it should be an instance for each run, but in reality it's used as an object holder to associate an output message with the upstream inputs. See here:
nextflow/modules/nextflow/src/main/groovy/nextflow/prov/Tracker.groovy
Lines 114 to 131 in 0e30a8f
protected Set<TaskId> findUpstreamTasks0(final int msgId, Set<TaskId> upstream) {
    final run = messages.get(msgId)
    if( run instanceof TaskRun ) {
        upstream.add(run.id)
        return upstream
    }
    if( run instanceof OperatorRun ) {
        for( Integer it : run.inputIds ) {
            if( it!=msgId ) {
                findUpstreamTasks0(it, upstream)
            }
            else {
                log.trace "Skip duplicate provenance message id=${msgId}"
            }
        }
    }
    return upstream
}
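The recursion quoted above can be condensed into a small self-contained sketch, which may help when reasoning about how operator runs chain back to tasks. TaskRun and OperatorRun are modeled here as minimal illustrative records, and the message table is a plain map rather than Nextflow's Tracker:

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of findUpstreamTasks0: walk operator runs transitively
// until concrete task runs are reached. Names are illustrative stand-ins.
public class UpstreamResolver {
    public record TaskRun(String id) {}
    public record OperatorRun(Set<Integer> inputIds) {}

    // message id -> the run (task or operator) that produced it
    private final Map<Integer, Object> messages = new HashMap<>();

    public void record(int msgId, Object run) { messages.put(msgId, run); }

    public Set<String> findUpstreamTasks(int msgId) {
        Set<String> upstream = new LinkedHashSet<>();
        walk(msgId, upstream);
        return upstream;
    }

    private void walk(int msgId, Set<String> upstream) {
        Object run = messages.get(msgId);
        if (run instanceof TaskRun t) {
            upstream.add(t.id());          // a task: terminal node
        } else if (run instanceof OperatorRun op) {
            for (int id : op.inputIds())
                if (id != msgId) walk(id, upstream); // skip self-references
        }
    }
}
```

For example, a message produced by an operator run whose inputs came from tasks A and B resolves to the upstream set {A, B}, no matter how many operator hops sit in between.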
Right. Maybe it would be better to rename OperatorRun to OperatorLink, and TrailRun to ProvenanceLink, to clarify their purpose as a link from a set of inputs to an output.
I ripped the code from nf-prov to generate a mermaid diagram of the task graph using your provenance method. Your rnaseq-nf toy pipeline works fine. I tried to run against fetchngs, but the run hangs at the very end 😞 You should be able to reproduce it with:

make pack
./build/releases/nextflow-24.11.0-edge-dist run nf-core/fetchngs -r 1.12.0 -profile test,conda --outdir results
Well done. I'll check fetchngs asap.
import groovy.util.logging.Slf4j
import nextflow.prov.OperatorRun

/**
 * Implements an operator context that binds a new run to the current thread
Worth commenting here that a "thread" refers to a DataflowProcessor (i.e. input channel).
My custom tests on join and mix are working correctly, so I'm assuming that each DataflowProcessor uses the same thread for all of its runs (as long as maxForks is 1).
For operators like collate and groupTuple that use only one DP, the run-per-thread context essentially allows you to manually override how the provenance links are recorded. They are also working correctly with my tests.
Even better would be to replace the thread-local with a Map<DataflowProcessor,OperatorLink>, but it's not a big deal if that would be too complicated.
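The map-keyed alternative suggested here could look roughly like the following sketch. Processor and Run are hypothetical stand-ins for DataflowProcessor and the operator run type; the point is only that the current run is looked up by processor instance rather than by thread identity:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: bind the current run to the processor instance
// instead of the current thread, removing the assumption that each
// DataflowProcessor always executes on the same thread.
public class PerProcessorContext {
    public static final class Processor {}  // stand-in for DataflowProcessor
    public static final class Run {}        // stand-in for OperatorRun

    private final Map<Processor, Run> current = new ConcurrentHashMap<>();

    // Return the run bound to this processor, creating one on first use
    public Run getRun(Processor p) {
        return current.computeIfAbsent(p, k -> new Run());
    }

    // Discard the binding once the processor's current run is complete
    public void clear(Processor p) {
        current.remove(p);
    }
}
```

Compared with a ThreadLocal, this also makes the lifecycle explicit: the entry can be removed when the run completes, rather than lingering on the thread.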
This PR implements the ability to trace the full provenance of a Nextflow pipeline, so that once a task execution is completed, it reports the set of direct upstream tasks that originated one or more of its inputs.
How it works
Each output value that's emitted by a task or an operator is wrapped with an object instance. This makes it possible to assign to each emitted value a unique identity based on the underlying Java object identity.
Each object is associated with the corresponding task or operator run (i.e. TaskRun and OperatorRun). Once the output value is received as an input by a task, the upstream task is determined by inspecting the output-run association table.
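The wrap-emit-resolve cycle described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: ProvenanceTable and Msg are invented names, and collisions of System.identityHashCode are ignored for simplicity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of identity-based tracking: each emitted value is
// wrapped, the wrapper's Java object identity serves as the message id,
// and a table maps that id back to the producing run.
public class ProvenanceTable {
    // Wrapper giving each emitted value a unique Java object identity
    public static final class Msg {
        public final Object value;
        public Msg(Object value) { this.value = value; }
    }

    // message identity -> the task/operator run that produced it
    private final Map<Integer, Object> messages = new HashMap<>();

    // Called by the producer: wrap the value and record who emitted it
    public Msg emit(Object value, Object producerRun) {
        Msg msg = new Msg(value);
        messages.put(System.identityHashCode(msg), producerRun);
        return msg;
    }

    // Called by the consumer: resolve which run produced a received message
    public Object producerOf(Msg msg) {
        return messages.get(System.identityHashCode(msg));
    }
}
```

Because identity is per-wrapper rather than per-value, two equal values emitted by different runs remain distinguishable, which is the property that plain value equality (or file paths) cannot provide.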
Required changes
This approach requires enclosing each output value in a wrapper object and "unwrapping" it once it is received by the downstream task or operator, so that the corresponding operation is not altered.
The input unwrapping can be automated easily for both tasks and operators, because they share a common message-receive interface.
However, the output wrapping requires modifying all Nextflow operators, because each of them has custom logic to produce its outputs.
Possible problems
The impact on the Java heap of creating an object instance for each output value generated by the workflow execution should be assessed.
Similarly, keeping a heap reference for each task and operator run may create memory pressure for large workflow graphs.
Current state and next steps
The current implementation demonstrates that this approach is viable. The solution already supports all tasks and the operators branch, map, flatMap, and collectFile. Tests are available for these cases.
The remaining operators should be added to fully support existing workflow applications.
Alternative solution
A simpler solution is possible using the output file paths as the identity value to track task provenance, with logic very similar to the above proposal.
However, the path-based approach is limited to the case in which all workflow tasks and operators produce file values: provenance cannot be tracked for tasks having one or more non-file input/output values.
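For comparison, a minimal sketch of the path-based alternative is shown below. PathProvenance and its methods are illustrative names; the key point is that the file path itself is the identity key, so no wrapper objects are needed, but any non-file value simply falls outside the tracking.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Sketch of the path-based alternative: the output file path serves as
// the identity key, avoiding per-value wrapper objects entirely.
// Limitation: values that are not files cannot be tracked at all.
public class PathProvenance {
    // normalized output path -> id of the task that produced it
    private final Map<Path, String> producerByPath = new HashMap<>();

    public void recordOutput(Path output, String taskId) {
        producerByPath.put(output.normalize(), taskId);
    }

    // Returns null for paths not produced by any tracked task,
    // and is simply inapplicable to non-file values
    public String producerOf(Path input) {
        return producerByPath.get(input.normalize());
    }
}
```

Unlike the identity-based table, this scheme also cannot distinguish two runs that emit the same path, which is another reason the PR prefers object identity.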