Skip to content

Commit

Permalink
BW-1206 - Combine all Wes Endpoints & add Tests (#6833)
Browse files Browse the repository at this point in the history
* Add tests, getting frid of WesRunRoutes.scala

* wesWorkflowId fix, ec implicits errors gone

* Refactoring path for GET /runs

* Indentation fix

* Commit to rollback

* Revert "Indentation fix"

This reverts commit 63fc484.

* PR trigger

* Optimize imports

* Missed import
  • Loading branch information
kpierre13 authored Aug 24, 2022
1 parent 3a19be6 commit e802f29
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 240 deletions.
4 changes: 1 addition & 3 deletions engine/src/main/scala/cromwell/server/CromwellServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import cromwell.services.instrumentation.CromwellInstrumentationActor
import cromwell.webservice.SwaggerService
import cromwell.webservice.routes.CromwellApiService
import cromwell.webservice.routes.wes.WesRouteSupport
import cromwell.webservice.routes.wes.WesRunRoutes

import scala.concurrent.Future
import scala.util.{Failure, Success}
Expand All @@ -37,7 +36,6 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool
with CromwellApiService
with CromwellInstrumentationActor
with WesRouteSupport
with WesRunRoutes
with SwaggerService
with ActorLogging {
implicit val actorSystem = context.system
Expand All @@ -53,7 +51,7 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool
* cromwell.yaml is broken unless the swagger index.html is patched. Copy/paste the code from rawls or cromiam if
* actual cromwell+swagger+oauth+/api support is needed.
*/
val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes, runRoutes))
val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes))
val nonApiRoutes: Route = concat(engineRoutes, swaggerUiResourceRoute)
val allRoutes: Route = concat(apiRoutes, nonApiRoutes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ import scala.io.Source
import scala.util.{Failure, Success, Try}

trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils with WesCromwellRouteSupport {

import CromwellApiService._

implicit def actorRefFactory: ActorRefFactory

implicit val materializer: ActorMaterializer
implicit val ec: ExecutionContext

Expand All @@ -57,9 +55,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
}
},
path("engine" / Segment / "version") { _ =>
get {
complete(versionResponse)
}
get { complete(versionResponse) }
},
path("engine" / Segment / "status") { _ =>
onComplete(serviceRegistryActor.ask(GetCurrentStatus).mapTo[StatusCheckResponse]) {
Expand All @@ -74,11 +70,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w

val workflowRoutes =
path("workflows" / Segment / "backends") { _ =>
get {
instrumentRequest {
complete(ToResponseMarshallable(backendResponse))
}
}
get { instrumentRequest { complete(ToResponseMarshallable(backendResponse)) } }
} ~
path("workflows" / Segment / "callcaching" / "diff") { _ =>
parameterSeq { parameters =>
Expand Down Expand Up @@ -144,7 +136,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor) flatMap { workflowId =>
workflowStoreActor.ask(WorkflowStoreActor.WorkflowOnHoldToSubmittedCommand(workflowId)).mapTo[WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedResponse]
}
onComplete(response) {
onComplete(response){
case Success(WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedFailure(_, e: NotInOnHoldStateException)) => e.errorRequest(StatusCodes.Forbidden)
case Success(WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedFailure(_, e)) => e.errorRequest(StatusCodes.InternalServerError)
case Success(r: WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedSuccess) => completeResponse(StatusCodes.OK, toResponse(r.workflowId, WorkflowSubmitted), Seq.empty)
Expand Down Expand Up @@ -180,93 +172,93 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
case Failure(e) => e.failRequest(StatusCodes.InternalServerError)
}
}
}

object CromwellApiService {
}

import spray.json._
object CromwellApiService {
import spray.json._

/**
* Sends a request to abort the workflow. Provides configurable success & error handlers to allow
* for different API endpoints to provide different effects in the appropriate situations, e.g. HTTP codes
* and error messages
*/
def abortWorkflow(possibleWorkflowId: String,
workflowStoreActor: ActorRef,
workflowManagerActor: ActorRef,
successHandler: PartialFunction[SuccessfulAbortResponse, Route] = standardAbortSuccessHandler,
errorHandler: PartialFunction[Throwable, Route] = standardAbortErrorHandler)
(implicit timeout: Timeout): Route = {
handleExceptions(ExceptionHandler(errorHandler)) {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(workflowId) =>
val response = workflowStoreActor.ask(WorkflowStoreActor.AbortWorkflowCommand(workflowId)).mapTo[AbortResponse]
onComplete(response) {
case Success(x: SuccessfulAbortResponse) => successHandler(x)
case Success(x: WorkflowAbortFailureResponse) => throw x.failure
case Failure(e) => throw e
}
case Failure(_) => throw InvalidWorkflowException(possibleWorkflowId)
}
/**
* Sends a request to abort the workflow. Provides configurable success & error handlers to allow
* for different API endpoints to provide different effects in the appropriate situations, e.g. HTTP codes
* and error messages
*/
def abortWorkflow(possibleWorkflowId: String,
workflowStoreActor: ActorRef,
workflowManagerActor: ActorRef,
successHandler: PartialFunction[SuccessfulAbortResponse, Route] = standardAbortSuccessHandler,
errorHandler: PartialFunction[Throwable, Route] = standardAbortErrorHandler)
(implicit timeout: Timeout): Route = {
handleExceptions(ExceptionHandler(errorHandler)) {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(workflowId) =>
val response = workflowStoreActor.ask(WorkflowStoreActor.AbortWorkflowCommand(workflowId)).mapTo[AbortResponse]
onComplete(response) {
case Success(x: SuccessfulAbortResponse) => successHandler(x)
case Success(x: WorkflowAbortFailureResponse) => throw x.failure
case Failure(e) => throw e
}
case Failure(_) => throw InvalidWorkflowException(possibleWorkflowId)
}
}
}

/**
* The abort success handler for typical cases, i.e. cromwell's API.
*/
private def standardAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = {
case WorkflowAbortedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborted.toString)))
case WorkflowAbortRequestedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborting.toString)))
}
/**
* The abort success handler for typical cases, i.e. cromwell's API.
*/
private def standardAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = {
case WorkflowAbortedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborted.toString)))
case WorkflowAbortRequestedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborting.toString)))
}

