Skip to content

Commit a280d0e

Browse files
committed
extend observerv2 with onFileStaged
Signed-off-by: Robrecht Cannoodt <rcannood@gmail.com>
1 parent fe62764 commit a280d0e

File tree

4 files changed

+63
-0
lines changed

4 files changed

+63
-0
lines changed

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ import nextflow.trace.TraceObserverV2
7777
import nextflow.trace.TraceRecord
7878
import nextflow.trace.WorkflowStatsObserver
7979
import nextflow.trace.event.FilePublishEvent
80+
import nextflow.trace.event.FileStagingEvent
8081
import nextflow.trace.event.TaskEvent
8182
import nextflow.trace.event.WorkflowOutputEvent
8283
import nextflow.util.Barrier
@@ -1116,6 +1117,10 @@ class Session implements ISession {
11161117
notifyEvent(observersV2, ob -> ob.onFilePublish(event))
11171118
}
11181119

1120+
void notifyFileStaged(FileStagingEvent event) {
1121+
notifyEvent(observersV2, ob -> ob.onFileStaged(event))
1122+
}
1123+
11191124
void notifyFlowComplete() {
11201125
notifyEvent(observersV1, ob -> ob.onFlowComplete())
11211126
notifyEvent(observersV2, ob -> ob.onFlowComplete())

modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import groovy.transform.ToString
3939
import groovy.util.logging.Slf4j
4040
import nextflow.Session
4141
import nextflow.exception.ProcessStageException
42+
import nextflow.trace.event.FileStagingEvent
4243
import nextflow.extension.FilesEx
4344
import nextflow.util.CacheHelper
4445
import nextflow.util.Duration
@@ -100,6 +101,15 @@ class FilePorter {
100101
if( batch.size() ) {
101102
log.trace "Stage foreign files: $batch"
102103
submitStagingActions(batch.foreignPaths)
104+
105+
// Notify observers about file staging completion events
106+
for( FileCopy copy : batch.foreignPaths ) {
107+
session.notifyFileStaged(new FileStagingEvent(
108+
source: copy.source,
109+
target: copy.target
110+
))
111+
}
112+
103113
log.trace "Stage foreign files completed: $batch"
104114
}
105115
}

modules/nextflow/src/main/groovy/nextflow/trace/TraceObserverV2.groovy

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import groovy.transform.CompileStatic
2121
import nextflow.Session
2222
import nextflow.processor.TaskProcessor
2323
import nextflow.trace.event.FilePublishEvent
24+
import nextflow.trace.event.FileStagingEvent
2425
import nextflow.trace.event.TaskEvent
2526
import nextflow.trace.event.WorkflowOutputEvent
2627

@@ -135,4 +136,11 @@ interface TraceObserverV2 {
135136
*/
136137
default void onFilePublish(FilePublishEvent event) {}
137138

139+
/**
140+
* Invoked when a file staging operation completes (after the file has been copied).
141+
*
142+
* @param event
143+
*/
144+
default void onFileStaged(FileStagingEvent event) {}
145+
138146
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.trace.event
18+
19+
import java.nio.file.Path
20+
21+
import groovy.transform.Canonical
22+
import groovy.transform.CompileStatic
23+
24+
/**
25+
* Models a file staging event.
26+
*
27+
* @author Robrecht Cannoodt <robrecht.cannoodt@gmail.com>
28+
*/
29+
@Canonical
30+
@CompileStatic
31+
class FileStagingEvent {
32+
/**
33+
* The original source path (e.g., remote URL or path).
34+
*/
35+
Path source
36+
/**
37+
* The target staged path in the work directory.
38+
*/
39+
Path target
40+
}

0 commit comments

Comments
 (0)