-
Notifications
You must be signed in to change notification settings - Fork 359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WX-1217 Workflow completion callback #7213
Changes from 9 commits
05a30bb
106b669
5fc1e66
3e8500f
a114372
4ff381d
14584e9
a87cf07
891121e
a5454aa
f37d614
04b8ce7
a9c2ce2
27dac91
b190a44
84b6461
5f1e3cb
2870c92
ade1ca2
7cddd10
0270448
a91ddd9
c8ea146
9867876
6c94189
fd1b171
25e2f3a
5392525
9dcfe21
f8983cf
d51bece
029b24c
b291a1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
package cromwell.engine.workflow | ||
|
||
import java.util.concurrent.atomic.AtomicInteger | ||
|
||
import akka.actor.SupervisorStrategy.Stop | ||
import akka.actor._ | ||
import com.typesafe.config.Config | ||
|
@@ -23,6 +22,7 @@ import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor | |
import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor.{DeleteWorkflowFilesFailedResponse, DeleteWorkflowFilesSucceededResponse, StartWorkflowFilesDeletion} | ||
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor | ||
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor._ | ||
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand | ||
import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationFailedResponse, WorkflowFinalizationSucceededResponse} | ||
import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, CopyWorkflowOutputsActor, WorkflowFinalizationActor} | ||
import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor | ||
|
@@ -52,6 +52,7 @@ object WorkflowActor { | |
final case class AbortWorkflowWithExceptionCommand(exception: Throwable) extends WorkflowActorCommand | ||
case object SendWorkflowHeartbeatCommand extends WorkflowActorCommand | ||
case object AwaitMetadataIntegrity | ||
case class PerformWorkflowCallback(uri: Option[String], workflowState: WorkflowState) | ||
|
||
case class WorkflowFailedResponse(workflowId: WorkflowId, inState: WorkflowActorState, reasons: Seq[Throwable]) | ||
|
||
|
@@ -149,7 +150,7 @@ object WorkflowActor { | |
initializationData: AllBackendInitializationData, | ||
lastStateReached: StateCheckpoint, | ||
effectiveStartableState: StartableState, | ||
workflowFinalOutputs: Set[WomValue] = Set.empty, | ||
workflowFinalOutputs: Option[CallOutputs] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This change was made so that the |
||
workflowAllOutputs: Set[WomValue] = Set.empty, | ||
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty, | ||
failedInitializationAttempts: Int = 0) | ||
|
@@ -176,6 +177,7 @@ object WorkflowActor { | |
ioActor: ActorRef, | ||
serviceRegistryActor: ActorRef, | ||
workflowLogCopyRouter: ActorRef, | ||
workflowCallbackActor: Option[ActorRef], | ||
jobStoreActor: ActorRef, | ||
subWorkflowStoreActor: ActorRef, | ||
callCacheReadActor: ActorRef, | ||
|
@@ -199,6 +201,7 @@ object WorkflowActor { | |
ioActor = ioActor, | ||
serviceRegistryActor = serviceRegistryActor, | ||
workflowLogCopyRouter = workflowLogCopyRouter, | ||
workflowCallbackActor = workflowCallbackActor, | ||
jobStoreActor = jobStoreActor, | ||
subWorkflowStoreActor = subWorkflowStoreActor, | ||
callCacheReadActor = callCacheReadActor, | ||
|
@@ -226,6 +229,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart, | |
ioActor: ActorRef, | ||
override val serviceRegistryActor: ActorRef, | ||
workflowLogCopyRouter: ActorRef, | ||
workflowCallbackActor: Option[ActorRef], | ||
jobStoreActor: ActorRef, | ||
subWorkflowStoreActor: ActorRef, | ||
callCacheReadActor: ActorRef, | ||
|
@@ -516,6 +520,11 @@ class WorkflowActor(workflowToStart: WorkflowToStart, | |
stay() | ||
case Event(AwaitMetadataIntegrity, data) => | ||
goto(MetadataIntegrityValidationState) using data.copy(lastStateReached = data.lastStateReached.copy(state = stateName)) | ||
case Event(PerformWorkflowCallback(uri, workflowState), data) => | ||
workflowCallbackActor.foreach { wca => | ||
wca ! PerformCallbackCommand(workflowId, uri, workflowState, data.workflowFinalOutputs.getOrElse(CallOutputs.empty)) | ||
} | ||
stay() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. TOL: Is it worth putting this handler into the state we expect this to happen from, and having a separate |
||
} | ||
|
||
onTransition { | ||
|
@@ -534,26 +543,28 @@ class WorkflowActor(workflowToStart: WorkflowToStart, | |
case _ => // The WMA is waiting for the WorkflowActorWorkComplete message. No extra information needed here. | ||
} | ||
|
||
// Copy/Delete workflow logs | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This section is just a copy/paste of this code to move the brute force workflow options up in scope (outside |
||
if (WorkflowLogger.isEnabled) { | ||
/* | ||
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are | ||
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still | ||
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor. | ||
*/ | ||
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions | ||
val system = context.system | ||
val ec = context.system.dispatcher | ||
def bruteForcePathBuilders: Future[List[PathBuilder]] = { | ||
// Protect against path builders that may throw an exception instead of returning a failed future | ||
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten | ||
} | ||
/* | ||
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are | ||
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still | ||
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor. Used both for | ||
* copying the workflow log and for sending the workflow callback. | ||
*/ | ||
jgainerdewar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions | ||
|
||
val system = context.system | ||
val ec = context.system.dispatcher | ||
def bruteForcePathBuilders: Future[List[PathBuilder]] = { | ||
// Protect against path builders that may throw an exception instead of returning a failed future | ||
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten | ||
} | ||
|
||
val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match { | ||
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders)) | ||
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders) | ||
} | ||
val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match { | ||
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders)) | ||
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders) | ||
} | ||
|
||
// Copy/Delete workflow logs | ||
if (WorkflowLogger.isEnabled) { | ||
workflowOptions.get(FinalWorkflowLogDir).toOption match { | ||
case Some(destinationDir) => | ||
pathBuilders | ||
|
@@ -566,6 +577,10 @@ class WorkflowActor(workflowToStart: WorkflowToStart, | |
} | ||
} | ||
|
||
// Attempt to perform workflow completion callback | ||
val callbackUri = workflowOptions.get(WorkflowOptions.WorkflowCallbackUri).toOption | ||
self ! PerformWorkflowCallback(callbackUri, terminalState.workflowState) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I initially implemented this as part of finalization (you can see that working in earlier commits) but that turned out not to work well. Workflows are still Running, and may still fail, during finalization. Also, if a workflow fails very early in its lifecycle we may not ever perform finalization. Putting it here, at the moment of transition to terminal state, seemed right. I wanted to send a message directly to the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There might be a |
||
|
||
// We can't transition from within another transition function, but we can instruct ourselves to with a message: | ||
self ! AwaitMetadataIntegrity | ||
jgainerdewar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
@@ -627,7 +642,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart, | |
rootWorkflowId = rootWorkflowId, | ||
rootWorkflowRootPaths = data.initializationData.getWorkflowRoots(), | ||
rootAndSubworkflowIds = data.rootAndSubworkflowIds, | ||
workflowFinalOutputs = data.workflowFinalOutputs, | ||
workflowFinalOutputs = data.workflowFinalOutputs.map(out => out.outputs.values.toSet).getOrElse(Set.empty), | ||
workflowAllOutputs = data.workflowAllOutputs, | ||
pathBuilders = data.workflowDescriptor.get.pathBuilders, | ||
serviceRegistryActor = serviceRegistryActor, | ||
|
@@ -689,7 +704,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart, | |
finalizationActor ! StartFinalizationCommand | ||
goto(FinalizingWorkflowState) using data.copy( | ||
lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures), | ||
workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet, | ||
workflowFinalOutputs = Option(workflowFinalOutputs), | ||
workflowAllOutputs = workflowAllOutputs, | ||
rootAndSubworkflowIds = rootAndSubworkflowIds | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One could imagine a
Map[WorkflowState, String]
but definitely premature generalization for now