/**
* The abort error handler for typical cases, i.e. cromwell's API
*/
private def standardAbortErrorHandler: PartialFunction[Throwable, Route] = {
case e: InvalidWorkflowException => e.failRequest(StatusCodes.BadRequest)
case e: WorkflowNotFoundException => e.errorRequest(StatusCodes.NotFound)
case _: AskTimeoutException if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case e: TimeoutException => e.failRequest(StatusCodes.ServiceUnavailable)
case e: Exception => e.errorRequest(StatusCodes.InternalServerError)
}
/**
* The abort error handler for typical cases, i.e. cromwell's API
*/
private def standardAbortErrorHandler: PartialFunction[Throwable, Route] = {
case e: InvalidWorkflowException => e.failRequest(StatusCodes.BadRequest)
case e: WorkflowNotFoundException => e.errorRequest(StatusCodes.NotFound)
case _: AskTimeoutException if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case e: TimeoutException => e.failRequest(StatusCodes.ServiceUnavailable)
case e: Exception => e.errorRequest(StatusCodes.InternalServerError)
}

def validateWorkflowIdInMetadata(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadata(w)).mapTo[WorkflowValidationResponse] flatMap {
case RecognizedWorkflowId => Future.successful(w)
case UnrecognizedWorkflowId => validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor)
case FailedToCheckWorkflowId(t) => Future.failed(t)
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
def validateWorkflowIdInMetadata(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadata(w)).mapTo[WorkflowValidationResponse] flatMap {
case RecognizedWorkflowId => Future.successful(w)
case UnrecognizedWorkflowId => validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor)
case FailedToCheckWorkflowId(t) => Future.failed(t)
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
}

def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadataSummaries(w)).mapTo[WorkflowValidationResponse] map {
case RecognizedWorkflowId => w
case UnrecognizedWorkflowId => throw UnrecognizedWorkflowException(w)
case FailedToCheckWorkflowId(t) => throw t
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadataSummaries(w)).mapTo[WorkflowValidationResponse] map {
case RecognizedWorkflowId => w
case UnrecognizedWorkflowId => throw UnrecognizedWorkflowException(w)
case FailedToCheckWorkflowId(t) => throw t
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
}

final case class BackendResponse(supportedBackends: List[String], defaultBackend: String)
final case class BackendResponse(supportedBackends: List[String], defaultBackend: String)

final case class UnrecognizedWorkflowException(id: WorkflowId) extends Exception(s"Unrecognized workflow ID: $id")
final case class UnrecognizedWorkflowException(id: WorkflowId) extends Exception(s"Unrecognized workflow ID: $id")

final case class InvalidWorkflowException(possibleWorkflowId: String) extends Exception(s"Invalid workflow ID: '$possibleWorkflowId'.")
final case class InvalidWorkflowException(possibleWorkflowId: String) extends Exception(s"Invalid workflow ID: '$possibleWorkflowId'.")

val cromwellVersion = VersionUtil.getVersion("cromwell-engine")
val swaggerUiVersion = VersionUtil.getVersion("swagger-ui", VersionUtil.sbtDependencyVersion("swaggerUi"))
val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name)
val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson))
val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable)
}
val cromwellVersion = VersionUtil.getVersion("cromwell-engine")
val swaggerUiVersion = VersionUtil.getVersion("swagger-ui", VersionUtil.sbtDependencyVersion("swaggerUi"))
val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name)
val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson))
val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ trait WesCromwellRouteSupport extends WebServiceUtils {
implicit val timeout: Timeout = duration

implicit def actorRefFactory: ActorRefFactory

implicit val materializer: ActorMaterializer
implicit val ec: ExecutionContext

Expand Down
Loading

0 comments on commit e802f29

Please sign in to comment.